Docs Menu
Docs Home
/
Spark Connector
/ /

Batch Read Configuration Options

On this page

  • Overview
  • Partitioner Configurations
  • Specifying Properties in connection.uri

You can configure the following properties when reading data from MongoDB in batch mode.

Note

If you use SparkConf to set the connector's read configurations, prefix spark.mongodb.read. to each property.

Property name
Description
connection.uri
Required.
The connection string configuration key.

Default: mongodb://localhost:27017/
database
Required.
The database name configuration.
collection
Required.
The collection name configuration.
comment
The comment to append to the read operation. Comments appear in the output of the Database Profiler.

Default: None
mode
The parsing strategy to use when handling documents that don't match the expected schema. This option accepts the following values:
  • ReadConfig.ParseMode.FAILFAST: Throws an exception when parsing a document that doesn't match the schema.

  • ReadConfig.ParseMode.PERMISSIVE: Sets fields to null when data types don't match the schema. To store each invalid document as an extended JSON string, combine this value with the columnNameOfCorruptRecord option.

  • ReadConfig.ParseMode.DROPMALFORMED: Ignores any document that doesn't match the schema.


Default: ReadConfig.ParseMode.FAILFAST
columnNameOfCorruptRecord
If you set the mode option to ReadConfig.ParseMode.PERMISSIVE, this option specifies the name of the new column that stores the invalid document as extended JSON. If you're using an explicit schema, it must include the name of the new column. If you're using an inferred schema, the Spark Connector adds the new column to the end of the schema.

Default: None
mongoClientFactory
MongoClientFactory configuration key.
You can specify a custom implementation which must implement the com.mongodb.spark.sql.connector.connection.MongoClientFactory interface.

Default: com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory
partitioner
The partitioner full class name.
You can specify a custom implementation that must implement the com.mongodb.spark.sql.connector.read.partitioner.Partitioner interface.
See the Partitioner Configuration section for more information about partitioners.

Default: com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner
partitioner.options.
Partitioner configuration prefix.
See the Partitioner Configuration section for more information about partitioners.
sampleSize
The number of documents to sample from the collection when inferring
the schema.

Default: 1000
sql.inferSchema.mapTypes.enabled
Whether to enable Map types when inferring the schema.
When enabled, large compatible struct types are inferred to a MapType instead.

Default: true
sql.inferSchema.mapTypes.minimum.key.size
Minimum size of a StructType before inferring as a MapType.

Default: 250
aggregation.pipeline
Specifies a custom aggregation pipeline to apply to the collection before sending data to Spark.
The value must be either an extended JSON single document or list of documents.
A single document resembles the following:
{"$match": {"closed": false}}
A list of documents resembles the following:
[{"$match": {"closed": false}}, {"$project": {"status": 1, "name": 1, "description": 1}}]

IMPORTANT: Custom aggregation pipelines must be compatible with the partitioner strategy. For example, aggregation stages such as $group do not work with any partitioner that creates more than one partition.

aggregation.allowDiskUse
Specifies whether to allow storage to disk when running the aggregation.

Default: true
outputExtendedJson
When true, the connector converts BSON types not supported by Spark into extended JSON strings. When false, the connector uses the original relaxed JSON format for unsupported types.

Default: false
schemaHint
Specifies a partial schema of known field types to use when inferring the schema for the collection. To learn more about the schemaHint option, see the Specify Known Fields with Schema Hints section.

Default: None

Partitioners change the read behavior of batch reads that use the Spark Connector. By dividing the data into partitions, you can run transformations in parallel.

This section contains configuration information for the following partitioner:

Note

Batch Reads Only

Because the data-stream-processing engine produces a single data stream, partitioners do not affect streaming reads.

SamplePartitioner is the default partitioner configuration. This configuration lets you specify a partition field, partition size, and number of samples per partition.

To use this configuration, set the partitioner configuration option to com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner.

Property name
Description
partitioner.options.partition.field

The field to use for partitioning, which must be a unique field.

