georgelza
(George Leonard)
3
I modified the sink to the below andre created it… still getting below error ?
curl -X POST \
-H "Content-Type: application/json" \
--data '
{"name": "mongo-creator-sink",
"config": {
"connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
"connection.uri":"mongodb://localhost:27017/?directConnection=true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"database":"MongoCom0",
"collection":"creator_salesbasket",
"topics":"mongocreator_basket",
"change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler"
}
}
' \
http://localhost:8083/connectors -w "\n"
Added the value converter above
[2024-05-20 20:13:56,031] ERROR [mongo-sink|task-0] [Consumer clientId=connector-consumer-mongo-sink-0, groupId=connect-mongo-sink] Unable to find FetchSessionHandler for node 0. Ignoring fetch response. (org.apache.kafka.clients.consumer.internals.AbstractFetch:128)
[2024-05-20 20:13:56,035] ERROR [mongo-creator-sink|task-0] [Consumer clientId=connector-consumer-mongo-creator-sink-0, groupId=connect-mongo-creator-sink] Unable to find FetchSessionHandler for node 0. Ignoring fetch response. (org.apache.kafka.clients.consumer.internals.AbstractFetch:128)
[2024-05-20 20:22:58,092] ERROR [mongo-creator-sink|task-0] WorkerSinkTask{id=mongo-creator-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:237)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:244)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:166)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:531)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:509)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:345)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:243)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:212)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:229)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:284)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic mongocreator_basket to Avro:
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:148)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:531)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:190)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:224)
... 14 more
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:603)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.<init>(AbstractKafkaAvroDeserializer.java:390)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:264)
at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:199)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:126)
... 17 more
[2024-05-20 20:22:58,581] ERROR [mongo-creator-sink|task-0] [Consumer clientId=connector-consumer-mongo-creator-sink-0, groupId=connect-mongo-creator-sink] Unable to find FetchSessionHandler for node 0. Ignoring fetch response. (org.apache.kafka.clients.consumer.internals.AbstractFetch:128)