Measuring MongoDB Kafka Connector Performance

Juan Soto, Robert Walters • 6 min read • Published Feb 15, 2022 • Updated May 09, 2022
To meet today’s need for flexible event-driven architectures, companies across the globe choose best-of-breed technologies like MongoDB and Apache Kafka. While these two complementary technologies provide the power and flexibility to solve large-scale challenges, performance has always been at the forefront of concerns. In this blog, we will cover how to measure the performance of the MongoDB Connector for Apache Kafka in both source and sink configurations.

Measuring Sink Performance

Recall that the MongoDB sink connector writes data from a Kafka topic into MongoDB. Writes by default use the ReplaceOneModel where the data is either updated if it's present on the destination cluster or created as a new document if it is not present. You are not limited to this upsert behavior. In fact, you can change the sink to perform deletes or inserts only. These write behaviors are defined by the Write Model Strategy setting in the sink configuration.
To determine the performance of the sink connector, we need a timestamp of when the document was written to MongoDB. Currently, the only write model strategies that write a timestamp field on behalf of the user are UpdateOneTimestampsStrategy and UpdateOneBusinessKeyTimestampStrategy. These two write models insert a new field named _insertedTS, which can be used to query the lag between Kafka and MongoDB.
In this example, we’ll use MongoDB Atlas. MongoDB Atlas is a public cloud MongoDB data platform providing out-of-the-box capabilities such as MongoDB Charts, a tool to create visual representations of your MongoDB data. If you wish to follow along, you can create a free-forever tier cluster.

Generate Sample Data

We will generate sample data using the datagen Kafka Connector provided by Confluent. Datagen is a convenient way of creating test data in the Kafka ecosystem. There are a few quickstart schema specifications bundled with this connector. We will use a quickstart called users.
curl -X POST -H "Content-Type: application/json" --data '
  {"name": "datagen-users",
   "config": {
     "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
     "kafka.topic": "topic333",
     "quickstart": "users",
     "key.converter": "org.apache.kafka.connect.storage.StringConverter",
     "value.converter": "org.apache.kafka.connect.json.JsonConverter",
     "value.converter.schemas.enable": "false",
     "max.interval": 50,
     "iterations": 5000,
     "tasks.max": "2"
  }}' http://localhost:8083/connectors -w "\n"
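If you would rather script the Kafka Connect REST call than use curl, the same request can be sketched in Python. This is a minimal sketch, not part of the original walkthrough: the helper names `build_datagen_config` and `post_connector` are hypothetical, and the URL assumes Kafka Connect is listening on localhost:8083 as in the curl command above.

```python
import json
import urllib.request

# Assumed endpoint, taken from the curl example above.
CONNECT_URL = "http://localhost:8083/connectors"


def build_datagen_config(topic, iterations=5000, max_interval=50, tasks=2):
    """Return the datagen connector definition shown above as a Python dict."""
    return {
        "name": "datagen-users",
        "config": {
            "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
            "kafka.topic": topic,
            "quickstart": "users",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": "false",
            "max.interval": max_interval,
            "iterations": iterations,
            "tasks.max": str(tasks),
        },
    }


def post_connector(definition, url=CONNECT_URL):
    """POST a connector definition to Kafka Connect and return the HTTP status code."""
    request = urllib.request.Request(
        url,
        data=json.dumps(definition).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return response.status
```

Calling `post_connector(build_datagen_config("topic333"))` against a running Kafka Connect worker would submit the same definition as the curl command. Keeping the definition in a function makes it easy to vary `iterations` or `max.interval` between test runs.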

Configure Sink Connector

Now that the data is generated and written to the Kafka topic, “topic333,” let’s create our MongoDB sink connector to write this topic data into MongoDB Atlas. As stated earlier, we will add a field _insertedTS for use in calculating the lag between the message timestamp and this value. To perform the insert, let’s use the UpdateOneTimestampsStrategy write model strategy.
curl -X POST -H "Content-Type: application/json" --data '
  {"name": "kafkametadata3",
   "config": {
     "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
     "topics": "topic333",
     "connection.uri": "MONGODB CONNECTION STRING GOES HERE",
     "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy",
     "database": "kafka",
     "collection": "datagen",
     "errors.log.include.messages": true,
     "errors.deadletterqueue.context.headers.enable": true,
     "key.converter": "org.apache.kafka.connect.storage.StringConverter",
     "value.converter": "org.apache.kafka.connect.json.JsonConverter",
     "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.KafkaMetaDataStrategy",
     "tasks.max": 2,
     "value.converter.schemas.enable": false,
     "transforms": "InsertField",
     "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
     "transforms.InsertField.offset.field": "offsetColumn",
     "transforms.InsertField.timestamp.field": "timestampColumn"
  }}' http://localhost:8083/connectors -w "\n"
Note: The field _insertedTS is populated with the time value of the Kafka Connect server.

Viewing Results with MongoDB Charts

Take a look at the MongoDB Atlas collection “datagen” and familiarize yourself with the added fields.
Figure 1: Datagen collection as seen in MongoDB Atlas Collections page
In this blog, we will use MongoDB Charts to display a performance graph. To make it easy to build the chart, we will create a view.
use kafka
db.createView("SinkView", "datagen",
[
  {
    "$sort" : {
      "_insertedTS" : 1,
      "timestampColumn" : 1
    }
  },
  {
    "$project" : {
      "_insertedTS" : 1,
      "timestampColumn" : 1,
      "_id" : 0
    }
  },
  {
    "$addFields" : {
      "diff" : {
        "$subtract" : [
          "$_insertedTS",
          {
            "$convert" : {
              "input" : "$timestampColumn",
              "to" : "date"
            }
          }
        ]
      }
    }
  }
])
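To make the view's diff field concrete, here is a minimal Python sketch of the same subtraction applied to two made-up documents. It assumes timestampColumn holds epoch milliseconds and _insertedTS is a timezone-aware datetime; the function name `lag_ms` and the sample values are illustrative only and are not data from the collection.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def lag_ms(doc):
    """Milliseconds between the Kafka record timestamp and the MongoDB write time."""
    # Assumption: timestampColumn is epoch milliseconds, so it converts to a
    # date the same way the $convert stage in the view does.
    kafka_time = EPOCH + timedelta(milliseconds=doc["timestampColumn"])
    # Dividing one timedelta by another with // yields a whole number of ms.
    return (doc["_insertedTS"] - kafka_time) // timedelta(milliseconds=1)


# Made-up sample documents shaped like the output of the $project stage.
sample_docs = [
    {
        "timestampColumn": 1_644_926_400_000,
        "_insertedTS": EPOCH + timedelta(milliseconds=1_644_926_400_120),
    },
    {
        "timestampColumn": 1_644_926_401_000,
        "_insertedTS": EPOCH + timedelta(milliseconds=1_644_926_401_997),
    },
]

lags = [lag_ms(doc) for doc in sample_docs]
print("max lag (ms):", max(lags))
print("avg lag (ms):", sum(lags) / len(lags))
```

Because _insertedTS comes from the Kafka Connect server clock and the record timestamp from the producer or broker, any clock offset between those machines is included in this number.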
To create a chart, click on the Charts tab in MongoDB Atlas:
Click on Datasources and “Add Data Source.” The dialog will show the view that was created.
Select the SinkView and click Finish.
Download the MongoDB Sink performance chart from Gist.
curl https://gist.githubusercontent.com/RWaltersMA/555b5f17791ecb58e6e683c54bafd381/raw/748301bcb7ae725af4051d40b2e17a8882ef2631/sink-chart-performance.charts -o sink-performance.charts
Choose Import Dashboard from the Add Dashboard dropdown and select the downloaded file.
Load the sink-performance.charts file.
Select kafka.SinkView as the data source at the destination, then click Save.
Now the KafkaPerformance chart is ready to view. When you click on the chart, you will see something like the following:
This chart shows statistics on the difference between the record timestamp in the Kafka topic and the time the connector wrote the document to MongoDB (_insertedTS). In the above example, the maximum time delta is approximately one second (997ms) from inserting 40,000 documents.

Measuring Source Performance

To measure the source, we will take a different approach using KSQL to create a stream of the clusterTime timestamp from the MongoDB change stream and the time the row was written in the Kafka topic. From here, we can push this data into a MongoDB sink and display the results in a MongoDB Chart.

Configure Source Connector

The first step will be to create the MongoDB Source connector that will be used to push data onto the Kafka topic.
curl -X POST -H "Content-Type: application/json" --data '
  {"name": "mongo-source-perf",
   "config": {
     "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
     "errors.log.enable": "true",
     "errors.log.include.messages": "true",
     "connection.uri": "mongodb+srv://MONGODB CONNECTION STRING HERE",
     "database": "kafka",
     "collection": "source-perf-test",
     "mongo.errors.log.enable": "true",
     "topic.prefix": "mdb",
     "output.json.formatter": "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson",
     "output.format.value": "schema",
     "output.schema.infer.value": true,
     "output.format.key": "json",
     "publish.full.document.only": "false",
     "change.stream.full.document": "updateLookup"
  }}' http://localhost:8083/connectors -w "\n"

Generate Sample Data

There are many ways to generate sample data on MongoDB. In this blog post, we will use the doc-gen tool (GitHub repo) to quickly create sample documents based upon a user-supplied schema. The generated documents look like the following:
{
  "_id" : ObjectId("59b99db4cfa9a34dcd7885b6"),
  "name" : "Ned Stark",
  "email" : "sean_bean@gameofthron.es",
  "password" : "$2b$12$UREFwsRUoyF0CRqGNK0LzO0HM/jLhgUCNNIJ9RJAqMUQ74crlJ1Vu"
}
To generate data in your MongoDB cluster, issue the following:
docker run robwma/doc-gen:1.0 python doc-gen.py -s '{"name":"string","email":"string","password":"string"}' -c "MONGODB CONNECTION STRING GOES HERE" -t 1000 -db "kafka" -col "source-perf-test"

Create KSQL Queries

Launch KSQL and create a stream of the clusterTime within the message.
Note: If you do not have KSQL, you can run it as part of the Confluent Platform all in Docker using the following instructions.
If using Control Center, click ksqlDB, click Editor, and then paste in the following KSQL:
CREATE STREAM stats (
  clusterTime BIGINT
) WITH (
  KAFKA_TOPIC='mdb.kafka.source-perf-test',
  VALUE_FORMAT='AVRO'
);
The only information that we need from the message is the clusterTime. This value is provided within the change stream event. For reference, this is a sample event from change streams.
{
  _id: { <BSON Object> },
  "operationType": "<operation>",
  "fullDocument": { <document> },
  "ns": {
    "db": <database>,
    "coll": <collection>
  },
  "to": {
    "db": <database>,
    "coll": <collection>
  },
  "documentKey": {
    _id: <value>
  },
  "updateDescription": {
    "updatedFields": { <document> },
    "removedFields": [ <field>, ... ]
  },
  "clusterTime": <Timestamp>,
  "txnNumber": <NumberLong>,
  "lsid": {
    "id": <UUID>,
    "uid": <BinData>
  }
}
Step 3
Next, we will create a KSQL stream that calculates the difference between the cluster time (the time when the change was made on MongoDB) and the time when the message was written to the broker.
CREATE STREAM STATS2 AS
  SELECT ROWTIME - CLUSTERTIME AS diff, 1 AS ROW FROM STATS EMIT CHANGES;
Note that this diff value may not be completely accurate if the clocks on Kafka and MongoDB are different.
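The effect of clock differences on this subtraction can be illustrated with a small Python sketch. The function `source_lag_ms` and its `clock_offset_ms` parameter are hypothetical and are not part of KSQL or the connector. The sketch assumes both timestamps are epoch milliseconds, and the sample numbers are made up.

```python
def source_lag_ms(rowtime_ms, cluster_time_ms, clock_offset_ms=0):
    """Return ROWTIME - clusterTime, corrected by a known clock offset.

    clock_offset_ms is how far the broker clock runs ahead of the MongoDB
    clock (negative if it runs behind). It defaults to 0, which reproduces
    the uncorrected KSQL expression.
    """
    return rowtime_ms - cluster_time_ms - clock_offset_ms


# Uncorrected difference, as computed by the STATS2 stream.
raw = source_lag_ms(1_000_250, 1_000_000)

# If the broker clock were known to run 100 ms ahead, the real lag is smaller.
corrected = source_lag_ms(1_000_250, 1_000_000, clock_offset_ms=100)

# A negative raw value cannot be real latency; it points to clock skew.
skewed = source_lag_ms(999_950, 1_000_000)

print(raw, corrected, skewed)
```

In practice the offset is rarely known exactly, which is why the resulting numbers are better read as a trend than as absolute latency.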
Step 4
To see how the values change over time, we can use a window function and write the results to a table which can then be written into MongoDB via a sink connector.
SET 'ksql.suppress.enabled' = 'true';

CREATE TABLE STATSWINDOW2 AS
  SELECT AVG(DIFF) AS AVG, MAX(DIFF) AS MAX, COUNT(*) AS COUNT, ROW FROM STATS2
  WINDOW TUMBLING (SIZE 10 SECONDS)
  GROUP BY ROW
  EMIT FINAL;
Windowing lets you control how records that have the same key are grouped into so-called windows for stateful operations such as aggregations or joins. There are three ways to define time windows in ksqlDB: hopping windows, tumbling windows, and session windows. In this example, we will use a tumbling window because it is fixed-duration, non-overlapping, and gap-less.
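To show what a tumbling window does to the diff values, here is a minimal Python sketch that buckets events into fixed 10-second windows and computes the same three aggregates as the KSQL table. It is only an illustration: it does not model EMIT FINAL or suppression, the function name `tumbling_stats` is hypothetical, and the sample events are made up.

```python
from collections import defaultdict

WINDOW_MS = 10_000  # 10-second window, matching the KSQL statement above


def tumbling_stats(events, window_ms=WINDOW_MS):
    """Group (timestamp_ms, diff) pairs into fixed, non-overlapping windows.

    Returns a dict keyed by window start time with avg, max, and count,
    mirroring AVG(DIFF), MAX(DIFF), and COUNT(*).
    """
    buckets = defaultdict(list)
    for timestamp_ms, diff in events:
        # Every timestamp belongs to exactly one window: the one starting at
        # the largest multiple of window_ms not greater than the timestamp.
        window_start = timestamp_ms - (timestamp_ms % window_ms)
        buckets[window_start].append(diff)
    return {
        start: {"avg": sum(diffs) / len(diffs), "max": max(diffs), "count": len(diffs)}
        for start, diffs in sorted(buckets.items())
    }


# Made-up events: (record timestamp in ms, diff in ms).
events = [(1_000, 200), (9_999, 400), (10_000, 100), (25_000, 300)]
stats = tumbling_stats(events)
for start, row in stats.items():
    print(start, row)
```

The event at 9,999 ms falls in the first window and the event at 10,000 ms in the second, which shows the non-overlapping, gap-less boundaries.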

Configure Sink Connector

The final step is to create a sink connector to insert all this aggregate data into MongoDB.
curl -X POST -H "Content-Type: application/json" --data '
  {
   "name": "MongoSource-SinkPerf",
   "config": {
     "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
     "tasks.max": "1",
     "errors.log.enable": true,
     "errors.log.include.messages": true,
     "topics": "STATSWINDOW2",
     "errors.deadletterqueue.context.headers.enable": true,
     "connection.uri": "MONGODB CONNECTION STRING GOES HERE",
     "database": "kafka",
     "collection": "sourceStats",
     "mongo.errors.log.enable": true,
     "transforms": "InsertField",
     "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
     "transforms.InsertField.timestamp.field": "timestampColumn"
  }}' http://localhost:8083/connectors -w "\n"

Viewing Results with MongoDB Charts

Download the MongoDB Source performance chart from Gist.
curl https://gist.githubusercontent.com/RWaltersMA/011f1473cf937badc61b752a6ab769d4/raw/bc180b9c2db533536e6c65f34c30b2d2145872f9/mongodb-source-performance.chart -o source-performance.charts
Choose Import Dashboard from the Add Dashboard dropdown and select the downloaded file.
You will need to create a data source for the new sink collection, “kafka.sourceStats.”
Click on the Kafka Performance Source chart to view the statistics.
In the above example, you can see the 10-second tumbling window performance statistics for 1.5M documents. The average difference was 252s, with the maximum difference being 480s. Note that some of this delta could be due to clock differences between MongoDB and Kafka. These numbers should not be taken as absolute, but the technique is good enough to reveal trends and show whether performance is getting better or worse.
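One simple way to turn the per-window averages into a trend signal is a least-squares slope over consecutive windows. The sketch below is only an illustration of that idea and is not something the chart or the connector provides. The function name `trend_slope` and the sample averages are made up.

```python
def trend_slope(values):
    """Least-squares slope of values plotted against their window index.

    A positive slope means the average lag grows from window to window,
    and a negative slope means it shrinks.
    """
    n = len(values)
    if n < 2:
        raise ValueError("need at least two windows to estimate a trend")
    mean_x = (n - 1) / 2
    mean_y = sum(values) / n
    numerator = sum((x - mean_x) * (y - mean_y) for x, y in enumerate(values))
    denominator = sum((x - mean_x) ** 2 for x in range(n))
    return numerator / denominator


# Made-up per-window average lags, oldest window first.
getting_worse = trend_slope([100, 200, 300])
getting_better = trend_slope([300, 200, 100])
steady = trend_slope([5, 5, 5])
print(getting_worse, getting_better, steady)
```

Because a constant clock offset shifts every window by the same amount, it does not change the slope, which is why trends are more trustworthy here than absolute values.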
If you have any opinions on features or functionality enhancements that you would like to see with respect to monitoring performance or monitoring the MongoDB Connector for Apache Kafka in general, please add a comment to KAFKA-64.
Have any questions? Check out our Connectors and Integrations MongoDB community forum.
