May 2024

Hi all.

I got the connector installed, and the source side works, which tells me the installation is correct.

Now I'm trying to set up a sink, and the connector is failing on the Confluent side.

I'm posting both my source and sink below as examples. The Confluent logs are not very helpful.

This is a local MongoDB running in a container, and a locally deployed Confluent stack (the one that runs outside of containers)…

I've also tried posting directly onto a Kafka topic and sinking that into a collection: same error/result, the sink fails.

G

curl -X POST \
  -H "Content-Type: application/json" \
  --data '{
    "name": "mongo-source",
    "config": {
      "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
      "connection.uri": "mongodb://localhost:27017/?directConnection=true",
      "database": "quickstart",
      "collection": "sampleData",
      "pipeline": "[{\"$match\": {\"operationType\": \"insert\"}}, {\"$addFields\": {\"fullDocument.travel\": \"MongoDB Kafka Connector\"}}]"
    }
  }' \
  http://localhost:8083/connectors -w "\n"
curl -X POST \
  -H "Content-Type: application/json" \
  --data '{
    "name": "mongo-sink",
    "config": {
      "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
      "connection.uri": "mongodb://localhost:27017/?directConnection=true",
      "database": "quickstart",
      "collection": "topicData",
      "topics": "quickstart.sampleData",
      "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler"
    }
  }' \
  http://localhost:8083/connectors -w "\n"
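
Since the logs weren't telling me much, a quicker way to see the actual task failure is the Connect REST status endpoint (a minimal check, assuming the default worker on localhost:8083 and the connector names above):

# Shows the task state and, when FAILED, the stack trace that killed it
curl -s http://localhost:8083/connectors/mongo-sink/status -w "\n"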

Below is the sink connector that takes messages posted directly onto a Kafka topic (not sourced via a source connector). It fails similarly to the above.

curl -X POST \
  -H "Content-Type: application/json" \
  --data '{
    "name": "mongo-creator-sink",
    "config": {
      "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
      "connection.uri": "mongodb://localhost:27017/?directConnection=true",
      "database": "MongoCom0",
      "collection": "creator_salesbasket",
      "topics": "mongocreator_basket",
      "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler"
    }
  }' \
  http://localhost:8083/connectors -w "\n"
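
For reference, this is roughly how messages can be posted directly onto that topic with the stock console producer (a sketch, assuming the broker is on localhost:9092; the sample document is made up, not the one I actually used):

kafka-console-producer --bootstrap-server localhost:9092 --topic mongocreator_basket
# then type one JSON document per line, e.g.
# {"basketId": 1, "item": "sample"}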

I increased the Connect log level… and got the line below.
Not sure why it's trying to convert the data to Avro?

Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic mongocreator_basket to Avro:
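
My guess is the Avro attempt comes from the Connect worker's defaults: when a connector config doesn't set a converter, the worker's properties win. A stock Confluent worker started from the Avro properties file typically carries something like the lines below (an assumption about the default config, not copied from my setup):

# etc/schema-registry/connect-avro-distributed.properties (stock defaults, roughly)
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081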

I modified the sink as shown below and re-created it… still getting the error below?

curl -X POST \
  -H "Content-Type: application/json" \
  --data '{
    "name": "mongo-creator-sink",
    "config": {
      "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
      "connection.uri": "mongodb://localhost:27017/?directConnection=true",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "database": "MongoCom0",
      "collection": "creator_salesbasket",
      "topics": "mongocreator_basket",
      "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler"
    }
  }' \
  http://localhost:8083/connectors -w "\n"

I added the value converter above.
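
One gotcha here: POST /connectors will not update an existing connector of the same name (it returns a 409 Conflict), so the old one has to be deleted before posting the modified config:

# Remove the existing connector so the changed config can be POSTed again
curl -X DELETE http://localhost:8083/connectors/mongo-creator-sink -w "\n"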

[2024-05-20 20:13:56,031] ERROR [mongo-sink|task-0] [Consumer clientId=connector-consumer-mongo-sink-0, groupId=connect-mongo-sink] Unable to find FetchSessionHandler for node 0. Ignoring fetch response. (org.apache.kafka.clients.consumer.internals.AbstractFetch:128)
[2024-05-20 20:13:56,035] ERROR [mongo-creator-sink|task-0] [Consumer clientId=connector-consumer-mongo-creator-sink-0, groupId=connect-mongo-creator-sink] Unable to find FetchSessionHandler for node 0. Ignoring fetch response. (org.apache.kafka.clients.consumer.internals.AbstractFetch:128)
[2024-05-20 20:22:58,092] ERROR [mongo-creator-sink|task-0] WorkerSinkTask{id=mongo-creator-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:237)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:244)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:166)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:531)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:509)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:345)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:243)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:212)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:229)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:284)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic mongocreator_basket to Avro:
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:148)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:531)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:190)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:224)
    ... 14 more
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
    at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:603)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.<init>(AbstractKafkaAvroDeserializer.java:390)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:264)
    at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:199)
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:126)
    ... 17 more
[2024-05-20 20:22:58,581] ERROR [mongo-creator-sink|task-0] [Consumer clientId=connector-consumer-mongo-creator-sink-0, groupId=connect-mongo-creator-sink] Unable to find FetchSessionHandler for node 0. Ignoring fetch response. (org.apache.kafka.clients.consumer.internals.AbstractFetch:128)
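
The "Unknown magic byte!" at the bottom is the giveaway: the Avro converter expects the Confluent wire format (a 0x00 magic byte followed by a schema registry ID), and these messages were never produced that way. A quick sanity check is to read one message back with the plain console consumer; if it prints readable JSON, the topic holds plain JSON, not Avro (again assuming the broker on localhost:9092):

kafka-console-consumer --bootstrap-server localhost:9092 --topic mongocreator_basket \
  --from-beginning --max-messages 1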

There is just something about battling with something and finally getting it working…
Below is the sink connector that worked… :slight_smile:

As can be seen, I added a key converter, set value.converter.schemas.enable to false, and dropped the change.data.capture.handler.

curl -X POST \
  -H "Content-Type: application/json" \
  --data '{
    "name": "mongo-creator-sink",
    "config": {
      "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
      "connection.uri": "mongodb://localhost:27017/?directConnection=true",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable": false,
      "database": "MongoCom0",
      "collection": "creator_salesbasket",
      "topics": "mongocreator_basket"
    }
  }' \
  http://localhost:8083/connectors -w "\n"
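
And to verify documents are actually landing, a quick check from the Mongo side (a sketch, assuming mongosh is on the path and the database/collection names above):

# Count the documents the sink has written so far
mongosh "mongodb://localhost:27017/?directConnection=true" --eval 'db.getSiblingDB("MongoCom0").creator_salesbasket.countDocuments()'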