Monitor Data Changes
Overview
In this guide, you can learn how to use a change stream to monitor real-time changes to your data. A change stream is a MongoDB Server feature that allows your application to subscribe to data changes on a collection, database, or deployment.
When using the Scala driver, you can call the watch() method to return an instance of ChangeStreamObservable. Then, you can subscribe to the ChangeStreamObservable instance to see new data changes, such as updates, insertions, and deletions.
Sample Data
The examples in this guide use the restaurants collection in the sample_restaurants database from the Atlas sample datasets. To access this collection from your Scala application, create a MongoClient that connects to an Atlas cluster and assign the following values to your database and collection variables:
val database: MongoDatabase = client.getDatabase("sample_restaurants")
val collection: MongoCollection[Document] = database.getCollection("restaurants")
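The preceding snippet assumes that a client value already exists. The following sketch shows one way to create it. Reading the connection string from a MONGODB_URI environment variable, and the localhost fallback, are assumptions for illustration; substitute the connection string for your own Atlas cluster.

```scala
import org.mongodb.scala._

object SampleData {
  // Assumption: the Atlas connection string is supplied through a MONGODB_URI
  // environment variable; the localhost fallback is only a placeholder.
  val uri: String = sys.env.getOrElse("MONGODB_URI", "mongodb://localhost:27017")

  // Creating the client does not block waiting for the server.
  val client: MongoClient = MongoClient(uri)
  val database: MongoDatabase = client.getDatabase("sample_restaurants")
  val collection: MongoCollection[Document] = database.getCollection("restaurants")
}
```

Call SampleData.client.close() when your application no longer needs the connection.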
Tip
To learn how to create a free MongoDB Atlas cluster and load the sample datasets, see the Get Started with Atlas guide.
Some examples use instances of the LatchedObserver class to handle change stream events. This class is a custom observer that prints change stream events and continues monitoring data changes until the stream completes or generates an error. To use the LatchedObserver class, paste the following code into your application file:
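import java.util.concurrent.CountDownLatch
import org.mongodb.scala.{Document, Observer, Subscription}
import org.mongodb.scala.model.changestream.ChangeStreamDocument

case class LatchedObserver() extends Observer[ChangeStreamDocument[Document]] {
  // Released when the stream completes or fails, so that await() can return
  val latch = new CountDownLatch(1)

  override def onSubscribe(subscription: Subscription): Unit =
    subscription.request(Long.MaxValue) // Request data

  override def onNext(changeDocument: ChangeStreamDocument[Document]): Unit =
    println(changeDocument)

  override def onError(throwable: Throwable): Unit = {
    println(s"Error: $throwable")
    latch.countDown()
  }

  override def onComplete(): Unit = latch.countDown()

  // Blocks the calling thread until the stream completes or fails
  def await(): Unit = latch.await()
}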
Open a Change Stream
To open a change stream, call the watch() method. The instance on which you call the watch() method determines the scope of events that the change stream monitors. You can call the watch() method on instances of the following classes:
MongoClient: Monitors changes to all collections across all databases in a deployment, excluding system collections and collections in the admin, local, and config databases
MongoDatabase: Monitors changes to all collections in one database
MongoCollection: Monitors changes to one collection
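To compare the three scopes side by side, the following sketch builds one change stream per scope. It recreates the client, database, and collection values from the Sample Data section, again assuming a MONGODB_URI environment variable. Calling watch() only builds a ChangeStreamObservable; the driver opens the stream on the server when an observer subscribes.

```scala
import org.mongodb.scala._

object ChangeStreamScopes {
  // Assumption: connection string from MONGODB_URI, with a localhost placeholder.
  val client: MongoClient =
    MongoClient(sys.env.getOrElse("MONGODB_URI", "mongodb://localhost:27017"))
  val database: MongoDatabase = client.getDatabase("sample_restaurants")
  val collection: MongoCollection[Document] = database.getCollection("restaurants")

  // Deployment scope: non-system collections in every database
  // except admin, local, and config.
  val deploymentStream: ChangeStreamObservable[Document] = client.watch()

  // Database scope: every collection in sample_restaurants.
  val databaseStream: ChangeStreamObservable[Document] = database.watch()

  // Collection scope: only the restaurants collection.
  val collectionStream: ChangeStreamObservable[Document] = collection.watch()
}
```

Subscribing to any of these values, for example ChangeStreamScopes.databaseStream.subscribe(LatchedObserver()), starts delivering events for that scope.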
The following example calls the watch() method to open a change stream on the restaurants collection. The code creates a LatchedObserver instance to receive and output changes as they occur:
val observer = LatchedObserver()
collection.watch().subscribe(observer)
observer.await()
To begin watching for changes, run the preceding code. Then, in a separate shell, run the following code to update a document that has a name field value of "Blarney Castle":
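import org.mongodb.scala.model.Filters.equal
import org.mongodb.scala.model.Updates.set
import org.mongodb.scala.result.UpdateResult

val filter = equal("name", "Blarney Castle")
val update = set("cuisine", "American")
collection.updateOne(filter, update)
  .subscribe((res: UpdateResult) => println(res),
    (e: Throwable) => println(s"There was an error: $e"))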
When you run the preceding code to update the collection, the change stream application prints the change as it occurs. The printed change event resembles the following output:
ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=sample_restaurants.restaurants, destinationNamespace=null, fullDocument=null, fullDocumentBeforeChange=null, documentKey={"_id": {...}}, clusterTime=Timestamp{...}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"cuisine": "American"}, truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, splitEvent=null, wallTime=BsonDateTime{...}}
Modify the Change Stream Output
To modify the change stream output, you can pass a list of pipeline stages as a parameter to the watch() method. You can include the following stages in the list:
$addFields or $set: Adds new fields to documents
$match: Filters the documents
$project: Projects a subset of the document fields
$replaceWith or $replaceRoot: Replaces the input document with the specified document
$redact: Restricts the contents of the documents
$unset: Removes fields from documents
The Scala driver provides the Aggregates class, which includes helper methods for building the preceding pipeline stages.
Tip
To learn more about pipeline stages and their corresponding Aggregates helper methods, see the following resources:
Aggregation Stages in the MongoDB Server manual
Aggregates in the API documentation
The following example creates a pipeline that uses the Aggregates.filter() method to build the $match stage. Then, the code passes this pipeline to the watch() method and instructs watch() to output events only when update operations occur:
val observer = LatchedObserver()
collection.watch(Seq(Aggregates.filter(Filters.in("operationType", "update"))))
  .subscribe(observer)
observer.await()
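A pipeline can also combine several of the listed stages. The following sketch keeps only insert and update events with a $match stage, then uses a $project stage to trim each event to a few top-level fields. The object name and the chosen fields are illustrative assumptions. Projections.include() keeps the _id field, which holds the resume token that the change stream relies on.

```scala
import org.mongodb.scala.bson.conversions.Bson
import org.mongodb.scala.model.{Aggregates, Filters, Projections}

object ChangeStreamPipelines {
  val insertsAndUpdates: Seq[Bson] = Seq(
    // $match: keep only insert and update events
    Aggregates.filter(Filters.in("operationType", "insert", "update")),
    // $project: keep a few event fields; include() leaves _id in place
    Aggregates.project(
      Projections.include("operationType", "documentKey", "updateDescription", "fullDocument")
    )
  )
}
```

To use this pipeline, pass it to watch() in place of the single-stage pipeline: collection.watch(ChangeStreamPipelines.insertsAndUpdates).subscribe(LatchedObserver()).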
Modify watch() Behavior
You can modify the behavior of the watch() method by chaining methods provided by the ChangeStreamObservable class. The following table describes some of these methods:
| Method | Description |
|---|---|
| fullDocument() | Specifies whether to show the full document after the change, rather than showing only the changes made to the document. To learn more about this option, see the Include Pre-Images and Post-Images section of this guide. |
| fullDocumentBeforeChange() | Specifies whether to show the full document as it was before the change, rather than showing only the changes made to the document. To learn more about this option, see Include Pre-Images and Post-Images. |
| comment() | Attaches a comment to the operation. |
| startAtOperationTime() | Instructs the change stream to provide only changes that occurred at or after the specified timestamp. |
| collation() | Sets the collation to use for the change stream cursor. |
For a full list of watch() options, see ChangeStreamObservable in the API documentation.
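The following sketch chains three of the methods from the preceding table onto a single watch() call. The comment text, the English collation, and the MONGODB_URI environment variable are illustrative assumptions. The start timestamp is approximated from the local clock, so clock skew between your machine and the deployment can shift where the stream begins.

```scala
import com.mongodb.client.model.Collation
import org.bson.BsonTimestamp
import org.mongodb.scala._

object ChangeStreamOptions {
  // Assumption: connection string from MONGODB_URI, with a localhost placeholder.
  val client: MongoClient =
    MongoClient(sys.env.getOrElse("MONGODB_URI", "mongodb://localhost:27017"))
  val collection: MongoCollection[Document] =
    client.getDatabase("sample_restaurants").getCollection("restaurants")

  // Approximate "now" as a timestamp: seconds since the epoch, increment 0.
  val startTime = new BsonTimestamp((System.currentTimeMillis() / 1000).toInt, 0)

  val stream: ChangeStreamObservable[Document] = collection.watch()
    .comment("restaurants-monitor")                      // hypothetical label for the operation
    .startAtOperationTime(startTime)                     // only changes at or after startTime
    .collation(Collation.builder().locale("en").build()) // collation for the stream's cursor
}
```

As with the other examples, the stream starts only after you subscribe, for example with ChangeStreamOptions.stream.subscribe(LatchedObserver()).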
Include Pre-Images and Post-Images
Important
You can enable pre-images and post-images on collections only if your deployment uses MongoDB Server v6.0 or later.
By default, when you perform an update on a collection, the corresponding change event describes only the modified fields and their values after the operation, rather than the full document.
You can instruct the watch() method to return the document's pre-image, the full version of the document before changes, in addition to the modified fields. To include the pre-image in the change stream event, chain the fullDocumentBeforeChange() method to watch(). Pass one of the following values to the fullDocumentBeforeChange() method:
FullDocumentBeforeChange.WHEN_AVAILABLE: The change event includes a pre-image of the modified document. If the pre-image is not available, this change event field has a null value.
FullDocumentBeforeChange.REQUIRED: The change event includes a pre-image of the modified document. If the pre-image is not available, the server raises an error.
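Pre-images are recorded only for collections that have the changeStreamPreAndPostImages option enabled. The following sketch, which assumes MongoDB Server 6.0 or later and a MONGODB_URI environment variable, enables the option on the restaurants collection with the collMod command, then requests pre-images with FullDocumentBeforeChange.REQUIRED and stops after the first event. The object name and the 30-second timeout are illustrative assumptions.

```scala
import java.util.concurrent.CountDownLatch
import scala.concurrent.Await
import scala.concurrent.duration._
import com.mongodb.client.model.changestream.FullDocumentBeforeChange
import org.mongodb.scala._
import org.mongodb.scala.model.changestream.ChangeStreamDocument

object PreImageExample {
  // collMod command that turns on pre- and post-image recording for the collection
  val collModCommand: Document = Document(
    "collMod" -> "restaurants",
    "changeStreamPreAndPostImages" -> Document("enabled" -> true)
  )

  def main(args: Array[String]): Unit = {
    // Assumption: connection string from MONGODB_URI, with a localhost placeholder.
    val client = MongoClient(sys.env.getOrElse("MONGODB_URI", "mongodb://localhost:27017"))
    val database = client.getDatabase("sample_restaurants")
    val collection = database.getCollection("restaurants")

    // Wait for collMod to finish before opening the stream.
    Await.result(database.runCommand(collModCommand).toFuture(), 30.seconds)

    // Released after the first event (or an error) so the example terminates.
    val done = new CountDownLatch(1)
    collection.watch()
      .fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED)
      .subscribe(
        (event: ChangeStreamDocument[Document]) => {
          println(s"Before change: ${event.getFullDocumentBeforeChange}")
          done.countDown()
        },
        (e: Throwable) => { println(s"Error: $e"); done.countDown() }
      )
    done.await()
    client.close()
  }
}
```

While this program waits, running the update example from earlier in this guide in a separate shell should print the restaurant document as it was before the update.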
You can also instruct the watch() method to return the document's post-image, the full version of the document after changes, in addition to the modified fields. To include the post-image in the change stream event, chain the fullDocument() method to watch(). Pass one of the following values to the fullDocument() method:
FullDocument.UPDATE_LOOKUP: The change event includes a copy of the entire changed document from some time after the change.
FullDocument.WHEN_AVAILABLE: The change event includes a post-image of the modified document. If the post-image is not available, this change event field has a null value.
FullDocument.REQUIRED: The change event includes a post-image of the modified document. If the post-image is not available, the server raises an error.
The following example calls the watch() method on a collection and includes the post-image of updated documents by chaining the fullDocument() method:
val observer = LatchedObserver()
collection.watch()
  .fullDocument(FullDocument.UPDATE_LOOKUP)
  .subscribe(observer)
observer.await()
With the change stream application running in a separate shell, updating a document in the restaurants collection by using the preceding update example prints a change event that resembles the following output:
ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=sample_restaurants.restaurants, destinationNamespace=null, fullDocument=Iterable((_id,BsonObjectId{...}), (address,{"building": "202-24", "coord": [-73.9250442, 40.5595462], "street": "Rockaway Point Boulevard", "zipcode": "11697"}), (borough,BsonString{value='Queens'}), (cuisine,BsonString{value='American'}), (grades,BsonArray{values=[...]}), (name,BsonString{value='Blarney Castle'}), (restaurant_id,BsonString{...}), (blank,BsonString{value='Irish'})), fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "..."}}, clusterTime=Timestamp{...}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"cuisine": "American"}, truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, splitEvent=null, wallTime=BsonDateTime{...}}
Tip
To learn more about pre-images and post-images, see Change Streams with Document Pre- and Post-Images in the MongoDB Server manual.
Additional Information
To learn more about change streams, see Change Streams in the MongoDB Server manual.
API Documentation
To learn more about any of the methods or types discussed in this guide, see the following API documentation: