Docs Menu
Docs Home
/
Spark Connector
/ /

Streaming Read Configuration Options

On this page

  • Overview
  • Change Stream Configuration
  • Specifying Properties in connection.uri
  • Specifying Multiple Collections in the collection Property

You can configure the following properties when reading data from MongoDB in streaming mode.

Note

If you use SparkConf to set the connector's read configurations, prefix spark.mongodb.read. to each property.

Property name
Description
connection.uri
Required.
The connection string configuration key.

Default: mongodb://localhost:27017/
database
Required.
The database name configuration.
collection
Required.
The collection name configuration.
You can specify multiple collections by separating the collection names with a comma.

To learn more about specifying multiple collections, see Specifying Multiple Collections in the collection Property.
comment
The comment to append to the read operation. Comments appear in the output of the Database Profiler.

Default: None
mode
The parsing strategy to use when handling documents that don't match the expected schema. This option accepts the following values:
  • ReadConfig.ParseMode.FAILFAST: Throws an exception when parsing a document that doesn't match the schema.

  • ReadConfig.ParseMode.PERMISSIVE: Sets fields to null when data types don't match the schema. To store each invalid document as an extended JSON string, combine this value with the columnNameOfCorruptRecord option.

  • ReadConfig.ParseMode.DROPMALFORMED: Ignores any document that doesn't match the schema.


Default: ReadConfig.ParseMode.FAILFAST
columnNameOfCorruptRecord
If you set the mode option to ReadConfig.ParseMode.PERMISSIVE, this option specifies the name of the new column that stores the invalid document as extended JSON. If you're using an explicit schema, it must include the name of the new column. If you're using an inferred schema, the Spark Connector adds the new column to the end of the schema.

Default: None
mongoClientFactory
MongoClientFactory configuration key.
You can specify a custom implementation, which must implement the com.mongodb.spark.sql.connector.connection.MongoClientFactory interface.

Default: com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory
aggregation.pipeline
Specifies a custom aggregation pipeline to apply to the collection before sending data to Spark.
The value must be either an extended JSON single document or list of documents.
A single document resembles the following:
{"$match": {"closed": false}}
A list of documents resembles the following:
[{"$match": {"closed": false}}, {"$project": {"status": 1, "name": 1, "description": 1}}]

Custom aggregation pipelines must be compatible with the partitioner strategy. For example, aggregation stages such as $group do not work with any partitioner that creates more than one partition.

aggregation.allowDiskUse
Specifies whether to allow storage to disk when running the aggregation.

Default: true
change.stream.
Change stream configuration prefix.
See the Change Stream Configuration section for more information about change streams.
outputExtendedJson
When true, the connector converts BSON types not supported by Spark into extended JSON strings. When false, the connector uses the original relaxed JSON format for unsupported types.

Default: false
schemaHint
Specifies a partial schema of known field types to use when inferring the schema for the collection. To learn more about the schemaHint option, see the Specify Known Fields with Schema Hints section.

Default: None

You can configure the following properties when reading a change stream from MongoDB:

Property name
Description
change.stream.lookup.full.document

Determines what values your change stream returns on update operations.

The default setting returns the differences between the original document and the updated document.

The updateLookup setting also returns the differences between the original document and updated document, but it also includes a copy of the entire updated document.

For more information on how this change stream option works, see the MongoDB server manual guide Lookup Full Document for Update Operation.

Default: "default"

change.stream.micro.batch.max.partition.count
The maximum number of partitions the Spark Connector divides each micro-batch into. Spark workers can process these partitions in parallel.

This setting applies only when using micro-batch streams.

Default: 1

WARNING: Specifying a value larger than 1 can alter the order in which the Spark Connector processes change events. Avoid this setting if out-of-order processing could create data inconsistencies downstream.

change.stream.publish.full.document.only
Specifies whether to publish the changed document or the full change stream document.

When this setting is false, you must specify a schema. The schema must include all fields that you want to read from the change stream. You can use optional fields to ensure that the schema is valid for all change-stream events.

When this setting is true, the connector exhibits the following behavior:
  • The connector filters out messages that omit the fullDocument field and publishes only the value of the field.

  • If you don't specify a schema, the connector infers the schema from the change stream document.

This setting overrides the change.stream.lookup.full.document setting.

Default: false

change.stream.startup.mode
Specifies how the connector starts up when no offset is available.
This setting accepts the following values:

If you use SparkConf to specify any of the previous settings, you can either include them in the connection.uri setting or list them individually.

The following code example shows how to specify the database, collection, and read preference as part of the connection.uri setting:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/myDB.myCollection?readPreference=primaryPreferred

To keep the connection.uri shorter and make the settings easier to read, you can specify them individually instead:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/
spark.mongodb.read.database=myDB
spark.mongodb.read.collection=myCollection
spark.mongodb.read.readPreference.name=primaryPreferred

Important

If you specify a setting in both the connection.uri and on its own line, the connection.uri setting takes precedence. For example, in the following configuration, the connection database is foobar, because it's the value in the connection.uri setting:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar
spark.mongodb.read.database=bar

You can specify multiple collections in the collection change stream configuration property by separating the collection names with a comma. Do not add a space between the collections unless the space is a part of the collection name.

Specify multiple collections as shown in the following example:

...
.option("spark.mongodb.collection", "collectionOne,collectionTwo")

If a collection name is "*", or if the name includes a comma or a backslash (\), you must escape the character as follows:

  • If the name of a collection used in your collection configuration option contains a comma, the Spark Connector treats it as two different collections. To avoid this, you must escape the comma by preceding it with a backslash (\). Escape a collection named "my,collection" as follows:

    "my\,collection"
  • If the name of a collection used in your collection configuration option is "*", the Spark Connector interprets it as a specification to scan all collections. To avoid this, you must escape the asterisk by preceding it with a backslash (\). Escape a collection named "*" as follows:

    "\*"
  • If the name of a collection used in your collection configuration option contains a backslash (\), the Spark Connector treats the backslash as an escape character, which might change how it interprets the value. To avoid this, you must escape the backslash by preceding it with another backslash. Escape a collection named "\collection" as follows:

    "\\collection"

    Note

    When specifying the collection name as a string literal in Java, you must further escape each backslash with another one. For example, escape a collection named "\collection" as follows:

    "\\\\collection"

You can stream from all collections in the database by passing an asterisk (*) as a string for the collection name.

Specify all collections as shown in the following example:

...
.option("spark.mongodb.collection", "*")

If you create a collection while streaming from all collections, the new collection is automatically included in the stream.

You can drop collections at any time while streaming from multiple collections.

Important

Inferring the Schema with Multiple Collections

If you set the change.stream.publish.full.document.only option to true, the Spark Connector infers the schema of a DataFrame by using the schema of the scanned documents.

Schema inference happens at the beginning of streaming, and does not take into account collections that are created during streaming.

When streaming from multiple collections and inferring the schema, the connector samples each collection sequentially. Streaming from a large number of collections can cause the schema inference to have noticeably slower performance. This performance impact occurs only while inferring the schema.

Back

Read