--------------------- mongo-Diary-source-connector.json ---------------------
{
“name”: “mongo-Diary-source-connector”,
“config”: {
“connector.class”: “com.mongodb.kafka.connect.MongoSourceConnector”,
“tasks.max”: “1”,
“key.converter”: “org.apache.kafka.connect.storage.StringConverter”,
“value.converter”: “org.apache.kafka.connect.json.JsonConverter”,
“value.converter.schemas.enable”: “false”,
“connection.uri”: “mongodb+srv://id:pw@keaprofit.2ttoq3v.mongodb.net/?retryWrites=true&w=majority&appName=KEAProfit”,
“database”: “Arcadia”,
“collection”: “Diary”,
“topics”: “Arcadia.Diary”,
“poll.max.batch.size”: “1000”,
“poll.await.time.ms”: “5000”,
“name”: “mongo-Diary-source-connector”,
“copy.existing”: “true”,
“publish.full.document.only”: “true”
}
}
--------------------- mongo-Diary-sink-connector1.json ---------------------
{
“name”: “mongo-Diary-sink-connector1”,
“config”: {
“connector.class”: “com.mongodb.kafka.connect.MongoSinkConnector”,
“tasks.max”: “1”,
“topics”: “Arcadia.Diary”,
“key.converter”: “org.apache.kafka.connect.storage.StringConverter”,
“value.converter”: “org.apache.kafka.connect.json.JsonConverter”,
“value.converter.schemas.enable”: “false”,
“connection.uri”: “mongodb+srv://id:pw@keaprofit2.yyjyfhn.mongodb.net/?retryWrites=true&w=majority&appName=KEAProfit2”,
“name”: “mongo-Diary-sink-connector1”,
“database”: “Arcadia”,
“collection”: “Diary”
}
}
--------------------- mongo-Diary-sink-connector2.json ---------------------
{
“name”: “mongo-Diary-sink-connector2”,
“config”: {
“connector.class”: “com.mongodb.kafka.connect.MongoSinkConnector”,
“tasks.max”: “1”,
“topics”: “Arcadia.Diary”,
“key.converter”: “org.apache.kafka.connect.storage.StringConverter”,
“value.converter”: “org.apache.kafka.connect.json.JsonConverter”,
“value.converter.schemas.enable”: “false”,
“connection.uri”: “mongodb+srv://id:pw@keaprofit3.nmwonfl.mongodb.net/?retryWrites=true&w=majority&appName=KEAProfit3”,
“name”: “mongo-Diary-sink-connector2”,
“database”: “Arcadia”,
“collection”: “Diary”
}
}
Hello, I am Lee Min Gi, a junior developer in Korea. Please understand that I am not good at English.
Currently, I am going to make 3 db of atlas consist of 1 source and 2 sink and make cdc without any changes. However, even though I tried all the properties on the Internet, when I insert and change the document on the source, the sink changes in real time, but when I delete the document on the source, it is not deleted from the sink. When I changed “publish.full.document.only”: “true” to false, I was able to confirm that a delete message was delivered to the sink, which was also confirmed in the docker’s log.
So in conclusion, I want to use “connector.class”: “com.mongodb.kafka.connect.MongoSourceConnector” while copying the document at the same time using “publish.full.document.only”: How can I fix this by changing the connector?
Thank you for reading the long article.