Getting Started with the MongoDB Kafka Sink Connector
Follow this tutorial to learn how to configure a MongoDB Kafka sink connector to read data from an Apache Kafka topic and write it to a MongoDB collection.
Complete the Tutorial Setup
Complete the steps in the Kafka Connector Tutorial Setup to start the Confluent Kafka Connect and MongoDB environment.
Configure the Sink Connector
Create an interactive shell session on the tutorial Docker Container using the following command:
docker exec -it mongo1 /bin/bash
Create a sink connector configuration file called simplesink.json with the following command:
nano simplesink.json
Paste the following configuration information into the file and save your changes:
{
  "name": "mongo-tutorial-sink",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "topics": "Tutorial2.pets",
    "connection.uri": "mongodb://mongo1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "database": "Tutorial2",
    "collection": "pets"
  }
}
Note
The converter properties in the configuration (key.converter, value.converter, and value.converter.schemas.enable) instruct the connector how to translate the data it reads from Kafka.
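To see what this translation means in practice, the following sketch (illustration only, not part of the tutorial) mimics how a schemaless JSON record value is interpreted. The record payload shown matches the one the tutorial's producer script sends later:

```python
import json

# Illustration only: a record value shaped like the one the tutorial's
# producer sends -- a UTF-8 encoded, schemaless JSON payload.
record_value = json.dumps({"name": "roscoe"}).encode("utf-8")

# With value.converter.schemas.enable set to false, the JsonConverter
# parses the raw bytes as plain JSON rather than expecting an embedded
# schema envelope; the resulting fields become the document that the
# sink connector writes to MongoDB.
document = json.loads(record_value.decode("utf-8"))
print(document)  # {'name': 'roscoe'}
```

If schemas.enable were left at its default of true, the converter would instead expect each record value to wrap the payload in a schema/payload envelope.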
Run the following command in the shell to start the sink connector using the configuration file you created:
cx simplesink.json
Note
The cx command is a custom script included in the tutorial development environment. This script runs the following equivalent request to the Kafka Connect REST API to create a new connector:
curl -X POST -H "Content-Type: application/json" -d @simplesink.json http://connect:8083/connectors -w "\n"
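If you prefer to issue that request from Python rather than curl, a minimal sketch using only the standard library might look like the following. The connect:8083 address comes from the tutorial environment; adjust it for your own setup. Building the request is separated from sending it so you can inspect the request first:

```python
import json
import urllib.request

def build_connector_request(config_path, connect_url="http://connect:8083"):
    # Load the connector configuration file created earlier in the tutorial.
    with open(config_path) as f:
        payload = json.dumps(json.load(f)).encode("utf-8")
    # Build the same POST request that the curl command sends to the
    # Kafka Connect REST API's /connectors endpoint.
    return urllib.request.Request(
        f"{connect_url}/connectors",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

# To actually register the connector, send the built request:
# urllib.request.urlopen(build_connector_request("simplesink.json"))
```

This is a sketch of the same REST call, not a replacement for the cx script, which may perform additional setup in the tutorial environment.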
Run the following command in the shell to check the status of the connectors:
status
If your sink connector started successfully, you should see the following output:
Kafka topics:
...
The status of the connectors:

sink | mongo-tutorial-sink | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSinkConnector

Currently configured connectors
[
  "mongo-tutorial-sink"
]
...
Write Data to a Kafka Topic
In the same shell, create a Python script to write data to a Kafka topic.
nano kafkawrite.py
Paste the following code into the file and save your changes:
from kafka import KafkaProducer
from json import dumps

p = KafkaProducer(
    bootstrap_servers=['broker:29092'],
    value_serializer=lambda x: dumps(x).encode('utf-8')
)
data = {'name': 'roscoe'}
p.send('Tutorial2.pets', value=data)
p.flush()
Run the Python script:
python3 kafkawrite.py
View the Data in the MongoDB Collection
In the same shell, connect to MongoDB using mongosh, the MongoDB shell, by running the following command:
mongosh "mongodb://mongo1"
After you connect successfully, you should see the following MongoDB shell prompt:
rs0 [direct: primary] test>
At the prompt, type the following commands to retrieve all the documents in the Tutorial2.pets MongoDB namespace:
use Tutorial2
db.pets.find()
You should see the following document returned as the result:
{ _id: ObjectId("62659..."), name: 'roscoe' }
Exit the MongoDB shell by entering the command exit.
(Optional) Stop the Docker Containers
After you complete this tutorial, free resources on your computer by stopping or removing Docker assets. You can remove both the Docker containers and images, or only the containers. If you remove the containers and images, you must download them again (approximately 2.4 GB) to restart your MongoDB Kafka Connector development environment. If you remove only the containers, you can reuse the images and avoid downloading most of the large files in the sample data pipeline.
Tip
More Tutorials
If you plan to complete any more MongoDB Kafka Connector tutorials, consider removing only containers. If you don't plan to complete any more MongoDB Kafka Connector tutorials, consider removing containers and images.
Run the command that corresponds to the removal task you want to perform.
Run the following shell command to remove the Docker containers and images for the development environment:
docker-compose -p mongo-kafka down --rmi all
Run the following shell command to remove the Docker containers but keep the images for the development environment:
docker-compose -p mongo-kafka down
To restart the containers, follow the same steps required to start them in the Tutorial Setup.
Summary
In this tutorial, you configured a sink connector to save data from a Kafka topic to a collection in a MongoDB cluster.
Learn More
Read the following resources to learn more about concepts mentioned in this tutorial: