3 / 3
May 2024

Hi Team, I am trying to do CDC on MongoDB using the Dbezium Flink Connector
Below is the code where I am creating source to read cdc events across database.

SourceFunction<String> mongoSource = MongoDBSource.<String>builder() .hosts("host") .scheme("mongodb") .username("user") .password("password") .databaseList("test") .copyExisting(true) .deserializer(new JsonDebeziumDeserializationSchema()) .build();

But I facing issues while connecting to mongodb where as I am able to connect with mongoDb from local machine, compaas as well spring boot applications.
Not able to understand what is going wrong here? Is it because DNS resolution issue or there is something we need to at Atlas Side.
Below is the error

Caused by: com.mongodb.MongoConfigurationException: A TXT record is only permitted to contain the keys [authsource, replicaset], but the TXT record for '{hostUrl}' contains the keys [loadbalanced, authsource]
com.mongodb.MongoSocketOpenException: Exception opening socket at com.mongodb.internal.connection.SocketStream.open(SocketStream.java:70) at com.mongodb.internal.connection.InternalStreamConnection.open(InternalStreamConnection.java:128) at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.run(DefaultServerMonitor.java:117) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: java.net.SocketTimeoutException: connect timed out

Are you connecting to a sharded cluster?

Are you using the Debezium Connector for MongoDB or the MongoDB Connector for Apache Kafka?

Where did you obtain the Atlas Connection string from? Are you using the connection string provided within the Atlas UI?

17 days later

Hi @Atharv_Joshi,

I use flink cdc 3.1.0 to run the following code, and it works well.

// mongodb+srv://cluster0.xxxxx.mongodb.net/ SourceFunction<String> sourceFunction = MongoDBSource.<String>builder() .hosts("cluster0.xxxx.mongodb.net") .scheme("mongodb+srv") .username("root") .password("root") .databaseList("test") .collectionList("test.test_coll") .deserializer(new JsonDebeziumDeserializationSchema()) .build(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.addSource(sourceFunction) .print() .setParallelism(1); env.execute("Print MongoDB Snapshot + Change Stream Events");

The appearance of this issue seems to be due to a version conflict in the org.mongodb:mongodb-driver-sync, which does not support loadbalance mode.
Please check if the version of the mongodb-driver-sync in the pom file is greater than 4.3.0.

By the way, if using SRV connection format, the ‘scheme’ option needs to be set to ‘mongodb+srv’.

Best,
Jiabao