I have the following MongoDB Kafka sink connector configuration:
"mongodb.delete.on.null.values": "true",
"delete.on.null.values": "true",
"document.id.strategy.overwrite.existing": "true",
"writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.DefaultWriteModelStrategy",
"document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy",
"transforms": "hk",
"transforms.hk.type": "org.apache.kafka.connect.transforms.HoistField$Key",
"transforms.hk.field": "_id",
"post.processor.chain": "com.mongodb.kafka.connect.sink.processor.DocumentIdAdder,com.mongodb.kafka.connect.sink.processor.AllowListKeyProjector,com.mongodb.kafka.connect.sink.processor.AllowListValueProjector",
"key.projection.type": "AllowList",
"key.projection.list": "_id",
"value.projection.type": "AllowList",
"value.projection.list": "recommended_listings_prediction_current,shopper_engagement,cdp_id,last_modified_ts"
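For context, here is my understanding of what the `HoistField$Key` transform and the allow-list key projection should do to each record (a minimal Python sketch of my assumptions, not the connector's actual Java code):

```python
# Sketch of my understanding of HoistField$Key: it wraps the raw key
# value into a struct with a single named field (here "_id").
def hoist_key(raw_key, field="_id"):
    """Wrap the raw key value into a single-field struct."""
    return {field: raw_key}

hoisted = hoist_key("adefffa6-b9ae-40cf-b6de-429c59e4f2c5")
# hoisted == {"_id": "adefffa6-b9ae-40cf-b6de-429c59e4f2c5"}

# With key.projection.type=AllowList and key.projection.list=_id, I would
# expect the key projector to keep the _id field rather than strip it:
projected = {k: v for k, v in hoisted.items() if k in {"_id"}}
assert projected == hoisted
```

So I would expect `_id` to survive both the transform and the key projection, which is why the error below surprises me.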
But when it runs, I get the following error and stack trace:
{
"exception": {
"stacktrace": "org.apache.kafka.connect.errors.DataException: Could not build the WriteModel,the `_id` field was missing unexpectedly\n\tat com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy.createWriteModel(ReplaceOneDefaultStrategy.java:50)\n\tat com.mongodb.kafka.connect.sink.writemodel.strategy.DefaultWriteModelStrategy.createWriteModel(DefaultWriteModelStrategy.java:36)\n\tat com.mongodb.kafka.connect.sink.writemodel.strategy.WriteModelStrategyHelper.createValueWriteModel(WriteModelStrategyHelper.java:44)\n\tat com.mongodb.kafka.connect.sink.writemodel.strategy.WriteModelStrategyHelper.createWriteModel(WriteModelStrategyHelper.java:33)\n\tat com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModel$2(MongoProcessedSinkRecordData.java:92)\n\tat com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.tryProcess(MongoProcessedSinkRecordData.java:105)\n\tat com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.buildWriteModel(MongoProcessedSinkRecordData.java:85)\n\tat com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.createWriteModel(MongoProcessedSinkRecordData.java:81)\n\tat com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.<init>(MongoProcessedSinkRecordData.java:51)\n\tat com.mongodb.kafka.connect.sink.MongoSinkRecordProcessor.orderedGroupByTopicAndNamespace(MongoSinkRecordProcessor.java:45)\n\tat com.mongodb.kafka.connect.sink.StartedMongoSinkTask.put(StartedMongoSinkTask.java:101)\n\tat com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:90)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:582)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:330)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)\n\tat 
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)",
"exception_class": "org.apache.kafka.connect.errors.DataException",
"exception_message": "Could not build the WriteModel,the `_id` field was missing unexpectedly"
},
"source_host": "ip-172-28-195-18.ec2.internal",
"method": "tryProcess",
"level": "ERROR",
"message": "Unable to process record SinkRecord{kafkaOffset=25529965, timestampType=CreateTime} ConnectRecord{topic='dev-feature.consumer_profile', kafkaPartition=2, key=Struct{_id=adefffa6-b9ae-40cf-b6de-429c59e4f2c5}, keySchema=Schema{STRUCT}, value={last_modified_ts=1677466685723, research_indicator_30_mins={windowstart=1663612200000, windowend=1663614000000, value=false}, review_indicator_30_mins={windowstart=1663612200000, windowend=1663614000000, value=false}, popup_indicator_30_mins={windowstart=1663612200000, windowend=1663614000000, value=false}, imps_news_page_30_mins={windowstart=1663612200000, windowend=1663614000000, value=[]}, dealer_profile_indicator_30_mins={windowstart=1663612200000, windowend=1663614000000, value=false}, cdp_id=adefffa6-b9ae-40cf-b6de-429c59e4f2c5, imp_mm_research_page_30_min={windowstart=1663612200000, windowend=1663614000000, value=[]}}, valueSchema=null, timestamp=1663613512730, headers=ConnectHeaders(headers=[ConnectHeader(key=x-datadog-trace-id, value=7756784277501315046, schema=Schema{INT64}), ConnectHeader(key=x-datadog-parent-id, value=1083778841350792478, schema=Schema{INT64}), ConnectHeader(key=x-datadog-sampling-priority, value=1, schema=Schema{INT8})])}",
"mdc": {
"connector.context": "[dev-rtff-cdc-sink|task-1] "
},
"@timestamp": "2023-02-27T20:04:12.823Z",
"file": "MongoProcessedSinkRecordData.java",
"line_number": "109",
"thread_name": "task-thread-dev-rtff-cdc-sink-1",
"@version": 1,
"logger_name": "com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData",
"class": "com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData"
}
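For reference, this is the document I expect the sink to write after the value projection, assuming `AllowListValueProjector` simply keeps the top-level fields named in `value.projection.list` (my assumption, sketched in Python against the value from the failing record above):

```python
# Subset of the failing record's value (from the log message above).
raw_value = {
    "last_modified_ts": 1677466685723,
    "cdp_id": "adefffa6-b9ae-40cf-b6de-429c59e4f2c5",
    "research_indicator_30_mins": {"value": False},  # not in the allow list
}

# Fields from value.projection.list in my config.
allow = {
    "recommended_listings_prediction_current",
    "shopper_engagement",
    "cdp_id",
    "last_modified_ts",
}

# My assumed projector behavior: keep only allow-listed top-level fields.
expected = {k: v for k, v in raw_value.items() if k in allow}
# expected keeps last_modified_ts and cdp_id, drops the indicator field
```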
When I run the configuration without the key and value projections, it works fine. Has anyone had success running this combination of post-processor chain with the `HoistField$Key` transform?