Default: _id

partitioner.options.partition.size

The size (in MB) for each partition. Smaller partition sizes create more partitions containing fewer documents.

Default: 64

partitioner.options.samples.per.partition

The number of samples to take per partition. The total number of samples taken is:

samples per partition * ( count / number of documents per partition)

Default: 10

Example

For a collection with 640 documents with an average document size of 0.5 MB, the default SamplePartitioner configuration creates 5 partitions with 128 documents per partition.

The Spark Connector samples 50 documents (the default 10 per intended partition) and defines 5 partitions by selecting partition field ranges from the sampled documents.

The ShardedPartitioner configuration automatically partitions the data based on your shard configuration.

To use this configuration, set the partitioner configuration option to com.mongodb.spark.sql.connector.read.partitioner.ShardedPartitioner.

Important

ShardedPartitioner Restrictions

  1. In MongoDB Server v6.0 and later, the sharding operation creates one large initial chunk to cover all shard key values, making the sharded partitioner inefficient. We do not recommend using the sharded partitioner when connected to MongoDB v6.0 and later.

  2. The sharded partitioner is not compatible with hashed shard keys.

The PaginateBySizePartitioner configuration paginates the data by using the average document size to split the collection into average-sized chunks.

To use this configuration, set the partitioner configuration option to com.mongodb.spark.sql.connector.read.partitioner.PaginateBySizePartitioner.

Property name
Description
partitioner.options.partition.field

The field to use for partitioning, which must be a unique field.

Default: _id

partitioner.options.partition.size

The size (in MB) for each partition. Smaller partition sizes

create more partitions containing fewer documents.

Default: 64

The PaginateIntoPartitionsPartitioner configuration paginates the data by dividing the count of documents in the collection by the maximum number of allowable partitions.

To use this configuration, set the partitioner configuration option to com.mongodb.spark.sql.connector.read.partitioner.PaginateIntoPartitionsPartitioner.

Property name
Description
partitioner.options.partition.field

The field to use for partitioning, which must be a unique field.

Default: _id

partitioner.options.max.number.of.partitions

The number of partitions to create.

Default: 64

The SinglePartitionPartitioner configuration creates a single partition.

To use this configuration, set the partitioner configuration option to com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner.

The AutoBucketPartitioner configuration is similar to the SamplePartitioner configuration, but uses the $bucketAuto aggregation stage to paginate the data. By using this configuration, you can partition the data across single or multiple fields, including nested fields.

To use this configuration, set the partitioner configuration option to com.mongodb.spark.sql.connector.read.partitioner.AutoBucketPartitioner.

Property name
Description
partitioner.options.partition.fieldList

The list of fields to use for partitioning. The value can be either a single field name or a list of comma-separated fields.

Default: _id

partitioner.options.partition.chunkSize

The average size (MB) for each partition. Smaller partition sizes create more partitions containing fewer documents. Because this configuration uses the average document size to determine the number of documents per partition, partitions might not be the same size.

Default: 64

partitioner.options.partition.samplesPerPartition

The number of samples to take per partition.

Default: 100

partitioner.options.partition.partitionKeyProjectionField

The field name to use for a projected field that contains all the fields used to partition the collection. We recommend changing the value of this property only if each document already contains the __idx field.

Default: __idx

If you use SparkConf to specify any of the previous settings, you can either include them in the connection.uri setting or list them individually.

The following code example shows how to specify the database, collection, and read preference as part of the connection.uri setting:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/myDB.myCollection?readPreference=primaryPreferred

To keep the connection.uri shorter and make the settings easier to read, you can specify them individually instead:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/
spark.mongodb.read.database=myDB
spark.mongodb.read.collection=myCollection
spark.mongodb.read.readPreference.name=primaryPreferred

Important

If you specify a setting in both the connection.uri and on its own line, the connection.uri setting takes precedence. For example, in the following configuration, the connection database is foobar, because it's the value in the connection.uri setting:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar
spark.mongodb.read.database=bar

Back

Read