Tuning the MongoDB Connector for Apache Kafka

Robert Walters, Diego Rodriguez • 10 min read • Published Mar 01, 2022 • Updated Sep 17, 2024
MongoDB Connector for Apache Kafka (MongoDB Connector) is an open-source Java application that works with Apache Kafka Connect, enabling seamless data integration of MongoDB with the Apache Kafka ecosystem. When working with the MongoDB Connector, the default values cover a great variety of scenarios, but some scenarios require more fine-grained tuning. In this article, we will walk through important configuration properties that affect the performance of the MongoDB Kafka Source and Sink Connectors, and share general recommendations.

Tuning the source connector

Let’s first take a look at the connector when it is configured to read data from MongoDB and write it into a Kafka topic. When you configure the connector this way, it is known as a “source connector.”
When the connector is configured as a source, a change stream is opened within the MongoDB cluster based upon any configuration you specified, such as pipeline. These change stream events get read into the connector and then written out to the Kafka topic, and they resemble the following:
{
   _id : { <BSON Object> },
   "operationType" : "<operation>",
   "fullDocument" : { <document> },
   "ns" : {
      "db" : "<database>",
      "coll" : "<collection>"
   },
   "to" : {
      "db" : "<database>",
      "coll" : "<collection>"
   },
   "documentKey" : { "_id" : <value> },
   "updateDescription" : {
      "updatedFields" : { <document> },
      "removedFields" : [ "<field>", ... ],
      "truncatedArrays" : [
         { "field" : <field>, "newSize" : <integer> },
         ...
      ]
   },
   "clusterTime" : <Timestamp>,
   "txnNumber" : <NumberLong>,
   "lsid" : {
      "id" : <UUID>,
      "uid" : <BinData>
   }
}
The connector configuration properties help define what data is written out to Kafka. For example, consider the scenario where we insert into MongoDB the following:
use Stocks
db.StockData.insertOne({'symbol':'MDB','price':441.67,'tx_time':Date.now()})
When publish.full.document.only is set to false (the default setting), the connector writes the entire event as shown below:
{
  "_id": {
    "_data": "826205217F000000022B022C0100296E5A1004AA1707081AA1414BB9F647FD49855EE846645F696400646205217FC26C3DE022E9488E0004"
  },
  "operationType": "insert",
  "clusterTime": {
    "$timestamp": {"t": 1644503423, "i": 2}
  },
  "fullDocument": {
    "_id": {"$oid": "6205217fc26c3de022e9488e"},
    "symbol": "MDB",
    "price": 441.67,
    "tx_time": 1.644503423267E12
  },
  "ns": {"db": "Stocks", "coll": "StockData"},
  "documentKey": {
    "_id": {"$oid": "6205217fc26c3de022e9488e"}
  }
}
When publish.full.document.only is set to true and we issue a similar statement, it looks like the following:
use Stocks
db.StockData.insertOne({'symbol':'TSLA','price':920.00,'tx_time':Date.now()})
We can see that the data written to the Kafka topic is just the changed document itself, which in this example, is an inserted document.
{
  "_id": {"$oid": "620524b89d2c7fb2a606aa16"},
  "symbol": "TSLA",
  "price": 920,
  "tx_time": 1.644504248732E12
}
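The difference between the two settings can be sketched in a few lines of JavaScript. This is an illustration only, not the connector's code: toKafkaValue and the trimmed-down insertEvent object are hypothetical names introduced here, and only the insert case from the examples above is modeled.

```javascript
// Illustrative sketch: mimics which value is published for a change event
// depending on publish.full.document.only. Not the connector's implementation.
function toKafkaValue(changeEvent, publishFullDocumentOnly) {
  // true: only the changed document itself is published.
  if (publishFullDocumentOnly) {
    return changeEvent.fullDocument;
  }
  // false (the default): the entire change event is published.
  return changeEvent;
}

// A shortened version of the insert event shown earlier.
const insertEvent = {
  operationType: "insert",
  fullDocument: { symbol: "MDB", price: 441.67 },
  ns: { db: "Stocks", coll: "StockData" },
  documentKey: { _id: { $oid: "6205217fc26c3de022e9488e" } },
};

console.log(JSON.stringify(toKafkaValue(insertEvent, false)));
console.log(JSON.stringify(toKafkaValue(insertEvent, true)));
```

The first call prints the whole event, including operationType and ns. The second prints only the document that was inserted.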

Resume tokens

Another important concept to understand with source connectors is resume tokens. Resume tokens make it possible for the connector to fail, get restarted, and resume where it left off reading the MongoDB change stream. By default, resume tokens are stored in a Kafka topic defined by the offset.storage.topic parameter (configurable at the Kafka Connect Worker level for distributed environments) or in the file system in a file defined by the offset.storage.file.filename parameter (configurable at the Kafka Connect Worker level for standalone environments). If the connector has been offline and the underlying MongoDB oplog has rolled over, you may get an error when the connector restarts. Read the Invalid Resume Token section of the online documentation to learn more about this condition.

Configuration properties

The full set of properties for the Kafka Source Connector can be found in the documentation. The properties that should be considered with respect to performance tuning are as follows:
  • batch.size: the cursor batch size that defines how many change stream documents are retrieved on each getMore operation. Defaults to 1,000.
  • poll.await.time.ms: The maximum amount of time in milliseconds that the server waits for new data changes to report to the change stream cursor before returning an empty batch. Defaults to 5,000.
  • poll.max.batch.size: maximum number of source records to send to Kafka at once. This setting can be used to limit the amount of data buffered internally in the Connector. Defaults to 1,000.
  • pipeline: an array of aggregation pipeline stages to run in your change stream. Defaults to an empty pipeline that provides no filtering.
  • copy.existing.max.threads: the number of threads to use when performing the data copy. Defaults to the number of processors.
  • copy.existing.queue.size: the max size of the queue to use when copying data. This is buffered internally by the Connector. Defaults to 16,000.
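As a rough sketch, the properties above could appear in a source connector definition like the one below. The connector name, connection URI, database, and collection are placeholders chosen for illustration, and the numeric values are simply the defaults quoted in the list above. The object follows the name/config shape that the Kafka Connect REST API accepts when creating a connector.

```javascript
// Sketch of a source connector definition with the performance-related
// properties written out explicitly. Values mirror the defaults quoted in
// this article; name, URI, database, and collection are placeholders.
const sourceConnector = {
  name: "mongo-source-stocks", // hypothetical connector name
  config: {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "mongodb://localhost:27017", // placeholder URI
    database: "Stocks",
    collection: "StockData",
    "batch.size": "1000",
    "poll.await.time.ms": "5000",
    "poll.max.batch.size": "1000",
    pipeline: "[]", // empty pipeline: no filtering
    "copy.existing.queue.size": "16000",
  },
};

// Serialized form, ready to be sent as the body of a create-connector request.
const requestBody = JSON.stringify(sourceConnector, null, 2);
console.log(requestBody);
```

Leaving a property out of the config keeps its default, so in practice you would only list the ones you intend to change.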

Recommendations

The following are some general recommendations and considerations when configuring the source connector:

Scaling the source

One of the most common questions is how to scale the source connector. For scenarios where you have a large amount of data to be copied via copy.existing, keep in mind that using the source connector this way may not be the best way to move this large amount of data. Consider the process for copy.existing:
  1. Store the latest change stream resume token.
  2. Spin up a thread (up to copy.existing.max.threads) for each namespace that is being copied.
  3. When all threads finish, read the stored resume token and process the change events recorded since then until the connector catches up to the current time.
While the data will technically be copied eventually, this process is relatively slow. If your data size is large and incoming data arrives faster than the copy process can handle it, the connector may never reach a state where it handles new data changes.
For high-throughput datasets copied with copy.existing, a typical problem is that the oplog entry behind the resume token stored in step 1 is overwritten due to high write activity. This breaks the copy.existing functionality, and the copy will need to be restarted, on top of dealing with the messages that were already written to the Kafka topic. When this happens, the alternatives are:
  • Increase the oplog size to make sure the copy.existing phase can finish.
  • Throttle write activity in the source cluster until the copy.existing phase finishes.
Another option for handling high throughput of change data is to configure multiple source connectors. Each source connector should use a pipeline and capture changes from a subset of the total data. Keep in mind that each time you create a source connector pointed to the same MongoDB cluster, it creates a separate change stream. Each change stream requires resources from the MongoDB cluster, and continually adding them will decrease server performance. That said, this degradation may not become noticeable until the number of connectors reaches the 100+ range, so breaking your collections into five to 10 connector pipelines is the best way to increase source performance. In addition, using several different source connectors on the same namespace changes the total ordering of the data on the sink versus the original order of data in the source cluster.
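One way to picture the multi-connector approach is a small helper that splits a list of collections across several connector definitions, each with its own pipeline. This is a sketch under assumptions: buildSourceConfigs, the connector names, the collection names, and the connection URI are all made up for illustration, and matching on ns.coll is just one possible way to select a subset.

```javascript
// Sketch: spread collections over several source connectors, each with a
// pipeline that only matches its own subset. All names are hypothetical.
function buildSourceConfigs(database, collections, connectorCount) {
  const groups = Array.from({ length: connectorCount }, () => []);
  // Round-robin assignment keeps the groups roughly the same size.
  collections.forEach((coll, i) => groups[i % connectorCount].push(coll));

  return groups
    .filter((group) => group.length > 0) // skip connectors with nothing to watch
    .map((group, i) => ({
      name: `mongo-source-${database}-${i}`,
      config: {
        "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
        "connection.uri": "mongodb://localhost:27017", // placeholder URI
        database,
        // Only events for this connector's collections pass the pipeline.
        pipeline: JSON.stringify([{ $match: { "ns.coll": { $in: group } } }]),
      },
    }));
}

const splitConfigs = buildSourceConfigs(
  "Stocks",
  ["Trades", "Quotes", "Orders", "Fills", "Positions"],
  2
);
splitConfigs.forEach((c) => console.log(c.name, c.config.pipeline));
```

Each resulting definition opens its own change stream, so the number of connectors passed in is the knob that trades source throughput against load on the cluster.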

Tune the change stream pipeline

When building your Kafka Source Connector configuration, ensure you appropriately tune the “pipeline” so that only wanted events are flowing from MongoDB to Kafka Connect, which helps reduce network traffic and processing times. For a detailed pipeline example, check out the Customize a Pipeline to Filter Change Events section of the online documentation.
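As a minimal illustration, the sketch below builds a pipeline that only lets insert events for a single stock symbol through. The filter itself (inserts for the symbol MDB) is an assumption made for this example. The pipeline property is supplied to the connector as a JSON string, so the array is serialized before use.

```javascript
// Sketch: a change stream pipeline that keeps only insert events whose
// inserted document has symbol "MDB". The filter is an illustrative choice.
const pipeline = [
  {
    $match: {
      operationType: "insert",
      "fullDocument.symbol": "MDB",
    },
  },
];

// The connector takes the pipeline as a JSON string in its configuration.
const pipelineProperty = JSON.stringify(pipeline);
console.log(pipelineProperty);
```

Events that do not match never leave the MongoDB cluster, which is where the savings in network traffic and processing time come from.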

Adjust to the source cluster throughput

Your Kafka Source Connector can be watching a set of collections with a low volume of events, or the opposite, a set of collections with a very high volume of events.
In addition, you may want to tune your Kafka Source Connector to react faster to changes, reduce round trips to MongoDB or Kafka, and similar changes.
With this in mind, consider adjusting the following properties for the Kafka Source Connector:
  • Adjust the value of batch.size:
    • Higher values mean longer processing times on the source cluster but fewer round trips to it. It can also increase the chances of finding relevant change events when the volume of events being watched is small.
    • Lower values mean shorter processing times on the source cluster but more round trips to it. It can reduce the chances of finding relevant change events when the volume of events being watched is small.
  • Adjust the value of poll.max.batch.size:
    • Higher values require more memory to buffer the source records with fewer round trips to Kafka. This comes at the expense of the memory requirements and increased latency from the moment a change takes place in MongoDB to the point the Kafka message associated with that change reaches the destination topic.
    • Lower values require less memory to buffer the source records with more round trips to Kafka. It can also help reduce the latency from the moment a change takes place in MongoDB to the point the Kafka message associated with that change reaches the destination topic.
  • Adjust the value of poll.await.time.ms:
    • Higher values give source clusters with a low volume of events more time to accumulate data to send to Kafka, at the expense of increased latency from the moment a change takes place in MongoDB to the point the Kafka message associated with that change reaches the destination topic.
    • Lower values reduce latency from the moment a change takes place in MongoDB to the point the Kafka message associated with that change reaches the destination topic. But for source clusters with a low volume of events, it can prevent them from having any information to be sent to Kafka.
This information is an overview of what to expect when changing these values, but keep in mind that they are deeply interconnected, with the volume of change events on the source cluster having an important impact too:
  1. The Kafka Source Connector issues getMore commands to the source cluster using batch.size.
  2. The Kafka Source Connector receives the results from step 1 and waits until either poll.max.batch.size or poll.await.time.ms is reached. Until then, the Kafka Source Connector keeps “feeding” itself with more getMore results.
  3. When either poll.max.batch.size or poll.await.time.ms is reached, the source records are sent to Kafka.
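The three steps above can be sketched as a small simulation. It is a simplified model, not the connector's implementation: simulatePoll is a hypothetical function, each getMore result is faked as an object with the records it returned and the time it took, and records beyond poll.max.batch.size are simply reported as left over for the next poll.

```javascript
// Simplified simulation of one poll cycle. getMore results and elapsed time
// are faked; this is not how the connector is implemented internally.
function simulatePoll(getMoreResults, { pollMaxBatchSize, pollAwaitTimeMs }) {
  const buffered = [];
  let elapsedMs = 0;
  let reason = "no more results";

  for (const { records, tookMs } of getMoreResults) {
    elapsedMs += tookMs;
    buffered.push(...records);
    // Stop as soon as either limit is reached.
    if (buffered.length >= pollMaxBatchSize) {
      reason = "poll.max.batch.size";
      break;
    }
    if (elapsedMs >= pollAwaitTimeMs) {
      reason = "poll.await.time.ms";
      break;
    }
  }

  return {
    sent: buffered.slice(0, pollMaxBatchSize).length,
    leftOver: Math.max(0, buffered.length - pollMaxBatchSize),
    elapsedMs,
    reason,
  };
}

const limits = { pollMaxBatchSize: 1000, pollAwaitTimeMs: 5000 };

// Busy cluster: two quick getMore calls returning 600 events each.
const busy = simulatePoll(
  [
    { records: new Array(600).fill("event"), tookMs: 50 },
    { records: new Array(600).fill("event"), tookMs: 50 },
  ],
  limits
);

// Quiet cluster: one event, then an empty batch after a long wait.
const quiet = simulatePoll(
  [
    { records: ["event"], tookMs: 3000 },
    { records: [], tookMs: 3000 },
  ],
  limits
);

console.log(busy, quiet);
```

In the busy case the size limit triggers the send after 100 ms. In the quiet case a single record waits until the time limit is hit, which is the latency cost described above.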

“Copy existing” feature

When running with the copy.existing property set to true, consider these additional properties:
  • copy.existing.queue.size: the number of records the Kafka Source Connector buffers internally. This queue and its size include all the namespaces to be copied by the “Copy Existing” feature. If this queue is full, the Kafka Source Connector blocks until space becomes available.
  • copy.existing.max.threads: the number of concurrent threads used for copying the different namespaces. There is a one namespace to one thread mapping, so it is common to increase this up to the maximum number of namespaces being copied. If the number exceeds the number of cores available in the system, then the performance gains can be reduced.
  • copy.existing.allow.disk.use: allows the copy existing aggregation to use temporary disk storage if required. The default is set to true but should be set to false if the user doesn't have the permissions for disk access.
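The guidance on copy.existing.max.threads can be turned into a simple heuristic: one thread per namespace, capped at the number of cores. The helper below is a sketch of that rule of thumb only. suggestCopyExistingThreads is a hypothetical name, and the core count is passed in by the caller rather than detected.

```javascript
// Heuristic sketch: one thread per namespace, capped at the available cores.
function suggestCopyExistingThreads(namespaceCount, availableCores) {
  return Math.max(1, Math.min(namespaceCount, availableCores));
}

// Example: 12 namespaces to copy on a worker with 8 cores (assumed values).
const copyExistingConfig = {
  "copy.existing": "true",
  "copy.existing.max.threads": String(suggestCopyExistingThreads(12, 8)),
  "copy.existing.queue.size": "16000",
  "copy.existing.allow.disk.use": "true",
};

console.log(copyExistingConfig);
```

With fewer namespaces than cores, the helper returns the namespace count, since extra threads would have nothing to copy.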

Memory implications

If you experience JVM “out of memory” issues on the Kafka Connect Worker process, you can try reducing the following two properties that control the amount of data buffered internally:
  • poll.max.batch.size
  • copy.existing.queue.size: applicable if the “copy.existing” property is set to true.
It is important to note that lowering these values can have unwanted side effects, such as more round trips to Kafka. Adjusting the JVM Heap Size to your environment's needs is recommended as long as you have available resources and the memory needs are not the result of memory leaks.
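For a back-of-the-envelope feel of how these two properties relate to memory, the sketch below multiplies the number of buffered records by an average record size. This is a rough estimate under stated assumptions, not a formula from the connector: estimateBufferedBytes is a hypothetical helper, the average record size is something you would measure yourself, and JVM object overhead is ignored.

```javascript
// Rough estimate only: records that can be buffered internally multiplied by
// an assumed average record size. Ignores JVM object overhead entirely.
function estimateBufferedBytes({
  pollMaxBatchSize,
  copyExistingQueueSize,
  avgRecordBytes,
}) {
  return (pollMaxBatchSize + copyExistingQueueSize) * avgRecordBytes;
}

const estimatedBytes = estimateBufferedBytes({
  pollMaxBatchSize: 1000,
  copyExistingQueueSize: 16000, // only relevant when copy.existing is true
  avgRecordBytes: 2048, // assumed average size of one record
});

console.log(`${(estimatedBytes / (1024 * 1024)).toFixed(1)} MiB`);
```

The point of the exercise is proportionality: halving either property roughly halves its share of the buffered data.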

Tuning the sink connector

When the MongoDB Connector is configured as a sink, it reads from a Kafka topic and writes to a MongoDB collection.
As with the source, there exists a mechanism to ensure offsets are stored in the event of a sink failure. Kafka Connect manages this, and the information is stored in the __consumer_offsets topic. The MongoDB Connector has configuration properties that affect performance. They are as follows:
  • max.batch.size: the maximum number of sink records to batch together for processing. A higher number will result in more documents being sent as part of a single bulk command. Default value is 0.
  • rate.limiting.every.n: number of processed batches that trigger the rate limit. A value of 0 means no rate limiting. Default value is 0. In practice, this setting is rarely used.
  • rate.limiting.timeout: how long (in milliseconds) to wait before continuing to process data once the rate limit is reached. Default value is 0. This setting is rarely used.
  • tasks.max: the maximum number of tasks. Default value is 1.

Recommendations

Add indexes to your collections for consistent performance

Writes performed by the sink connector take additional time to complete as the size of the underlying MongoDB collection grows. To prevent performance deterioration, use an index to support these write queries.

Achieve as much parallelism as possible

The Kafka Sink Connector (KSC) can take advantage of parallel execution thanks to the tasks.max property. The specified number of tasks will only be created if the source topic has at least that many partitions. Note: A partition should be considered as a logical group of ordered records, and the producer of the data determines what each partition contains. Here is the breakdown of the different combinations of number of partitions in the source topic and tasks.max values:
If working with more than one partition but one task:
  • The task processes partitions one by one: Once a batch from a partition is processed, it moves on to another one so the order within each partition is still guaranteed.
  • Order among all the partitions is not guaranteed.
If working with more than one partition and an equal number of tasks:
  • Each task is assigned one partition and the order is guaranteed within each partition.
  • Order among all the partitions is not guaranteed.
If working with more than one partition and a smaller number of tasks:
  • The tasks that are assigned more than one partition process partitions one by one: Once a batch from a partition is processed, it moves on to another one so the order within each partition is still guaranteed.
  • Order among all the partitions is not guaranteed.
If working with more than one partition and a higher number of tasks:
  • Each task is assigned one partition and the order is guaranteed within each partition.
  • KSC will not generate an excess number of tasks.
  • Order among all the partitions is not guaranteed.
Processing of partitions may not be in order, meaning that Partition B may be processed before Partition A. All messages within a partition preserve strict order.
Note: When using MongoDB to write CDC data, the order of data is important since, for example, you do not want to process a delete before an update on the same data. If you specify more than one partition for CDC data, you run the risk of data being out of order on the sink collection.
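The combinations above can be summarized with a small model of how partitions end up on tasks. It is a simplification for illustration: assignPartitions is a hypothetical function, and the round-robin distribution stands in for whatever assignment Kafka actually performs. It captures two points from the breakdown: no more tasks than partitions, and each partition handled by exactly one task.

```javascript
// Simplified model: spread partitions over tasks, never using more tasks
// than there are partitions. Real assignment is decided by Kafka.
function assignPartitions(partitionCount, tasksMax) {
  const taskCount = Math.min(tasksMax, partitionCount); // no excess tasks
  const tasks = Array.from({ length: taskCount }, () => []);
  for (let p = 0; p < partitionCount; p++) {
    tasks[p % taskCount].push(p); // each partition belongs to one task only
  }
  return tasks;
}

console.log(assignPartitions(4, 1)); // one task works through all partitions
console.log(assignPartitions(4, 4)); // one partition per task
console.log(assignPartitions(4, 2)); // some tasks handle several partitions
console.log(assignPartitions(2, 4)); // only two tasks are used
```

Because a partition never spans two tasks, order within a partition is kept in every case, while nothing coordinates order between partitions.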

Tune the bulk operations

The Kafka Sink Connector (KSC) works by issuing bulk write operations. All the bulk operations that the KSC executes are, by default, ordered and as such, the order of the messages is guaranteed within a partition. See Ordered vs Unordered Operations for more information. Note: As of version 1.7 of the connector, setting bulk.write.ordered to false processes the bulk out of order, enabling more documents within the batch to be written if a portion of the batch fails.
The number of operations that are sent in a single bulk command can have a direct impact on performance. You can modify this by adjusting max.batch.size:
  • A higher number will result in more operations being sent as part of a single bulk command. This helps improve throughput at the expense of some added latency. However, a very big number might result in cache pressure on the destination cluster.
  • A small number will ease the potential cache pressure issues which might be useful for destination clusters with fewer resources. However, throughput decreases, and you might experience consumer lag on the source topics as the producer might publish messages in the topic faster than the KSC processes them.
  • This value affects processing within each of the tasks of the KSC.
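To make the trade-off concrete, the sketch below splits a set of sink records into batches the way a batch size limit would. It is an illustration, not the connector's code: splitIntoBatches is a hypothetical helper, and treating a value of 0 as "do not split" is an assumption made for this example.

```javascript
// Sketch: split records into batches of at most maxBatchSize.
// Assumption for this example: 0 (or less) means the records are not split.
function splitIntoBatches(records, maxBatchSize) {
  if (maxBatchSize <= 0) return [records];
  const batches = [];
  for (let i = 0; i < records.length; i += maxBatchSize) {
    batches.push(records.slice(i, i + maxBatchSize));
  }
  return batches;
}

const sinkRecords = Array.from({ length: 2500 }, (_, i) => ({ offset: i }));

// Larger batches: fewer bulk commands, each carrying more operations.
console.log(splitIntoBatches(sinkRecords, 1000).map((b) => b.length));
// Smaller batches: more bulk commands, each lighter on the destination.
console.log(splitIntoBatches(sinkRecords, 250).length);
```

Each batch corresponds to one bulk command, so the batch count is a proxy for round trips and the batch length for the load a single command puts on the destination cluster.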

Throttle the Kafka sink connector

In the event that the destination MongoDB cluster is not able to handle consistent throughput, you can configure a throttling mechanism. You can do this with two properties:
  • rate.limiting.every.n: number of processed batches that should trigger the rate limit. A value of 0 means no rate limiting.
  • rate.limiting.timeout: how long (in milliseconds) to wait before continuing to process data once the rate limit is reached.
The end result is that whenever the KSC writes rate.limiting.every.n number of batches, it waits rate.limiting.timeout milliseconds before writing the next batch. This allows a destination MongoDB cluster that cannot handle consistent throughput to recover before receiving new load from the KSC.
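The pause pattern described above can be sketched as follows. The sink definition uses placeholder values for the connector name, topic, URI, database, and collection, and totalThrottleMs is a hypothetical helper that only adds up the configured pauses. It does not model how long the writes themselves take.

```javascript
// Sketch of a throttled sink definition. Name, topic, URI, database, and
// collection are placeholders; the rate limiting values are examples.
const sinkConnector = {
  name: "mongo-sink-stocks", // hypothetical connector name
  config: {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "connection.uri": "mongodb://localhost:27017", // placeholder URI
    topics: "Stocks.StockData", // placeholder topic
    database: "Stocks",
    collection: "StockData",
    "max.batch.size": "1000",
    "rate.limiting.every.n": "5",
    "rate.limiting.timeout": "200",
  },
};

// Total pause time added by throttling after writing batchCount batches.
function totalThrottleMs(batchCount, everyN, timeoutMs) {
  if (everyN <= 0) return 0; // 0 means no rate limiting
  return Math.floor(batchCount / everyN) * timeoutMs;
}

const everyN = Number(sinkConnector.config["rate.limiting.every.n"]);
const timeoutMs = Number(sinkConnector.config["rate.limiting.timeout"]);
console.log(totalThrottleMs(100, everyN, timeoutMs)); // pauses over 100 batches
```

With these example values, the connector would pause for 200 ms after every fifth batch, giving the destination cluster regular breathing room.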
