Source Connector - Connectivity issues

Hi,

I’m experimenting with the official MongoDB Kafka Source Connector. The Kafka cluster runs in AWS MSK, and so does the connector. The MongoDB cluster runs in MongoDB Atlas, version 6.06, and the connector is mongo-kafka-connect-1.10.1-all.jar.

The strange thing is that it has connected successfully a few times, but most of the time it fails with an error (see logs below).

Here is the connector config:

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=mongodb+srv://mytestuser:mypassword@myserver.domain.net/?connectTimeoutMS=30000
startup.mode=latest
tasks.max=1
collection=MyCollection
database=MyDatabase
publish.full.document.only=true
output.schema.key={"type":"record","name":"keySchema","fields":[{"name":"id","type":"string"}]}
output.format.key=schema

From logs in CloudWatch:


2023-06-19T13:04:44.000+02:00	[Worker-<XXX>] [2023-06-19 11:04:44,041] INFO Adding discovered server <XXX>:27017 to client view of cluster (org.mongodb.driver.cluster:71)

2023-06-19T13:04:44.000+02:00	[Worker-<XXX>] [2023-06-19 11:04:44,045] INFO Adding discovered server <XXX>:27017 to client view of cluster (org.mongodb.driver.cluster:71)

2023-06-19T13:04:44.000+02:00	[Worker-<XXX>] [2023-06-19 11:04:44,051] INFO Adding discovered server <XXX>:27017 to client view of cluster (org.mongodb.driver.cluster:71)

2023-06-19T13:04:48.000+02:00	[Worker-<XXX>] [2023-06-19 11:04:48,293] INFO Exception in monitor thread while connecting to server <XXX>:27017 (org.mongodb.driver.cluster:76)

2023-06-19T13:04:48.000+02:00	[Worker-<XXX>] com.mongodb.MongoSocketOpenException: Exception opening socket

2023-06-19T13:04:48.000+02:00	[Worker-<XXX>] at com.mongodb.internal.connection.SocketStream.open(SocketStream.java:70)

2023-06-19T13:04:48.000+02:00	[Worker-<XXX>] at com.mongodb.internal.connection.InternalStreamConnection.open(InternalStreamConnection.java:180)

2023-06-19T13:04:48.000+02:00	[Worker-<XXX>] at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.lookupServerDescription(DefaultServerMonitor.java:193)

2023-06-19T13:04:48.000+02:00	[Worker-<XXX>] at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.run(DefaultServerMonitor.java:157)

2023-06-19T13:04:48.000+02:00	[Worker-<XXX>] at java.base/java.lang.Thread.run(Thread.java:829)

2023-06-19T13:04:48.000+02:00	[Worker-<XXX>] Caused by: java.net.SocketTimeoutException: connect timed out

Make sure you connect over Private Link to MongoDB from MSK. See the steps in the blog:

Thanks. I tried this now and used the new private endpoint connection string from MSK Connect. It’s still not able to connect to the Atlas cluster though.

Logs:

2023-06-20T08:47:11.000+02:00	[Worker-XXX] [2023-06-20 06:47:11,487] INFO Adding discovered server pl-0-eu-west-1.XXX.mongodb.net:1025 to client view of cluster (org.mongodb.driver.cluster:71)

2023-06-20T08:47:11.000+02:00	[Worker-XXX] [2023-06-20 06:47:11,488] INFO Adding discovered server pl-0-eu-west-1.XXX.mongodb.net:1024 to client view of cluster (org.mongodb.driver.cluster:71)

2023-06-20T08:47:11.000+02:00	[Worker-XXX] [2023-06-20 06:47:11,491] INFO Adding discovered server pl-0-eu-west-1.XXX.mongodb.net:1026 to client view of cluster (org.mongodb.driver.cluster:71)

2023-06-20T08:47:26.000+02:00	[Worker-XXX] [2023-06-20 06:47:26,985] INFO AbstractConfig values:

2023-06-20T08:47:26.000+02:00	[Worker-XXX] (org.apache.kafka.common.config.AbstractConfig:361)

:thinking: :thinking:

Ok, the security group for the VPC interface endpoint was missing inbound rules. So now it works.

I would still like to get it working with VPC peering though.
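For anyone hitting the same "connect timed out" error: a plain TCP check run from inside the same VPC and subnets as the connector can help tell a network block (security group, routing) apart from a connector or driver problem. The snippet below is only a minimal sketch under that assumption; the function name `can_connect` is made up for illustration, and the host and ports in the usage comment are placeholders modeled on the log lines above, not real endpoints.

```python
import socket


def can_connect(host: str, port: int, timeout_s: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port opens within timeout_s."""
    try:
        # create_connection resolves the host name and opens a TCP socket.
        with socket.create_connection((host, port), timeout=timeout_s):
            return True
    except OSError:
        # Covers connect timeouts, refused connections, and DNS failures.
        return False


# Hypothetical usage; replace the placeholder host with your own endpoint:
# for port in (1024, 1025, 1026):
#     print(port, can_connect("pl-0-eu-west-1.XXX.mongodb.net", port))
```

If this returns False for every port while DNS resolves fine, the security group or routing is the first thing to look at. A True result only shows that the TCP port is reachable; it says nothing about TLS or authentication.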

Oh, great. I was about to suggest Reachability Analyzer. :slight_smile:

Any particular reason for VPC peering? Private Link should normally be the first choice.

Just that it has a lower cost :slight_smile:

Hello everyone! I am facing the same issue. I am using an AWS MSK connector to create a source for MongoDB Atlas (free version), and I have set 0.0.0.0/0 under Network Access in MongoDB Atlas. When I create the MSK connector or an AWS Flink application to capture data from or send data to MongoDB, I get a timeout error. My VPC and security group allow all access from the internet. Here is the configuration I used to create the connector:
connector.class=io.debezium.connector.mongodb.MongoDbConnector
collection.include.list=mongo-***
mongodb.password=pass
tasks.max=1
database.history.kafka.bootstrap.servers=
database.history.kafka.topic=dbhistory.fullfillment
mongodb.user=username
connection.attempts=1
mongodb.name=mongo-source
value.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
mongodb.hosts=mongodb+srv://:@host.net/?retryWrites=true&w=majority&appName=myapp
database.include.list=mongo-db
I also have a question: when connecting from AWS managed services (such as Flink and the MSK connector), do we need a private endpoint or VPC peering for MongoDB, or can we connect just by allowing access in the VPC and security group attached to those services (AWS Flink and MSK connector)?