The Atlas Stream Processing Set-up Guide for Kafka Connector Users
Robert Walters • 15 min read • Published Aug 23, 2024 • Updated Aug 23, 2024
MongoDB offers two primary interfaces for integrating your operational data in MongoDB with streaming data from Apache Kafka: the MongoDB Connector for Apache Kafka (Kafka Connector) and Atlas Stream Processing. Integrating these two kinds of data makes for a powerful combination when building modern, event-driven applications.
This post will cover:
- The basics of setting up Atlas Stream Processing
- How to connect to a Kafka topic and stream data using Atlas Stream Processing
- How to transform data with Atlas Stream Processing
- How to stream data to MongoDB Atlas
- A deep-dive into the Kafka ecosystem
- Core components
- Equivalent Kafka Connector configuration parameters that are available to you in Atlas Stream Processing
- Current limitations of Atlas Stream Processing
Prerequisites:
- MongoDB Atlas Account
- An Apache Kafka Source (In this example, we’ll use Redpanda)
- An Atlas Database
Note: This article will be updated frequently as we continue development of Atlas Stream Processing.
Apache Kafka Connect is an optional server component in an Apache Kafka deployment. This component makes streaming data between Apache Kafka and other data systems easier by providing data source-specific connectors, such as the MongoDB Connector for Apache Kafka. These connectors can connect to their respective data sources and read/write data to an Apache Kafka topic via the Kafka Connect framework.
The deployment of Kafka Connect can be complicated and costly, as it involves multiple servers supporting a distributed mode that enables the scalability and availability of connector workers. Cloud providers make hosting this infrastructure easier by providing Kafka Connect as a service. This simplifies connector management, but it also means a high per-hour cost to run the infrastructure.
In many scenarios, MongoDB Atlas Stream Processing removes the need for Kafka Connect when moving data between Apache Kafka and MongoDB. Atlas Stream Processing is a feature within MongoDB Atlas that lets you continuously process, validate, and merge streaming data from sources such as Apache Kafka and MongoDB change streams. It makes reading and writing data between Apache Kafka and MongoDB simple, without Kafka Connect or the MongoDB Connector for Apache Kafka.
In this article, we will compare and contrast the methods for moving data between Kafka and MongoDB using an example data flow. We will start with source data in Apache Kafka and process it using a window function. Then, we’ll land the data in MongoDB.
First, let’s use the MongoDB Connector for Apache Kafka. Then, we’ll set things up in Atlas Stream Processing without relying on Kafka Connect or any extra infrastructure, saving us infrastructure costs and management time.
You’ll need access to an Apache Kafka topic with Apache Kafka Connect and the MongoDB Connector for Apache Kafka installed. Note: If you do not have access to an environment, you can use the Docker Compose script available in the MongoDB Kafka Connector source GitHub repository. Since the data resides in the Kafka topic, we define the MongoDB Connector for Apache Kafka as a sink with the following definition:
1 { 2 "name": "mongo-tutorial-sink", 3 "config": { 4 "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector", 5 "topics": "Stocks", 6 "connection.uri": "mongodb://mongo1", 7 "key.converter": "org.apache.kafka.connect.storage.StringConverter", 8 "value.converter": "org.apache.kafka.connect.json.JsonConverter", 9 "value.converter.schemas.enable": false, 10 "database": "Stocks", 11 "collection": "StockData", 12 "mongo.errors.tolerance":"data", 13 "errors.tolerance":"all", 14 "errors.deadletterqueue.topic.name":"mongoerror", 15 "errors.deadletterqueue.topic.replication.factor":1 16 } 17 }
To transform your data, you’ll need to use another service such as Apache Flink to stream the output of the window function to another topic. Setting this up and configuring the necessary infrastructure is outside the scope of this article. However, you can imagine there are many moving parts to get this working.
Next, let’s take a look at MongoDB Atlas Stream Processing.
This setup is intentionally high-level. For a step-by-step tutorial that includes additional details like security configuration, see the Get Started with Atlas Stream Processing tutorial in our documentation.
Using the MongoDB Atlas UI, log in to your project, click “Stream Processing” from the services menu, and click the “Create Instance” button. Provide a name, “KafkaTestSPI,” and click the “Create” button.
Note: For this walkthrough, you can choose any available Stream Processing Instance (SPI) tier. Billing begins once you have running stream processor jobs, not when creating SPIs. See the documentation for more information.
Once you have created an SPI, click “Configure” and then “Connection Registry.” Here, you will define the connections that Atlas Stream Processing will use, such as connections to Kafka brokers or other Atlas clusters within your Atlas project.
Next, click “Create Connection” and select “Kafka.”
Atlas Stream Processing uses the Kafka Wire Protocol and can connect to any Kafka-compliant service, such as Confluent Cloud, Redpanda Cloud, AWS MSK, or Azure Event Hubs. We support SASL_PLAINTEXT and SASL_SSL security configurations.
Once we have added the connection to Kafka, create another connection and select “Atlas Database.” Give it a connection name and select any Atlas cluster you have available in your current project.
Now that we have defined a connection to our Kafka deployment and a connection to our Atlas cluster, we can connect to the Stream Processing instance and create stream processors.
Unlike the MongoDB Connector for Apache Kafka, there are no configuration files that define a connection specifically as a source or a sink. Atlas Stream Processing uses connection registry connections for both cases; the behavior depends on where the connection appears in the pipeline. If a connection is used in a $source stage, Atlas Stream Processing reads data from it into the pipeline. When writing data out, it uses $emit for Kafka connections and $merge for MongoDB connections. To demonstrate this, let’s continue the example and create a stream processor that reads data from our hosted Kafka topic ($source) and writes data into our Atlas cluster ($merge).
Use the Connect button to connect to the “KafkaTestSPI” SPI that we created earlier. This will launch a familiar connection dialog, as shown below.
Using a database user account, use the MongoDB Shell (mongosh) to connect to the SPI. Once connected, we can build the stream processor by defining variables for each stage in the pipeline, ultimately combining them when we’re ready to create the stream processor. Our Kafka topic will be populated with fictitious stock data in the form of the following value:
1 { 2 "exchange":"NASDAQ" 3 "company_symbol":"AHL" 4 "company_name":"AMUSED HAWK LABS" 5 "price":8.41 6 "tx_time":"2024-04-03T14:51:40Z" 7 }
Let’s create the $source variable for our HostedKafka connection.
```
s_kafka = { $source: { connectionName: "HostedKafka", topic: "Stocks" } }
```
Next, let’s create a tumbling window over every five seconds of data.
```
tw = {
  $tumblingWindow: {
    interval: { size: 5, unit: "second" },
    pipeline: [
      { $group: { _id: "$company_symbol", max: { $max: "$price" }, avg: { $avg: "$price" } } }
    ]
  }
}
```
Note: To get an idea of what the stream of data would look like, we can use the .process command, passing each stage of the pipeline and displaying the results to the console.
```
sp.process([s_kafka, tw])
```
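With the sample stock data above, each five-second window emits one document per company symbol. The exact output depends on what arrives in each window, but an illustrative result (values made up for this sketch) looks like this:

```
{ _id: 'AHL', max: 8.46, avg: 8.43 }
```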
Next, let’s write this stream of data into the Atlas cluster we defined in the connection registry.
```
m = { $merge: { into: { connectionName: "AtlasSource", db: "Stock", coll: "StockReport" } } }
```
Finally, let’s create the stream processor using the createStreamProcessor command.
```
sp.createStreamProcessor('mytestsp', [s_kafka, tw, m])
```
Then we will start the stream processor by issuing the .start command.
```
mytestsp.start()
```
You can view statistics on your stream processor by using the .stats command.
```
mytestsp.stats()
```
Here is an example of the return result:
```
{
  ok: 1,
  ns: '66744a49d8619c0695a7e5e9.5e98cd46cd7309236ceaab3c.testsp',
  stats: {
    name: 'mytestsp',
    processorId: '66780d901f7c0b19f89f559f',
    status: 'running',
    scaleFactor: Long('1'),
    inputMessageCount: Long('633972'),
    inputMessageSize: 280913520,
    outputMessageCount: Long('296963'),
    outputMessageSize: 475437763,
    dlqMessageCount: Long('0'),
    dlqMessageSize: 0,
    stateSize: 3328,
    memoryTrackerBytes: 33554432,
    watermark: ISODate('2024-08-07T20:49:43.047Z'),
    ok: 1
  },
  pipeline: [
    {
      '$source': {
        connectionName: 'HostedKafka',
        topic: 'Stocks'
      }
    },
    {
      '$tumblingWindow': {
        interval: { size: 5, unit: 'second' },
        pipeline: [
          {
            '$group': {
              _id: [Object],
              max: [Object],
              avg: [Object]
            }
          }
        ]
      }
    },
    {
      '$merge': {
        into: {
          connectionName: 'AtlasSource',
          db: 'Stock',
          coll: 'StockReport'
        }
      }
    }
  ]
}
```
That’s it for moving data from Kafka to MongoDB! Moving data from MongoDB to Kafka is just as simple: define a $source for your Atlas cluster and use $emit for your Kafka connection to stream data quickly, with no complex Kafka Connect deployment or configuration and at a lower cost.
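Here is a minimal sketch of that reverse direction, reusing the connection names from this walkthrough; the target topic name, StockReports, and the processor name are assumptions for illustration:

```
// Read the change stream of the Stock.StockReport collection via the "AtlasSource" connection.
s_atlas = { $source: { connectionName: "AtlasSource", db: "Stock", coll: "StockReport" } }

// Write each event to a Kafka topic on the "HostedKafka" connection.
e_kafka = { $emit: { connectionName: "HostedKafka", topic: "StockReports" } }

sp.createStreamProcessor('mongodbToKafka', [s_atlas, e_kafka])
mongodbToKafka.start()
```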
Next, we’ll cover in-depth considerations and options available to you in more advanced Kafka configurations. Let’s start with some context about the Apache Kafka ecosystem.
This section highlights key Kafka components to keep in mind when setting up an integration between Kafka and MongoDB. It is not a comprehensive list; we have included the components most relevant to stream processing use cases. Refer to the Kafka and/or Kafka Connect documentation for anything we haven’t covered.
The Schema Registry is an optional component of Apache Kafka that provides a centralized repository for managing and validating schemas used in data processing and serialization. Since everything in Apache Kafka is binary, developers use the schema registry to ensure conformity of data structure, data validation, compatibility checking, versioning, and evolution, all of which are useful for downstream applications. Atlas Stream Processing does not interact with the Schema Registry at the time of this writing. Please let us know via UserVoice if you have a strong need for this.
Kafka Connect offers a method for integrating data between Kafka and external systems such as MongoDB. The MongoDB Connector for Apache Kafka is built on this method of integration. With Atlas Stream Processing, there is no need to use Kafka Connect to move data between Kafka and MongoDB.
At the time of this writing, Atlas Stream Processing only supports the JSON data format. When additional formats are available, this article will be updated accordingly. If you have an urgent need, please let us know via UserVoice.
The Kafka Connector has settings that determine how to handle errors. Configuration parameters such as mongo.errors.tolerance and errors.tolerance can be used alongside their corresponding dead letter queue (DLQ) parameters, errors.deadletterqueue.topic.name and errors.deadletterqueue.topic.replication.factor. These give developers the ability to stop processing when an error occurs, tolerate errors without stopping the processing of messages, or write errors to a DLQ (either a Kafka topic or an Atlas collection, depending on whether it is a source or sink connector). Similarly, Atlas Stream Processing supports writing errors to a DLQ backed by a MongoDB Atlas collection. To do this, use the $validate stage to send events that do not conform to a specific schema to the DLQ.
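Here is a minimal sketch of that pattern, reusing the connections and pipeline variables from the walkthrough above. The DLQ collection name (StockDLQ), the processor name, and the schema requirements are assumptions for illustration; the DLQ itself is configured as an option when the stream processor is created:

```
// Send events missing a company_symbol or price to the DLQ instead of failing the processor.
v = {
  $validate: {
    validator: { $jsonSchema: { required: ["company_symbol", "price"] } },
    validationAction: "dlq"
  }
}

// The DLQ is backed by an Atlas collection reachable through a connection registry connection.
options = { dlq: { connectionName: "AtlasSource", db: "Stock", coll: "StockDLQ" } }

sp.createStreamProcessor('mytestsp_dlq', [s_kafka, v, tw, m], options)
```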
Single Message Transforms (SMTs) are part of Kafka Connect and transform inbound messages after a source connector has produced them and before they are written to Kafka. SMTs are also applied on the sink side, transforming messages read from Kafka before they are passed to the sink connector. Rather than using SMTs, Atlas Stream Processing uses the MongoDB Query API and aggregation framework to transform documents. When setting up a stream processor, note that Atlas Stream Processing does not yet support custom functions to transform messages. The table below describes the equivalent aggregation pipeline operator or procedure to use for each of the built-in SMTs, and a combined $emit example follows the table.
Transform | Description | Atlas Stream Processing Support |
---|---|---|
Cast | Cast fields or the entire key or value to a specific type | Support coming soon via $convert . |
Drop | Drop either a key or a value from a record and set it to null. | Use the $set operator: { $set: { "_stream_meta.key": null } } |
DropHeaders | Drop one or more headers from each record. | Use the $pull operator: { $pull: { "_stream_meta.headers": { k: 'header2' } } } |
EventRouter | Route Debezium outbox events using a regex configuration option. | Not supported. |
ExtractField | Extract the specified field from a struct when schema present, or a map in the case of schemaless data. Any null values are passed through unmodified. | Atlas Stream Processing can extract just the value from a field using $project , but it will still be in JSON format with a field name. |
ExtractTopic | Replace the record topic with a new topic derived from its key or value. | Limited support. You can assign a topic when writing back to Kafka via $emit pipeline operator. The $emit allows for an explicit topic name, a reference to a value in the document, or an expression. |
Filter (Apache Kafka) | Drop all records. Designed to be used in conjunction with a predicate. | Use $match and $project to filter values from the stream. |
Flatten | Flatten a nested data structure. This generates names for each field by concatenating the field names at each level with a configurable delimiter character. | Use $addFields with references to the nested field you desire to flatten. If you wish to use a delimiter like a period, simply concatenate using $concat as follows: { $addFields: { fullName: { $concat: ["$name", ".", "$lastname"] } } } |
GzipDecompress | Gzip-decompress the entire byteArray key or value input. | Not supported. |
HeaderFrom | Moves or copies fields in a record key or value into the record’s header. | Use $emit.config.headers to specify the Kafka header. Note: you can reference record keys or values by using the $ symbol. This is used to denote field paths and expressions within aggregation pipeline stages and queries. Example: $emit.config.headers = "$foo" |
HoistField | Wrap data using the specified field name in a struct when schema present, or a map in the case of schemaless data. | Currently, event data must be in JSON format. |
InsertField | Insert field using attributes from the record metadata or a configured static value. | Use $addFields or $project to create a new field. |
InsertHeader | Insert a literal value as a record header. | Use $emit.config.headers. The value must resolve to a document or an array structured like [{'k': ..., 'v': ...}, {'k': ..., 'v': ...}]. |
MaskField | Mask specified fields with a valid null value for the field type. | Use $set to provide a null or any arbitrary value to a field. |
MessageTimeStampRouter | Update the record’s topic field as a function of the original topic value and the record’s timestamp field. | Not supported. |
RegexRouter | Update the record topic using the configured regular expression and replacement string. | You can modify the topic names by specifying the topic name by string or regex in the $emit operator. |
ReplaceField | Filter or rename fields. | To drop a field, use $unset or $project . To rename a field, use $project. |
SetSchemaMetadata | Set the schema name, version, or both on the record’s key or value schema. | Not supported. |
TimestampConverter | Convert timestamps between different formats such as Unix epoch, strings, and Connect Date and Timestamp types. | Convert date and time using aggregation operators $dateFromString and Timestamp. |
TimestampRouter | Update the record’s topic field as a function of the original topic value and the record timestamp. | Not supported. |
TopicRegexRouter | Update the record topic using the configured regular expression and replacement string. | You can modify the topic names by specifying the topic name by string or regex in the $emit operator. |
ValueToKey | Replace the record key with a new key formed from a subset of fields in the record value. | Specify the key by assigning it in $emit . For example: $emit.config.key: "$foo" |
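To make a few of the $emit-based equivalents above concrete, here is a hedged sketch that writes back to Kafka with an explicit topic, a key derived from a value field (the ValueToKey equivalent), and a static header (the InsertHeader equivalent). The topic name, header values, and processor name are assumptions for illustration:

```
e = {
  $emit: {
    connectionName: "HostedKafka",
    topic: "StockReports",   // explicit topic name; an expression or field reference also works
    config: {
      key: "$company_symbol",                                   // ValueToKey equivalent
      headers: [{ k: "source", v: "atlas-stream-processing" }]  // InsertHeader equivalent
    }
  }
}

sp.createStreamProcessor('emitWithKeyAndHeaders', [s_kafka, e])
```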
Next, let’s briefly summarize source and sink properties, post-processors, and write model strategies.
When configured as a source, the Kafka Connector reads data from a MongoDB cluster and writes it into an Apache Kafka topic. With Atlas Stream Processing, developers similarly first define a connection registry connection to a MongoDB Atlas cluster and another connection registry connection to their Kafka topic. They then use $source to read the data from the MongoDB cluster, perform any optional transformations or aggregations on the data, and use $emit to write the data into their Kafka topic.
Using the Kafka Connector as a sink, developers explicitly define the connection as a sink using the connector class com.mongodb.kafka.connect.MongoSinkConnector. With Atlas Stream Processing, you define a connection in the connection registry that can be used for both sources and sinks. When writing to a Kafka topic “sink,” you use $emit, and when writing to a MongoDB “sink,” you use $merge.
Post-processors are used in the Kafka Connector to modify the document before writing it out to a MongoDB cluster sink. In Atlas Stream Processing, you can modify the document using MongoDB aggregation pipeline stages before writing it to a MongoDB Atlas cluster. There is no need to write your own Java code to perform business-specific transformations.
For example, assume the following event is on the Kafka topic:
```
{
  device_id: 'device_0',
  group_id: 8,
  timestamp: '2024-04-04T14:49:52.943+00:00',
  max_watts: 250,
  event_type: 0,
  obs: {
    watts: 75,
    temp: 5
  },
  _ts: ISODate('2024-04-04T14:49:52.943Z')
}
```
When you use the allow list projector within the Kafka Connector, the post-processor only outputs data from the fields that you specify.
```
post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
value.projection.type=AllowList
value.projection.list=device_id,obs.watts,obs.temp
```
In this example, the output would be the following:
```
{
  device_id: 'device_0',
  obs: {
    watts: 75,
    temp: 5
  }
}
```
Using Atlas Stream Processing, simply add a $project stage in your query to include/exclude fields.
```
{ $project: { "device_id": 1, "obs.watts": 1, "obs.temp": 1 } }
```
The output would be the following:
```
{
  device_id: 'device_0',
  obs: {
    watts: 75,
    temp: 5
  }
}
```
The Kafka Connector contains built-in post-processors that determine how the _id field is handled in the destination MongoDB collection. The _id is set by the DocumentIdAdder post-processor (com.mongodb.kafka.connect.sink.processor.DocumentIdAdder), which inserts an _id field determined by the configured strategy. The default strategy is BsonOidStrategy, which adds an _id to the document if one does not already exist.
DocumentIdAdder strategies are defined via document.id.strategy. The table below lists the possible configurations and their Atlas Stream Processing equivalents. Note that with Atlas Stream Processing, to add or change the _id field, simply use the $addFields pipeline operator to add a new “_id” field with the value specified in the table below; a worked sketch follows the table.
Strategy Name | Description | Atlas Stream Processing Equivalent |
---|---|---|
BsonOidStrategy com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy | Default strategy; generates a BSON ObjectId for the _id field. | A specific _id field does not need to exist in the document, as a new _id with an ObjectId is included by default. |
KafkaMetaDataStrategy com.mongodb.kafka.connect.sink.processor.id.strategy.KafkaMetaDataStrategy | Builds a string composed of the concatenation of Kafka topic, partition, and offset. | { $addFields: { "_id": "$_stream_meta.source.topic"}} |
FullKeyStrategy com.mongodb.kafka.connect.sink.processor.id.strategy.FullKeyStrategy | Uses the complete key structure to generate the value for the _id field. | { $addFields: { "_id": "$_stream_meta.source.key"}} |
ProvidedInKeyStrategy com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy | Uses the _id field specified in the key structure. | { $addFields: { "_id": "$_stream_meta.source.key._id"}} |
ProvidedInValueStrategy com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy | Uses the _id field specified in the value structure of the sink document. | Not applicable as when Kafka data is sourced into Atlas Stream Processing, the value is the document itself. Thus, an _id within the document would be used. |
PartialKeyStrategy com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy | Uses a block list or allow list projection of the sink document key structure. | To use a field that originally came in as part of the key as the _id, use $addFields as follows: { $addFields: { "_id": "$_stream_meta.source.key.your_desired_id_field"}} |
PartialValueStrategy com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy | Uses a block list or allow list projection of the sink document value structure. | To use a different field name for the _id , use $addFields as follows: { $addFields: { "_id": "$your_desired_id_field"}} |
UuidProvidedInKeyStrategy com.mongodb.kafka.connect.sink.processor.id.strategy.UuidProvidedInKeyStrategy | Converts the _id key field to a UUID. The value must be either a string or binary type and must conform to the UUID format. | Not supported. |
UuidProvidedInValueStrategy com.mongodb.kafka.connect.sink.processor.id.strategy.UuidProvidedInValueStrategy | Converts the _id value field to a UUID. The value must be either a string or binary type and must conform to the UUID format. | Not supported. |
UuidStrategy com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy | Uses a randomly generated UUID in string format. | Not supported. |
Custom user-defined id strategies are not supported.
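As a worked example of the KafkaMetaDataStrategy-style approach from the table above, the following hedged sketch builds an _id from the Kafka topic, partition, and offset before merging into Atlas. The processor name, target database and collection, and the use of $toString on the numeric metadata fields are assumptions for illustration:

```
// Concatenate topic#partition#offset into the _id field.
setId = {
  $addFields: {
    _id: {
      $concat: [
        "$_stream_meta.source.topic", "#",
        { $toString: "$_stream_meta.source.partition" }, "#",
        { $toString: "$_stream_meta.source.offset" }
      ]
    }
  }
}

mergeToAtlas = { $merge: { into: { connectionName: "AtlasSource", db: "Stocks", coll: "StockData" } } }

sp.createStreamProcessor('kafkaMetaIdSp', [s_kafka, setId, mergeToAtlas])
```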
The following is a list of built-in Kafka Connector post-processors and their Atlas Stream Processing equivalents:
Post Processor Name | Description | Atlas Stream Processing Equivalent |
---|---|---|
BlockListKeyProjector com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector | Removes matching key fields from the sink record. | Modify the $_stream_meta.source.key directly using $set . |
BlockListValueProjector com.mongodb.kafka.connect.sink.processor.BlockListValueProjector | Removes matching value fields from the sink record. | Use $project to remove any fields from the document. |
AllowListKeyProjector com.mongodb.kafka.connect.sink.processor.AllowListKeyProjector | Includes only matching key fields from the sink record. | Modify the $_stream_meta.source.key directly using $set . |
AllowListValueProjector com.mongodb.kafka.connect.sink.processor.AllowListValueProjector | Includes only matching value fields from the sink record. | Use $project to show any fields from the document. |
KafkaMetaAdder com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder | Adds a field named "topic-partition-offset" and sets the value to the concatenation of Kafka topic, partition, and offset to the document. | Use $set to add the field: { $set: { "topic-partition-offset": { $concat: ["$_stream_meta.source.topic", "#", "$_stream_meta.source.partition", "#", "$_stream_meta.source.offset"] } } } |
RenameByMapping com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByMapping | Renames fields that are an exact match to a specified field name in the key or value document. | Use $project to rename fields. Reference field names using the $ operator. |
RenameByRegex com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByRegex | Renames fields that match a regular expression in the key or value document. | Use $project to rename fields. |
A write model strategy in the MongoDB Kafka Connector is a custom Java class that defines how a sink connector writes data using write models. Three strategies are available out of the box with the Kafka Connector: ReplaceOneBusinessKeyStrategy, DeleteOneBusinessKeyStrategy, and UpdateOneBusinessKeyTimestampStrategy. Their details and Atlas Stream Processing equivalents are as follows:
Write Strategy | Description | Atlas Stream Processing Equivalent |
---|---|---|
UpdateOneBusinessKeyTimestamp | Adds _insertedTS and _modifiedTS fields to the sink document. | Use $set to add a date to _insertedTS or _modifiedTS. Example: { $set: { _modifiedTS: new Date() } } |
DeleteOneBusinessKeyStrategy | Deletes the document based upon a value in the event. | Not supported. |
ReplaceOneBusinessKeyStrategy | Replaces documents that match the value of the business key. | Not supported. |
In this article, we covered everything from a comparison of the Kafka Connector and Atlas Stream Processing, to the basics of setting up Atlas Stream Processing with a Kafka source (in this case, Redpanda), closing with some of the nuances and details to keep in mind when integrating Apache Kafka and MongoDB. There’s a lot to Apache Kafka and the ecosystem around it, so we hope this article helps you implement the solution that works best for your use case and needs. Atlas Stream Processing is a fully managed service integrated into MongoDB Atlas, built to prioritize ease of use, cost, and performance when processing data between Kafka and MongoDB. While not intended as a direct replacement for the Kafka Connector, Atlas Stream Processing is proving to be a valuable alternative for customers, and we are investing heavily in more features and functionality to support your workloads.
Ready to get started? Log in today!
Can’t find a configuration parameter or capability you need? Please let us know in UserVoice.