Sink connector with AllowList projection and key transform fails on missing _id field

I have the following sink connector configuration.

        "mongodb.delete.on.null.values": "true",
        "delete.on.null.values": "true",
        "document.id.strategy.overwrite.existing": "true",
        "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.DefaultWriteModelStrategy",
        "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy",
        "transforms": "hk",
        "transforms.hk.type": "org.apache.kafka.connect.transforms.HoistField$Key",
        "transforms.hk.field": "_id",
        "post.processor.chain": "com.mongodb.kafka.connect.sink.processor.DocumentIdAdder,com.mongodb.kafka.connect.sink.processor.AllowListKeyProjector,com.mongodb.kafka.connect.sink.processor.AllowListValueProjector",
        "key.projection.type": "AllowList",
        "key.projection.list": "_id",
        "value.projection.type": "AllowList",
        "value.projection.list": "recommended_listings_prediction_current,shopper_engagement,cdp_id,last_modified_ts"

But when it runs, I get the following error and stack trace.

{
    "exception": {
        "stacktrace": "org.apache.kafka.connect.errors.DataException: Could not build the WriteModel,the `_id` field was missing unexpectedly\n\tat com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy.createWriteModel(ReplaceOneDefaultStrategy.java:50)\n\tat com.mongodb.kafka.connect.sink.writemodel.strategy.DefaultWriteModelStrategy.createWriteModel(DefaultWriteModelStrategy.java:36)\n\tat com.mongodb.kafka.connect.sink.writemodel.strategy.WriteModelStrategyHelper.createValueWriteModel(WriteModelStrategyHelper.java:44)\n\tat com.mongodb.kafka.connect.sink.writemodel.strategy.WriteModelStrategyHelper.createWriteModel(WriteModelStrategyHelper.java:33)\n\tat com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModel$2(MongoProcessedSinkRecordData.java:92)\n\tat com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.tryProcess(MongoProcessedSinkRecordData.java:105)\n\tat com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.buildWriteModel(MongoProcessedSinkRecordData.java:85)\n\tat com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.createWriteModel(MongoProcessedSinkRecordData.java:81)\n\tat com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.<init>(MongoProcessedSinkRecordData.java:51)\n\tat com.mongodb.kafka.connect.sink.MongoSinkRecordProcessor.orderedGroupByTopicAndNamespace(MongoSinkRecordProcessor.java:45)\n\tat com.mongodb.kafka.connect.sink.StartedMongoSinkTask.put(StartedMongoSinkTask.java:101)\n\tat com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:90)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:582)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:330)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)\n\tat 
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)",
        "exception_class": "org.apache.kafka.connect.errors.DataException",
        "exception_message": "Could not build the WriteModel,the `_id` field was missing unexpectedly"
    },
    "source_host": "ip-172-28-195-18.ec2.internal",
    "method": "tryProcess",
    "level": "ERROR",
    "message": "Unable to process record SinkRecord{kafkaOffset=25529965, timestampType=CreateTime} ConnectRecord{topic='dev-feature.consumer_profile', kafkaPartition=2, key=Struct{_id=adefffa6-b9ae-40cf-b6de-429c59e4f2c5}, keySchema=Schema{STRUCT}, value={last_modified_ts=1677466685723, research_indicator_30_mins={windowstart=1663612200000, windowend=1663614000000, value=false}, review_indicator_30_mins={windowstart=1663612200000, windowend=1663614000000, value=false}, popup_indicator_30_mins={windowstart=1663612200000, windowend=1663614000000, value=false}, imps_news_page_30_mins={windowstart=1663612200000, windowend=1663614000000, value=[]}, dealer_profile_indicator_30_mins={windowstart=1663612200000, windowend=1663614000000, value=false}, cdp_id=adefffa6-b9ae-40cf-b6de-429c59e4f2c5, imp_mm_research_page_30_min={windowstart=1663612200000, windowend=1663614000000, value=[]}}, valueSchema=null, timestamp=1663613512730, headers=ConnectHeaders(headers=[ConnectHeader(key=x-datadog-trace-id, value=7756784277501315046, schema=Schema{INT64}), ConnectHeader(key=x-datadog-parent-id, value=1083778841350792478, schema=Schema{INT64}), ConnectHeader(key=x-datadog-sampling-priority, value=1, schema=Schema{INT8})])}",
    "mdc": {
        "connector.context": "[dev-rtff-cdc-sink|task-1] "
    },
    "@timestamp": "2023-02-27T20:04:12.823Z",
    "file": "MongoProcessedSinkRecordData.java",
    "line_number": "109",
    "thread_name": "task-thread-dev-rtff-cdc-sink-1",
    "@version": 1,
    "logger_name": "com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData",
    "class": "com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData"
}

When I run the configuration without the key and value projection, it works fine. Has anyone had any success running this combination of processor chain with the transform?

Hi! I ran into a very similar problem when configuring the processor chain together with the transform.
In the end, the problem was in my “value.projection.list”.
For some reason, the DefaultWriteModelStrategy (specifically the ReplaceOneDefaultStrategy it delegates to) needs the _id field to be present in the value of the record as well. Adding “_id” to my value.projection.list solved the problem.
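
For reference, here is a sketch of that change applied to the configuration from the question; only the last line differs, and the other settings are assumed to stay as posted. A likely explanation, assuming the post processors run in the order listed, is that DocumentIdAdder copies the _id from the key into the value document and AllowListValueProjector then strips it again unless “_id” is on the allow list.

        "post.processor.chain": "com.mongodb.kafka.connect.sink.processor.DocumentIdAdder,com.mongodb.kafka.connect.sink.processor.AllowListKeyProjector,com.mongodb.kafka.connect.sink.processor.AllowListValueProjector",
        "value.projection.type": "AllowList",
        "value.projection.list": "_id,recommended_listings_prediction_current,shopper_engagement,cdp_id,last_modified_ts"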

Hope this helps!

Thank you very much, Luisa_Emme, for your reply on this topic.
I had quite a hard time figuring out what was wrong with my config file, since I couldn’t get any records written to the corresponding collection. It turns out that I was debugging the wrong file: the “problem” was in the WriteModelStrategy class, in my case UpdateOneTimestampsStrategy.

Although it makes total sense to leave the “_id” field in the document and, thus, in the “value.projection.list” (because some strategies depend on it for inserting or updating), I really think proper documentation is lacking here, especially when the value comes from the record key.

In any case, thanks again!