Measuring MongoDB Kafka Connector Performance
With today’s need for flexible, event-driven architectures, companies across the globe choose best-of-breed technologies like MongoDB and Apache Kafka to solve their data challenges. While these two complementary technologies provide the power and flexibility to address those challenges at scale, performance is always a top concern. In this blog, we will cover how to measure the performance of the MongoDB Connector for Apache Kafka in both source and sink configurations.
Recall that the MongoDB sink connector writes data from a Kafka topic into MongoDB. By default, writes use the ReplaceOneModel: the incoming data replaces the document if it already exists on the destination cluster, or is inserted as a new document if it does not. You are not limited to this upsert behavior; you can configure the sink to perform only deletes or only inserts. These write behaviors are defined by the write model strategy setting in the sink configuration.
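To make the default behavior concrete, the ReplaceOneModel is conceptually the same as a replaceOne with upsert enabled in the MongoDB shell. The snippet below is only an illustration of that upsert semantic; the _id and document values are made up, and this is not code the connector itself runs:

use kafka
// Illustrative only: the sink's default ReplaceOneModel is an upsert,
// replacing the matching document if it exists and inserting it otherwise.
db.datagen.replaceOne(
  { "_id": "topic333-0-42" },                      // hypothetical _id built from Kafka metadata
  { "userid": "User_1", "regionid": "Region_5" },  // hypothetical record from the topic
  { "upsert": true }
)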
To determine the performance of the sink connector, we need a timestamp recording when each document was written to MongoDB. Currently, the only write model strategies that write a timestamp field on behalf of the user are UpdateOneTimestampsStrategy and UpdateOneBusinessKeyTimestampStrategy. These two write models insert a new field named _insertedTS, which can be used to measure the lag between Kafka and MongoDB.
In this example, we’ll use MongoDB Atlas. MongoDB Atlas is a public cloud MongoDB data platform providing out-of-the-box capabilities such as MongoDB Charts, a tool to create visual representations of your MongoDB data. If you wish to follow along, you can create a free forever tier cluster.

To generate sample data, create an instance of the Confluent Datagen connector. It writes sample user records to the Kafka topic “topic333”:
curl -X POST -H "Content-Type: application/json" --data '
{"name": "datagen-users",
 "config": {
   "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
   "kafka.topic": "topic333",
   "quickstart": "users",
   "key.converter": "org.apache.kafka.connect.storage.StringConverter",
   "value.converter": "org.apache.kafka.connect.json.JsonConverter",
   "value.converter.schemas.enable": "false",
   "max.interval": 50,
   "iterations": 5000,
   "tasks.max": "2"
}}' http://localhost:8083/connectors -w "\n"
Now that data is being generated and written to the Kafka topic “topic333,” let’s create our MongoDB sink connector to write this topic data into MongoDB Atlas. As stated earlier, the connector will add the field _insertedTS, which we can compare against the Kafka message timestamp to calculate lag. To perform the insert, let’s use the UpdateOneTimestampsStrategy write model strategy.
curl -X POST -H "Content-Type: application/json" --data '
{"name": "kafkametadata3",
 "config": {
   "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
   "topics": "topic333",
   "connection.uri": "MONGODB CONNECTION STRING GOES HERE",
   "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy",
   "database": "kafka",
   "collection": "datagen",
   "errors.log.include.messages": true,
   "errors.deadletterqueue.context.headers.enable": true,
   "key.converter": "org.apache.kafka.connect.storage.StringConverter",
   "value.converter": "org.apache.kafka.connect.json.JsonConverter",
   "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.KafkaMetaDataStrategy",
   "tasks.max": 2,
   "value.converter.schemas.enable": false,
   "transforms": "InsertField",
   "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
   "transforms.InsertField.offset.field": "offsetColumn",
   "transforms.InsertField.timestamp.field": "timestampColumn"
 }}' http://localhost:8083/connectors -w "\n"
Note: The field _insertedTS is populated with the clock time of the Kafka Connect server.
Take a look at the MongoDB Atlas collection “datagen” and familiarize yourself with the added fields.
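For example, a quick query from the MongoDB shell shows the fields the connector and the InsertField transform added alongside the generated data (a sketch for inspection only):

use kafka
// _insertedTS comes from the UpdateOneTimestampsStrategy, while
// offsetColumn and timestampColumn come from the InsertField transform.
db.datagen.findOne(
  {},
  { "_insertedTS": 1, "timestampColumn": 1, "offsetColumn": 1 }
)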
In this blog, we will use MongoDB Charts to display a performance graph. To make it easy to build the chart, we will create a view.
use kafka
db.createView("SinkView", "datagen",
[
  {
    "$sort": {
      "_insertedTS": 1,
      "timestampColumn": 1
    }
  },
  {
    "$project": {
      "_insertedTS": 1,
      "timestampColumn": 1,
      "_id": 0
    }
  },
  {
    "$addFields": {
      "diff": {
        "$subtract": [
          "$_insertedTS",
          {
            "$convert": {
              "input": "$timestampColumn",
              "to": "date"
            }
          }
        ]
      }
    }
  }
])
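Before building the chart, you can sanity-check the computed lag directly from the shell; since diff is the result of subtracting two dates, it is expressed in milliseconds (a quick sketch, not required for the chart):

use kafka
// Show the five documents with the largest Kafka-to-MongoDB lag.
db.SinkView.find().sort({ "diff": -1 }).limit(5)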
To create a chart, click on the Charts tab in MongoDB Atlas:
Click on Datasources and “Add Data Source.” The dialog will show the view that was created.
Select the SinkView and click Finish.
Next, download the pre-built sink performance dashboard:

curl https://gist.githubusercontent.com/RWaltersMA/555b5f17791ecb58e6e683c54bafd381/raw/748301bcb7ae725af4051d40b2e17a8882ef2631/sink-chart-performance.charts -o sink-performance.charts
Choose Import Dashboard from the Add Dashboard dropdown and select the downloaded file.
Load the sink-performance.charts file.
Select kafka.SinkView as the data source for the destination, then click Save.
Now the KafkaPerformance chart is ready to view. When you click on the chart, you will see something like the following:
This chart shows statistics on the difference between the Kafka message timestamp and the time the connector wrote the document to MongoDB. In the above example, the maximum delta is approximately one second (997 ms) across 40,000 inserted documents.
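If you prefer raw numbers over a chart, a similar summary can be computed directly against the view; the aggregation below is a sketch of the same statistics the chart visualizes:

use kafka
db.SinkView.aggregate([
  { "$group": {
      "_id": null,
      "maxLagMs": { "$max": "$diff" },  // worst-case lag between Kafka and MongoDB
      "avgLagMs": { "$avg": "$diff" },  // average lag in milliseconds
      "docs": { "$sum": 1 }             // number of documents measured
  } }
])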
To measure the source connector, we will take a different approach: using KSQL, we will create a stream containing the clusterTime timestamp from the MongoDB change stream event and the time the record was written to the Kafka topic. From here, we can push this data into a MongoDB sink and display the results in a MongoDB chart.
The first step will be to create the MongoDB Source connector that will be used to push data onto the Kafka topic.
curl -X POST -H "Content-Type: application/json" --data '
{"name": "mongo-source-perf",
 "config": {
   "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
   "errors.log.enable": "true",
   "errors.log.include.messages": "true",
   "connection.uri": "mongodb+srv://MONGODB CONNECTION STRING HERE",
   "database": "kafka",
   "collection": "source-perf-test",
   "mongo.errors.log.enable": "true",
   "topic.prefix": "mdb",
   "output.json.formatter": "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson",
   "output.format.value": "schema",
   "output.schema.infer.value": true,
   "output.format.key": "json",
   "publish.full.document.only": "false",
   "change.stream.full.document": "updateLookup"
 }}' http://localhost:8083/connectors -w "\n"
There are many ways to generate sample data on MongoDB. In this blog post, we will use the doc-gen tool (GitHub repo) to quickly create sample documents based upon the user’s schema, which is defined as follows:
{
  "_id": ObjectId("59b99db4cfa9a34dcd7885b6"),
  "name": "Ned Stark",
  "email": "sean_bean@gameofthron.es",
  "password": "$2b$12$UREFwsRUoyF0CRqGNK0LzO0HM/jLhgUCNNIJ9RJAqMUQ74crlJ1Vu"
}
To generate data in your MongoDB cluster, issue the following:
docker run robwma/doc-gen:1.0 python doc-gen.py -s '{"name":"string","email":"string","password":"string"}' -c "MONGODB CONNECTION STRING GOES HERE" -t 1000 -db "kafka" -col "source-perf-test"
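If you would rather not use Docker, a simple loop in the MongoDB shell can insert documents with the same shape; this is only a rough stand-in for doc-gen, with made-up values:

use kafka
// Insert 1,000 sample documents matching the schema above.
for (let i = 0; i < 1000; i++) {
  db.getCollection("source-perf-test").insertOne({
    "name": "User " + i,
    "email": "user" + i + "@example.com",
    "password": "not-a-real-hash-" + i
  })
}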
Launch KSQL and create a stream of the clusterTime within the message.
Note: If you do not have KSQL, you can run it as part of the Confluent Platform all in Docker using the following instructions.
If using Control Center, click ksqlDB, click Editor, and then paste in the following KSQL:
CREATE STREAM stats (
  clusterTime BIGINT
) WITH (
  KAFKA_TOPIC='mdb.kafka.source-perf-test',
  VALUE_FORMAT='AVRO'
);
The only information that we need from the message is the clusterTime. This value is provided within the change stream event. For reference, this is a sample event from change streams.
{
   _id: { <BSON Object> },
   "operationType": "<operation>",
   "fullDocument": { <document> },
   "ns": {
      "db": <database>,
      "coll": <collection>
   },
   "to": {
      "db": <database>,
      "coll": <collection>
   },
   "documentKey": {
      _id: <value>
   },
   "updateDescription": {
      "updatedFields": { <document> },
      "removedFields": [ <field>, ... ]
   },
   "clusterTime": <Timestamp>,
   "txnNumber": <NumberLong>,
   "lsid": {
      "id": <UUID>,
      "uid": <BinData>
   }
}
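You can observe the clusterTime value yourself by opening a change stream from the MongoDB shell while the data generator is running (a quick sketch for inspection only):

use kafka
// Open a change stream on the source collection and print the clusterTime
// of the first event that arrives; clusterTime is a BSON Timestamp.
const watchCursor = db.getCollection("source-perf-test").watch()
while (!watchCursor.isClosed()) {
  const next = watchCursor.tryNext()   // null until a change event arrives
  if (next) {
    printjson(next.clusterTime)
    break
  }
}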
Step 3
Next, we will create a KSQL stream that calculates the difference between the cluster time (the time the event was created on MongoDB) and ROWTIME (the time the record was written to the Kafka broker).
CREATE STREAM STATS2 AS
  SELECT ROWTIME - CLUSTERTIME AS DIFF, 1 AS ROW
  FROM STATS
  EMIT CHANGES;
Note that this diff value may not be completely accurate if the clocks on the Kafka and MongoDB servers are different.
Step 4
To see how the values change over time, we can use a window function and write the results to a table which can then be written into MongoDB via a sink connector.
SET 'ksql.suppress.enabled' = 'true';

CREATE TABLE STATSWINDOW2 AS
  SELECT AVG(DIFF) AS AVG, MAX(DIFF) AS MAX, COUNT(*) AS COUNT, ROW
  FROM STATS2
  WINDOW TUMBLING (SIZE 10 SECONDS)
  GROUP BY ROW
  EMIT FINAL;
Windowing lets you group records that have the same key into so-called windows for stateful operations such as aggregations or joins. There are three ways to define time windows in ksqlDB: hopping windows, tumbling windows, and session windows. In this example, we use a tumbling window because it is a fixed-duration, non-overlapping, and gap-less window.
The final step is to create a sink connector to insert all this aggregate data on MongoDB.
curl -X POST -H "Content-Type: application/json" --data '
{
  "name": "MongoSource-SinkPerf",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "tasks.max": "1",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "topics": "STATSWINDOW2",
    "errors.deadletterqueue.context.headers.enable": true,
    "connection.uri": "MONGODB CONNECTION STRING GOES HERE",
    "database": "kafka",
    "collection": "sourceStats",
    "mongo.errors.log.enable": true,
    "transforms": "InsertField",
    "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertField.timestamp.field": "timestampColumn"
  }}' http://localhost:8083/connectors -w "\n"
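Once this connector is running, a quick check from the MongoDB shell confirms that the windowed aggregates are landing in the sink collection, roughly one document per 10-second window:

use kafka
// Verify the windowed statistics are arriving from the STATSWINDOW2 topic.
db.sourceStats.findOne()
db.sourceStats.countDocuments()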
Download the source performance dashboard:

curl https://gist.githubusercontent.com/RWaltersMA/011f1473cf937badc61b752a6ab769d4/raw/bc180b9c2db533536e6c65f34c30b2d2145872f9/mongodb-source-performance.chart -o source-performance.charts
Choose Import Dashboard from the Add Dashboard dropdown and select the downloaded file.
You will need to create a data source for the new sink collection, “kafka.sourceStats.”
Click on the Kafka Performance Source chart to view the statistics.
In the above example, you can see the 10-second tumbling window performance statistics for 1.5M documents. The average difference was 252s, with the maximum difference being 480s. Note that some of this delta could be due to clock differences between the MongoDB and Kafka servers. While these numbers should not be taken as absolutes, the technique is good enough to spot trends and determine whether performance is getting better or worse.
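To roll these windowed statistics up outside of Charts, you can aggregate the sink collection directly; the field names below assume the ksqlDB columns (AVG, MAX, COUNT) arrive as top-level fields in each document:

use kafka
// Summarize the per-window statistics written by the sink connector.
db.sourceStats.aggregate([
  { "$group": {
      "_id": null,
      "avgDiff": { "$avg": "$AVG" },    // average of the per-window averages
      "maxDiff": { "$max": "$MAX" },    // worst lag observed in any window
      "totalDocs": { "$sum": "$COUNT" } // total change events measured
  } }
])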
If you have any opinions on features or functionality enhancements that you would like to see with respect to monitoring performance or monitoring the MongoDB Connector for Apache Kafka in general, please add a comment to KAFKA-64.