Introducing MongoDB Connector for Apache Kafka version 1.9
Today, MongoDB released version 1.9 of the MongoDB Connector for Apache Kafka! This article highlights the key features of this new release!
Pre/Post document states
In MongoDB 6.0, Change Streams added the ability to retrieve the before and after state of an entire document. To enable this functionality on the collection you can set it as a parameter in the createCollection command such as:
db.createCollection(
 "temperatureSensor",
 { changeStreamPreAndPostImages: { enabled: true } }
)

Alternatively, for existing collections, use colMod as shown below:
db.runCommand( {
 collMod: <collection>,
 changeStreamPreAndPostImages: { enabled: <boolean> }
} )

Once the collection is configured for pre and post images, you can set the change.stream.full.document.before.change
source connector parameter to include this extra information in the change event.
For example, consider this source definition:
{
 "name": "mongo-simple-source",
 "config": {
 "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
 "connection.uri": "<< MONGODB CONNECTION STRING >>",
 "database": "test",
 "collection": "temperatureSensor",
 "change.stream.full.document.before.change":"whenavailable"
 }
}

When the following document is inserted:
db.temperatureSensor.insertOne({'sensor_id':1,'value':100})

Then an update is applied:
db.temperatureSensor.updateOne({'sensor_id':1},{ $set: { 'value':105}})

You can see the change stream event written to Kafka topic is as follows:
{
 "_id": {
 "_data": "82636D39C8000000012B022C0100296E5A100444B0F5E386F04767814F28CB4AAE7FEE46645F69640064636D399B732DBB998FA8D67E0004"
 },
 "operationType": "update",
 "clusterTime": {
 "$timestamp": {
 "t": 1668102600,
 "i": 1
 }
 },
 "wallTime": {
 "$date": 1668102600716
 },
 "ns": {
 "db": "test",
 "coll": "temperatureSensor"
 },
 "documentKey": {
 "_id": {
 "$oid": "636d399b732dbb998fa8d67e"
 }
 },
 "updateDescription": {
 "updatedFields": {
 "value": 105
 },
 "removedFields": [],
 "truncatedArrays": []
 },
 "fullDocumentBeforeChange": {
 "_id": {
 "$oid": "636d399b732dbb998fa8d67e"
 },
 "sensor_id": 1,
 "value": 100
 }
}

Note the fullDocumentBeforeChange key includes the original document before the update occurred.
Starting the connector at a specific time
Prior to version 1.9, when the connector starts as a source, it will open a MongoDB change stream and any new data will get processed by the source connector. To copy all the existing data in the collection first before you begin processing the new data, you specify the “copy.existing” property. One frequent user request is to start the connector based upon a specific timestamp versus when the connector starts. In 1.9 a new parameter called startup.mode was added to specify when to start writing data.
startup.mode=latest (default)
“Latest” is the default behavior and starts processing the data when the connector starts. It ignores any existing data when the connector starts.
startup.mode=timestamp
“timestamp” allows you to start processing at a specific point in time as defined by additional startup.mode.timestamp.* properties. For example, to start the connector from 7AM on November 21, 2022, you set the value as follows:
startup.mode.timestamp.start.at.operation.time=’2022-11-21T07:00:00Z’
Supported values are an ISO-8601 format string date as shown above or as a BSON extended string format.
startup.mode=copy.existing
Same behavior as the existing as the configuration option, “copy.existing=true”. Note that “copy.existing” as a separate parameter is now deprecated. If you defined any granular copy.existing parameters such as copy.existing.pipeline, just prepend them with “startup.mode.copy.existing.” property name.
Reporting MongoDB errors to the DLQ
Kafka supports writing errors to a dead letter queue. In version 1.5 of the connector, you could write all exceptions to the DLQ through the mongo.error.tolerance=’all’
. One thing to note was that these errors were Kafka generated errors versus errors that occurred within MongoDB. Thus, if the sink connector failed to write to MongoDB due to a duplicate _id error, for example, this error wouldn’t be written to the DLQ. In 1.9, errors generated within MongoDB will be reported to the DLQ.
Behavior change on inferring schema
Prior to version 1.9 of the connector, if you are inferring schema and insert a MongoDB document that contains arrays with different value data types, the connector is naive and would simply set the type for the whole array to be a string. For example, consider a document that resembles:
{
 "myfoo": [
 {
 "key1": 1
 },
 {
 "key1": 1,
 "key2": "dogs"
 }
 ]
 }

If we set output.schema.infer.value. to true on a source connector, the message in the Kafka Topic will resemble the following:
…
"fullDocument": {
…
 "myfoo": [
 "{\"key1\": 1}",
 "{\"key1\": 1, \"key2\": \"dogs\"}"
 ]
 },
…

Notice the array items contain different values. In this example, key1 is a subdocument with a single value the number 1, the next item in the “myfoo” array is a subdocument with the same “key1” field and value of an integer, 1, and another field, “key 2” that has a string as a value. When this scenario occurs the connector will wrap the entire array as a string. This behavior can also apply when using different keys that contain different data type values.
In version 1.9, the connector when presented with this configuration will not wrap the arrays, rather it will create the appropriate schemas for the variable arrays with different data type values.
The same document when run in 1.9 will resemble:
"fullDocument": {
 …
 "myfoo": [
 {
 "key1": 1,
 },
 {
 "key1": 1,
 "key2": "DOGS"
 }
 ]
 },

Note that this behavior is a breaking change and that inferring schemas when using arrays can cause performance degradation for very large arrays using different data type values.
Download the latest version of the MongoDB Connector for Apache Kafka from Confluent Hub!
To learn more about the connector read the MongoDB Online Documentation.
Questions? Ask on the MongoDB Developer Community Connectors and Integrations forum!