Monitor Data Changes
Overview
In this guide, you can learn how to monitor document changes by using a change stream.
A change stream outputs new change events, providing access to real-time data changes. You can open a change stream on a collection, database, or client object.
Sample Data
The examples in this guide use the following Course struct as a model for documents in the courses collection:
type Course struct {
	Title      string
	Enrollment int32
}
To run the examples in this guide, load these documents into the courses collection in the db database by using the following snippet:
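coll := client.Database("db").Collection("courses")
docs := []interface{}{
	Course{Title: "World Fiction", Enrollment: 35},
	Course{Title: "Abstract Algebra", Enrollment: 60},
}
result, err := coll.InsertMany(context.TODO(), docs)
if err != nil {
	panic(err)
}
// Reports how many documents the server accepted
fmt.Printf("Inserted %d documents\n", len(result.InsertedIDs))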
Tip
Nonexistent Databases and Collections
If the necessary database and collection don't exist when you perform a write operation, the server implicitly creates them.
Each document contains a description of a university course that includes the course title and maximum enrollment, corresponding to the title and enrollment fields in each document.
Note
Each example output shows truncated _data, clusterTime, and ObjectID values because the driver generates them uniquely.
Open a Change Stream
To open a change stream, use the Watch() method. The Watch() method requires a context parameter and a pipeline parameter. To return all changes, pass in an empty Pipeline object.
Example
The following example opens a change stream on the courses collection and outputs all changes:
changeStream, err := coll.Watch(context.TODO(), mongo.Pipeline{})
if err != nil {
	panic(err)
}
defer changeStream.Close(context.TODO())

// Iterates over the cursor to print the change stream events
for changeStream.Next(context.TODO()) {
	fmt.Println(changeStream.Current)
}
If you modify the courses collection in a separate program or shell, this code prints your changes as they occur. Inserting a document with a title value of "Advanced Screenwriting" and an enrollment value of 20 results in the following change event:
map[_id:map[_data:...] clusterTime: {...} documentKey:map[_id:ObjectID("...")] fullDocument:map[_id:ObjectID("...") enrollment:20 title:Advanced Screenwriting] ns: map[coll:courses db:db] operationType:insert]
Modify the Change Stream Output
Use the pipeline parameter to modify the change stream output. This parameter allows you to only watch for certain change events. Format the pipeline parameter as an array of documents, with each document representing an aggregation stage.
You can use the following pipeline stages in this parameter:
$addFields
$match
$project
$replaceRoot
$replaceWith
$redact
$set
$unset
Example
The following example opens a change stream on the db database but only watches for new delete operations:
db := client.Database("db")
pipeline := bson.D{{"$match", bson.D{{"operationType", "delete"}}}}
changeStream, err := db.Watch(context.TODO(), mongo.Pipeline{pipeline})
if err != nil {
	panic(err)
}
defer changeStream.Close(context.TODO())

// Iterates over the cursor to print the delete operation change events
for changeStream.Next(context.TODO()) {
	fmt.Println(changeStream.Current)
}
Note
The Watch() method was called on the db database, so the code outputs new delete operations on any collection within this database.
Modify the Behavior of Watch()
Use the options parameter to modify the behavior of the Watch() method.
You can specify the following options for the Watch() method:
ResumeAfter
StartAfter
FullDocument
FullDocumentBeforeChange
BatchSize
MaxAwaitTime
Collation
StartAtOperationTime
Comment
ShowExpandedEvents
Custom
CustomPipeline
For more information on these options, visit the MongoDB Server manual.
Pre- and Post-Images
When you perform any CRUD operation on a collection, by default, the corresponding change event document contains only the delta of the fields modified by the operation. You can see the full document before and after a change, in addition to the delta, by specifying settings in the options parameter of the Watch() method.
If you want to see a document's post-image, the full version of the document after a change, set the FullDocument field of the options parameter to one of the following values:
UpdateLookup: The change event document includes a copy of the entire changed document.
WhenAvailable: The change event document includes a post-image of the modified document for change events if the post-image is available.
Required: The output is the same as for WhenAvailable, but the driver raises a server-side error if the post-image is not available.
If you want to see a document's pre-image, the full version of the document before a change, set the FullDocumentBeforeChange field of the options parameter to one of the following values:
WhenAvailable: The change event document includes a pre-image of the modified document for change events if the pre-image is available.
Required: The output is the same as for WhenAvailable, but the driver raises a server-side error if the pre-image is not available.
Important
To access document pre- and post-images, you must enable changeStreamPreAndPostImages for the collection. See the MongoDB Server manual for instructions and more information.
Note
There is no pre-image for an inserted document and no post-image for a deleted document.
Example
The following example calls the Watch() method on the courses collection. It specifies a value for the FullDocument field of the options parameter to output a copy of the entire modified document, instead of only the changed fields:
opts := options.ChangeStream().SetFullDocument(options.UpdateLookup)

changeStream, err := coll.Watch(context.TODO(), mongo.Pipeline{}, opts)
if err != nil {
	panic(err)
}
defer changeStream.Close(context.TODO())

// Iterates over the cursor to print the change stream events
for changeStream.Next(context.TODO()) {
	fmt.Println(changeStream.Current)
}
Updating the enrollment value of the document with the title of "World Fiction" from 35 to 30 results in the following change event:
{"_id": {"_data": "..."},"operationType": "update","clusterTime": {"$timestamp": {"t":"...","i":"..."}},"fullDocument": {"_id": {"$oid":"..."},"title": "World Fiction","enrollment": {"$numberInt":"30"}}, "ns": {"db": "db","coll": "courses"},"documentKey": {"_id": {"$oid":"..."}}, "updateDescription": {"updatedFields": {"enrollment": {"$numberInt":"30"}}, "removedFields": [],"truncatedArrays": []}}
If you don't specify the FullDocument option, the same update operation produces a change event document that does not include the "fullDocument" value.
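To show how the updateDescription portion of an update event can be consumed, the following sketch parses an Extended JSON event like the one above with only the Go standard library and lists the names of the updated fields. The updateEvent type, the updatedFieldNames helper, and the sampleEvent constant are hypothetical names, and the truncated "..." values are kept as placeholders:

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
)

// sampleEvent reproduces the shape of the update event shown in this
// guide, with truncated values left as "..." placeholders.
const sampleEvent = `{"_id": {"_data": "..."},"operationType": "update",
"fullDocument": {"_id": {"$oid":"..."},"title": "World Fiction","enrollment": {"$numberInt":"30"}},
"ns": {"db": "db","coll": "courses"},"documentKey": {"_id": {"$oid":"..."}},
"updateDescription": {"updatedFields": {"enrollment": {"$numberInt":"30"}},
"removedFields": [],"truncatedArrays": []}}`

// updateEvent is a hypothetical type that captures only the fields
// this sketch needs.
type updateEvent struct {
	OperationType     string `json:"operationType"`
	UpdateDescription struct {
		UpdatedFields map[string]json.RawMessage `json:"updatedFields"`
		RemovedFields []string                   `json:"removedFields"`
	} `json:"updateDescription"`
}

// updatedFieldNames returns the sorted names of the fields that an
// update event reports as changed.
func updatedFieldNames(eventJSON string) ([]string, error) {
	var ev updateEvent
	if err := json.Unmarshal([]byte(eventJSON), &ev); err != nil {
		return nil, err
	}
	names := make([]string, 0, len(ev.UpdateDescription.UpdatedFields))
	for name := range ev.UpdateDescription.UpdatedFields {
		names = append(names, name)
	}
	sort.Strings(names)
	return names, nil
}

func main() {
	names, err := updatedFieldNames(sampleEvent)
	if err != nil {
		panic(err)
	}
	fmt.Println(names)
}
```

In a program that already uses the driver, decoding changeStream.Current into a struct with bson tags is likely the more direct route. The standard-library approach here is only meant to make the event layout explicit.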
Additional Information
For a runnable example of a change stream, see Monitor Data Changes.
For more information on change streams, see Change Streams.
API Documentation
To learn more about the Watch() method, visit the following API documentation links: