Browsing the Spark Connector Change Stream Configuration docs and the source code on GitHub, I have been unable to figure out how to specify a resumeAfter/startAfter token when consuming a MongoDB database or collection as a readStream, the way I would with a Python client like Motor.

Resuming consumption from a particular offset is a hard requirement for our use of the Spark Connector as we cannot guarantee 100% consumer uptime, yet need to be able to propagate 100% of the change feed to our sinks.

Is resumeAfter/startAfter supported and I’m just missing the documentation? And if not, would it be possible to support this as a read configuration option?
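
For reference, this is the kind of resume behavior we rely on today with Motor (a minimal sketch; the URI, database/collection names, and the process() sink are placeholders):

from motor.motor_asyncio import AsyncIOMotorClient

async def consume(resume_token=None):
    client = AsyncIOMotorClient("mongodb://localhost:27017")  # placeholder URI
    collection = client["mydb"]["mycollection"]               # placeholder namespace
    # resume_after restarts the change stream from a previously persisted token
    async with collection.watch(resume_after=resume_token) as stream:
        async for change in stream:
            process(change)                      # placeholder sink
            resume_token = stream.resume_token   # persist this for the next restart

# run with: asyncio.run(consume(saved_token)), passing the last persisted token on restart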


I am unable to find this option in the documentation either.
@Robert_Walters Could you please confirm if this feature is available in version 10.0?
Thanks in Advance.

Currently it is not possible. I added https://jira.mongodb.org/browse/SPARK-380 to track it.

Can you add your use case to that ticket? If you don’t have a Jira account, can you elaborate on what you would expect to provide as a resume value: an epoch time or a Timestamp value?

Is it possible right now to pass in the resume token to the spark connector?

@Robert_Walters I have been unable to locate documentation for passing a resume token to the Spark Connector.

Today it is not possible to pass the resume token. We created https://jira.mongodb.org/browse/SPARK-380 to add this functionality.

Was this feature ever implemented? I see the ticket @Robert_Walters created was closed along with the linked tickets but Robert’s ticket was never assigned.

@Prakul_Agarwal now owns the Spark Connector and can comment on this specifically. I am not sure where the priority of this ended up.

You can use checkpointing on the write stream. Spark persists the stream’s offset in the checkpoint location, so on restart the stream resumes from where it left off.

Ex:

query = (
    streaming_df.writeStream
    .format("parquet")
    .option("checkpointLocation", checkpoint_location)
    .foreachBatch(writeData)   # writeData handles the actual write for each micro-batch
    .trigger(once=True)
    .start()
)
query.awaitTermination()
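
For completeness, the read side that produces streaming_df above would look roughly like this — a sketch only, assuming the 10.x connector’s connection.uri / database / collection option names and a placeholder schema:

from pyspark.sql.types import StructType, StringType

# Placeholder schema for the change events; define it to match your documents
schema = StructType().add("_id", StringType()).add("operationType", StringType())

streaming_df = (
    spark.readStream
    .format("mongodb")
    .option("connection.uri", "mongodb://localhost:27017")  # placeholder URI
    .option("database", "mydb")                              # placeholder database
    .option("collection", "mycollection")                    # placeholder collection
    .schema(schema)
    .load()
)

Once the query is running, the offsets Spark checkpoints and resumes from show up in the streaming progress, for example: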

"sources" : [ {
  "description" : "com.mongodb.spark.sql.connector.read.MongoMicroBatchStream@6793d7d6",
  "startOffset" : {
    "version" : 1,
    "offset" : {
      "$timestamp" : { "t" : 1713179033, "i" : 0 }
    }
  },
  "endOffset" : {
    "version" : 1,
    "offset" : {
      "$timestamp" : { "t" : 1713179298, "i" : 0 }
    }
  },
  "latestOffset" : {
    "version" : 1,
    "offset" : {
      "$timestamp" : { "t" : 1713179298, "i" : 0 }
    }
  },

Please refer to startOffset, which is the offset from which the stream starts, i.e. the point it resumes from after a restart.
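
If it helps, that progress document can be pulled from the running query itself; a small sketch (query is the StreamingQuery returned by start() above):

import json

# lastProgress is the most recent micro-batch progress (or None before the first batch).
# sources[0]["startOffset"] is the offset this batch started from, i.e. where the stream resumed.
progress = query.lastProgress
if progress is not None:
    print(json.dumps(progress["sources"][0]["startOffset"], indent=2))

# recentProgress keeps a short history of progress documents
for p in query.recentProgress:
    print(p["sources"][0]["endOffset"])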