Monitor Data Changes

On this page

  • Overview
  • Sample Data
  • Open a Change Stream
  • Open a Change Stream Example
  • Modify the Change Stream Output
  • Match Specific Events Example
  • Modify watch() Behavior
  • Include Pre-Images and Post-Images
  • Additional Information
  • API Documentation

In this guide, you can learn how to use the Kotlin Sync driver to monitor a change stream, allowing you to view real-time changes to your database. A change stream is a MongoDB Server feature that publishes data changes on a collection, database, or deployment. Your application can subscribe to a change stream and use events to perform other actions.

The examples in this guide use the restaurants collection in the sample_restaurants database from the Atlas sample datasets. To learn how to create a free MongoDB Atlas cluster and load the sample datasets, see the Get Started with Atlas guide.

The following Kotlin data class models the documents in this collection:

data class Restaurant(
    val name: String,
    val cuisine: String,
)

To open a change stream, call the watch() method. The instance on which you call the watch() method determines the scope of events that the change stream listens for. You can call the watch() method on instances of the following classes:

  • MongoClient: To monitor all changes in the MongoDB deployment

  • MongoDatabase: To monitor changes in all collections in the database

  • MongoCollection: To monitor changes in the collection
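As a rough sketch of how the three scopes differ, the following example builds a change stream from each class and iterates only the collection-level one. The connection string is a placeholder assumption, not part of this guide; replace it with the URI of your own deployment.

```kotlin
import com.mongodb.kotlin.client.MongoClient

// Models documents in the restaurants collection
data class Restaurant(val name: String, val cuisine: String)

fun main() {
    // Assumption: placeholder URI; replace it with your own connection string.
    // Change streams require a replica set or sharded cluster.
    val uri = "mongodb://localhost:27017"

    MongoClient.create(uri).use { client ->
        val database = client.getDatabase("sample_restaurants")
        val collection = database.getCollection<Restaurant>("restaurants")

        val deploymentStream = client.watch()     // events from the whole deployment
        val databaseStream = database.watch()     // events from every collection in sample_restaurants
        val collectionStream = collection.watch() // events from the restaurants collection only

        // forEach blocks while it waits for events, so this sketch iterates one stream
        collectionStream.forEach { change -> println(change) }
    }
}
```

To watch a wider scope, iterate deploymentStream or databaseStream instead of collectionStream.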

The following example opens a change stream on the restaurants collection and prints changes as they occur:

collection.watch().forEach { change ->
    println(change)
}

To begin watching for changes, run the application. Then, in a separate application or shell, perform a write operation on the restaurants collection. The following example updates a document in which the value of the name field is "Blarney Castle", setting its cuisine field to "Irish":

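import com.mongodb.client.model.Filters
import com.mongodb.client.model.Updates
// Match the restaurant by name, then set its cuisine field
val filter = Filters.eq(Restaurant::name.name, "Blarney Castle")
val update = Updates.set(Restaurant::cuisine.name, "Irish")
val result = collection.updateOne(filter, update)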

When you update the collection, the change stream application prints the change as it occurs. The printed change event resembles the following:

{
  "_id": { ... },
  "operationType": "update",
  "clusterTime": { ... },
  "ns": {
    "db": "sample_restaurants",
    "coll": "restaurants"
  },
  "updateDescription": {
    "updatedFields": {
      "cuisine": "Irish"
    },
    "removedFields": [],
    "truncatedArrays": []
  }
  ...
}
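Instead of printing the whole event, an application can read individual fields from it. The following is a minimal sketch, assuming a placeholder connection string and a hypothetical label() helper that is not part of the driver. It reads the operationType, documentKey, and updateDescription values from each event.

```kotlin
import com.mongodb.client.model.changestream.OperationType
import com.mongodb.kotlin.client.MongoClient

// Models documents in the restaurants collection
data class Restaurant(val name: String, val cuisine: String)

// Hypothetical helper: maps an operation type to a short label
fun label(type: OperationType?): String = when (type) {
    OperationType.INSERT -> "document inserted"
    OperationType.UPDATE -> "document updated"
    OperationType.REPLACE -> "document replaced"
    OperationType.DELETE -> "document deleted"
    else -> "other event"
}

fun main() {
    // Assumption: placeholder URI; replace it with your own connection string
    val uri = "mongodb://localhost:27017"

    MongoClient.create(uri).use { client ->
        val collection = client.getDatabase("sample_restaurants")
            .getCollection<Restaurant>("restaurants")

        collection.watch().forEach { change ->
            // documentKey holds the _id of the changed document
            println("${label(change.operationType)}: key=${change.documentKey}")
            // updateDescription is present only for update events
            change.updateDescription?.let { println("  updated fields: ${it.updatedFields}") }
        }
    }
}
```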

You can pass the pipeline parameter to the watch() method to modify the change stream output. This parameter allows you to watch for only specified change events. Format the parameter as a list of objects, each of which represents an aggregation stage.

You can specify the following stages in the pipeline parameter:

  • $addFields

  • $match

  • $project

  • $replaceRoot

  • $replaceWith

  • $redact

  • $set

  • $unset

The following example passes a pipeline that includes a $match stage to the watch() method to open a change stream that records only update operations:

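import com.mongodb.client.model.Aggregates
import com.mongodb.client.model.Filters

// Keep only events whose operationType is "update"
val pipeline = listOf(
    Aggregates.match(Filters.eq("operationType", "update"))
)
collection.watch(pipeline).forEach { change ->
    println(change)
}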

To learn more about modifying your change stream output, see the Modify Change Stream Output section in the MongoDB Server manual.
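A pipeline can also match more than one event type. The following sketch, which assumes a placeholder connection string and a hypothetical insertAndUpdatePipeline() helper, keeps only insert and update events. It prints the pipeline stages as JSON before opening the change stream so that you can inspect what the driver sends.

```kotlin
import com.mongodb.client.model.Aggregates
import com.mongodb.client.model.Filters
import com.mongodb.kotlin.client.MongoClient
import org.bson.conversions.Bson

// Models documents in the restaurants collection
data class Restaurant(val name: String, val cuisine: String)

// Hypothetical helper: builds a pipeline that keeps only insert and update events
fun insertAndUpdatePipeline(): List<Bson> = listOf(
    Aggregates.match(Filters.`in`("operationType", listOf("insert", "update")))
)

fun main() {
    val pipeline = insertAndUpdatePipeline()

    // Print each stage as JSON to inspect the pipeline
    pipeline.forEach { stage -> println(stage.toBsonDocument().toJson()) }

    // Assumption: placeholder URI; replace it with your own connection string
    val uri = "mongodb://localhost:27017"

    MongoClient.create(uri).use { client ->
        val collection = client.getDatabase("sample_restaurants")
            .getCollection<Restaurant>("restaurants")

        collection.watch(pipeline).forEach { change -> println(change) }
    }
}
```

The backticks around `in` are needed because in is a reserved word in Kotlin.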

You can modify the behavior of the watch() method by chaining methods to the ChangeStreamIterable object that it returns. If you don't chain any of these methods, the driver does not customize the operation.

The following table describes methods you can use to customize the behavior of watch():

| Method | Description |
| --- | --- |
| batchSize() | Sets the number of documents to return per batch. |
| collation() | Specifies the kind of language collation to use when sorting results. For more information, see Collation in the MongoDB Server manual. |
| comment() | Specifies a comment to attach to the operation. |
| fullDocument() | Sets the fullDocument value. To learn more, see the Include Pre-Images and Post-Images section of this document. |
| fullDocumentBeforeChange() | Sets the fullDocumentBeforeChange value. To learn more, see the Include Pre-Images and Post-Images section of this document. |
| maxAwaitTime() | Sets the maximum await execution time on the server for this operation, in milliseconds. |

For a complete list of methods you can use to configure the watch() method, see the ChangeStreamIterable API documentation.
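The following sketch shows how several of these methods might be chained. The option values, the connection string, and the tunedStream() helper are illustrative assumptions, not recommendations.

```kotlin
import com.mongodb.kotlin.client.ChangeStreamIterable
import com.mongodb.kotlin.client.MongoClient
import com.mongodb.kotlin.client.MongoCollection
import java.util.concurrent.TimeUnit

// Models documents in the restaurants collection
data class Restaurant(val name: String, val cuisine: String)

// Hypothetical helper: applies example option values to a change stream
fun tunedStream(collection: MongoCollection<Restaurant>): ChangeStreamIterable<Restaurant> =
    collection.watch()
        .batchSize(50)                              // return up to 50 events per batch
        .maxAwaitTime(500, TimeUnit.MILLISECONDS)   // server waits up to 500 ms for new events
        .comment("restaurants change stream")       // comment attached to the operation

fun main() {
    // Assumption: placeholder URI; replace it with your own connection string
    val uri = "mongodb://localhost:27017"

    MongoClient.create(uri).use { client ->
        val collection = client.getDatabase("sample_restaurants")
            .getCollection<Restaurant>("restaurants")

        tunedStream(collection).forEach { change -> println(change) }
    }
}
```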

Important

You can enable pre-images and post-images on collections only if your deployment uses MongoDB v6.0 or later.

By default, when you perform an operation on a collection, the corresponding change event includes only the delta of the fields modified by that operation. To see the full document before or after a change, chain the fullDocumentBeforeChange() or the fullDocument() methods to the watch() method.

The pre-image is the full version of a document before a change. To include the pre-image in the change stream event, pass one of the following options to the fullDocumentBeforeChange() method:

  • FullDocumentBeforeChange.WHEN_AVAILABLE: The change event includes a pre-image of the modified document only if the pre-image is available.

  • FullDocumentBeforeChange.REQUIRED: The change event includes a pre-image of the modified document. If the pre-image is not available, the driver raises an error.
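A pre-image is available only if pre-image recording is enabled on the collection. The following sketch enables it by running the collMod database command with the changeStreamPreAndPostImages option, then opens a change stream that requires pre-images. The connection string and the enableImagesCommand() helper are assumptions made for illustration.

```kotlin
import com.mongodb.client.model.changestream.FullDocumentBeforeChange
import com.mongodb.kotlin.client.MongoClient
import org.bson.Document

// Models documents in the restaurants collection
data class Restaurant(val name: String, val cuisine: String)

// Hypothetical helper: builds the collMod command that enables
// pre-image and post-image recording on the given collection
fun enableImagesCommand(collectionName: String): Document =
    Document("collMod", collectionName)
        .append("changeStreamPreAndPostImages", Document("enabled", true))

fun main() {
    // Assumption: placeholder URI; replace it with your own connection string
    val uri = "mongodb://localhost:27017"

    MongoClient.create(uri).use { client ->
        val database = client.getDatabase("sample_restaurants")

        // Requires MongoDB v6.0 or later
        database.runCommand(enableImagesCommand("restaurants"))

        val collection = database.getCollection<Restaurant>("restaurants")
        collection.watch()
            .fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED)
            .forEach { change ->
                println("Before the change: ${change.fullDocumentBeforeChange}")
            }
    }
}
```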

The post-image is the full version of a document after a change. To include the post-image in the change stream event, pass one of the following options to the fullDocument() method:

  • FullDocument.UPDATE_LOOKUP: The change event includes a copy of the entire changed document as it exists at some point after the change.

  • FullDocument.WHEN_AVAILABLE: The change event includes a post-image of the modified document only if the post-image is available.

  • FullDocument.REQUIRED: The change event includes a post-image of the modified document. If the post-image is not available, the driver raises an error.

The following example calls the watch() method on a collection and includes the post-image of updated documents in the results by chaining the fullDocument() method:

import com.mongodb.client.model.changestream.FullDocument

collection.watch().fullDocument(FullDocument.UPDATE_LOOKUP).forEach { change ->
    println("Received a change: $change")
}

With the change stream application running, updating a document in the restaurants collection by using the preceding update example prints a change event resembling the following:

ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."},
namespace=sample_restaurants.restaurants, destinationNamespace=null, fullDocument=Restaurant(name=Blarney Castle, cuisine=Irish),
fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "..."}},
clusterTime=Timestamp{value=..., seconds=..., inc=...},
updateDescription=UpdateDescription{removedFields=[], updatedFields={"cuisine": "Irish"},
truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, splitEvent=null,
wallTime=BsonDateTime{value=...}}

To learn more about pre-images and post-images, see Change Streams with Document Pre- and Post-Images in the MongoDB Server manual.

To learn more about change streams, see Change Streams in the MongoDB Server manual.

To learn more about any of the methods or types discussed in this guide, see the following API documentation:
