Reacting to Database Changes with MongoDB Change Streams and Go

Nic Raboy • 5 min read • Published Feb 01, 2022 • Updated Feb 03, 2023
If you've been keeping up with my getting started with Go and MongoDB tutorial series, you'll remember that we've accomplished quite a bit so far. We've had a look at everything from CRUD interaction with the database to data modeling, and more. To play catch up with everything we've done, you can have a look at the following tutorials in the series:
In this tutorial we're going to explore change streams in MongoDB and how they might be useful, all with the Go programming language (Golang).
Before we take a look at the code, let's take a step back and understand what change streams are and why there's often a need for them.
Imagine this scenario, one of many possible:
You have an application that engages with internet of things (IoT) clients. Let's say that this is a geofencing application and the IoT clients are something that can trigger the geofence as they come in and out of range. Rather than having your application constantly run queries to see if the clients are in range, wouldn't it make more sense to watch in real-time and react when it happens?
With MongoDB change streams, you can create a pipeline to watch for changes on a collection level, database level, or deployment level, and write logic within your application to do something as data comes in based on your pipeline.

Creating a Real-Time MongoDB Change Stream with Golang

While there are many possible use-cases for change streams, we're going to continue with the example that we've been using throughout the scope of this getting started series. We're going to continue working with podcast show and podcast episode data.
Let's assume we have the following code to start:
	package main

	// fmt, sync, and bson are not used yet; the snippets that follow rely on them.
	import (
		"context"
		"fmt"
		"os"
		"sync"

		"go.mongodb.org/mongo-driver/bson"
		"go.mongodb.org/mongo-driver/mongo"
		"go.mongodb.org/mongo-driver/mongo/options"
	)

	func main() {
		// Connect using the connection string stored in the ATLAS_URI environment variable.
		client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(os.Getenv("ATLAS_URI")))
		if err != nil {
			panic(err)
		}
		defer client.Disconnect(context.TODO())

		database := client.Database("quickstart")
		episodesCollection := database.Collection("episodes")
		// The change stream code from the following snippets goes here.
	}
The above code is a very basic connection to a MongoDB cluster, something that we explored in the How to Get Connected to Your MongoDB Cluster with Go tutorial.
To watch for changes, we can do something like the following:
	episodesStream, err := episodesCollection.Watch(context.TODO(), mongo.Pipeline{})
	if err != nil {
		panic(err)
	}
The above code will watch for any and all changes to documents within the episodes collection. The result is a cursor that we can iterate over indefinitely for data as it comes in.
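We can iterate over the cursor and make sense of our data using the following code:
	// An empty pipeline means every change to the collection is reported.
	episodesStream, err := episodesCollection.Watch(context.TODO(), mongo.Pipeline{})
	if err != nil {
		panic(err)
	}

	defer episodesStream.Close(context.TODO())

	// Next blocks until a change event arrives or the stream fails.
	for episodesStream.Next(context.TODO()) {
		var data bson.M
		if err := episodesStream.Decode(&data); err != nil {
			panic(err)
		}
		fmt.Printf("%v\n", data)
	}
	// Next returned false, so report the error that ended the loop, if any.
	if err := episodesStream.Err(); err != nil {
		panic(err)
	}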
If data were to come in, it might look something like the following:
	map[_id:map[_data:825E4EFCB9000000012B022C0100296E5A1004D960EAE47DBE4DC8AC61034AE145240146645F696400645E3B38511C9D4400004117E80004] clusterTime:{1582234809 1} documentKey:map[_id:ObjectID("5e3b38511c9d4400004117e8")] fullDocument:map[_id:ObjectID("5e3b38511c9d4400004117e8") description:The second episode duration:30 podcast:ObjectID("5e3b37e51c9d4400004117e6") title:Episode #3] ns:map[coll:episodes db:quickstart] operationType:replace]
In the above example, I've done a Replace on a particular document in the collection. In addition to information about the data, I also receive the full document that includes the change. The results will vary depending on the operationType that takes place.
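Because the shape of an event depends on its operationType, a handler will usually branch on that field. For instance, a delete event carries no fullDocument, while an update event carries an updateDescription. The following is a minimal sketch of that idea, not part of the original series: describeEvent is a hypothetical helper, it accepts a plain map[string]any rather than a bson.M, and the event in main is hand-built for illustration instead of being read from a real change stream.

```go
package main

import "fmt"

// describeEvent is a hypothetical helper that summarizes a change event
// based on its operationType field.
func describeEvent(event map[string]any) string {
	op, _ := event["operationType"].(string)
	switch op {
	case "insert", "replace":
		// Inserts and replaces include the complete new document.
		return fmt.Sprintf("%s: %v", op, event["fullDocument"])
	case "update":
		// Updates describe which fields changed.
		return fmt.Sprintf("update: %v", event["updateDescription"])
	case "delete":
		// Deletes only identify the removed document.
		return fmt.Sprintf("delete: %v", event["documentKey"])
	default:
		return fmt.Sprintf("unhandled operation %q", op)
	}
}

func main() {
	// Hand-built sample event, loosely modeled on the output shown earlier.
	event := map[string]any{
		"operationType": "replace",
		"fullDocument":  map[string]any{"title": "Episode #3"},
	}
	fmt.Println(describeEvent(event)) // prints: replace: map[title:Episode #3]
}
```

In the iteration loop from earlier, a call such as describeEvent(map[string]any(data)) could take the place of the fmt.Printf line, since bson.M is defined as a map with string keys.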
While the iteration loop above works fine, it is a blocking operation. If we wanted to watch for changes and continue doing other things, we'd iterate over the change stream cursor in a goroutine.
We could make some changes like this:
	package main

	import (
		"context"
		"fmt"
		"os"
		"sync"

		"go.mongodb.org/mongo-driver/bson"
		"go.mongodb.org/mongo-driver/mongo"
		"go.mongodb.org/mongo-driver/mongo/options"
	)

	// iterateChangeStream prints every change event until the stream ends or routineCtx is canceled.
	// The WaitGroup is passed by pointer so that Done affects the same counter main waits on.
	func iterateChangeStream(routineCtx context.Context, waitGroup *sync.WaitGroup, stream *mongo.ChangeStream) {
		defer waitGroup.Done()
		defer stream.Close(routineCtx)
		for stream.Next(routineCtx) {
			var data bson.M
			if err := stream.Decode(&data); err != nil {
				panic(err)
			}
			fmt.Printf("%v\n", data)
		}
	}

	func main() {
		client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(os.Getenv("ATLAS_URI")))
		if err != nil {
			panic(err)
		}
		defer client.Disconnect(context.TODO())

		database := client.Database("quickstart")
		episodesCollection := database.Collection("episodes")

		var waitGroup sync.WaitGroup

		episodesStream, err := episodesCollection.Watch(context.TODO(), mongo.Pipeline{})
		if err != nil {
			panic(err)
		}
		waitGroup.Add(1)
		routineCtx, cancelFn := context.WithCancel(context.Background())
		// Release the context's resources on exit; this also keeps cancelFn from being an unused variable.
		defer cancelFn()
		go iterateChangeStream(routineCtx, &waitGroup, episodesStream)

		// Block until the goroutine calls Done.
		waitGroup.Wait()
	}
A few things are happening in the above code. We've moved the stream iteration into a separate function to be used in a goroutine. However, running the application would result in it terminating quite quickly because the main function would return shortly after creating the goroutine. To resolve this, we are making use of a WaitGroup. In our example, the main function will wait until the WaitGroup counter reaches zero, and that only happens when the goroutine terminates.
Making use of the WaitGroup isn't an absolute requirement as there are other ways to keep the application running while watching for changes. However, given the simplicity of this example, it made sense in order to see any changes in the stream.
To keep the iterateChangeStream function from running indefinitely, we are creating and passing a context that can be canceled. While we don't demonstrate canceling the function, at least we know it can be done.

Complicating the Change Stream with the Aggregation Pipeline

In the previous example, the aggregation pipeline that we used was as basic as you can get: it was empty. In other words, we were looking for any and all changes that were happening to our particular collection. While this might be good in a lot of scenarios, you'll probably get more out of using a better-defined aggregation pipeline.
Take the following for example:
	// A single $match stage: only insert events whose new document has a duration above 30.
	matchPipeline := bson.D{
		{
			"$match", bson.D{
				{"operationType", "insert"},
				{"fullDocument.duration", bson.D{
					{"$gt", 30},
				}},
			},
		},
	}

	episodesStream, err := episodesCollection.Watch(context.TODO(), mongo.Pipeline{matchPipeline})
	if err != nil {
		panic(err)
	}
In the above example, we're still watching for changes to the episodes collection. However, this time we're only watching for new documents that have a duration field greater than 30. Any other insert or other change stream operation won't be detected.
The results of the above code, when a match is found, might look like the following:
	map[_id:map[_data:825E4F03CF000000012B022C0100296E5A1004D960EAE47DBE4DC8AC61034AE145240146645F696400645E4F03A01C9D44000063CCBD0004] clusterTime:{1582236623 1} documentKey:map[_id:ObjectID("5e4f03a01c9d44000063ccbd")] fullDocument:map[_id:ObjectID("5e4f03a01c9d44000063ccbd") description:a quick start into mongodb duration:35 podcast:1234 title:getting started with mongodb] ns:map[coll:episodes db:quickstart] operationType:insert]
With change streams, you'll have access to a subset of the MongoDB aggregation pipeline and its operators. You can learn more about what's available in the official documentation.
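To make the $match filter from this section concrete, the sketch below expresses the same condition as a plain Go predicate. This is only an illustration of which events pass the filter; the real filtering happens on the server, which is the point of using a pipeline. The names matchesFilter and toFloat are hypothetical, and the events are plain map[string]any values. If you decoded into a bson.M instead, the nested fullDocument would likely arrive as a bson.M as well, so the type assertion would need adjusting.

```go
package main

import "fmt"

// toFloat converts the numeric Go types a decoded number may arrive as.
func toFloat(v any) (float64, bool) {
	switch n := v.(type) {
	case int:
		return float64(n), true
	case int32:
		return float64(n), true
	case int64:
		return float64(n), true
	case float64:
		return n, true
	}
	return 0, false
}

// matchesFilter mirrors the $match stage: it accepts only insert events
// whose fullDocument.duration is a number greater than 30.
func matchesFilter(event map[string]any) bool {
	if op, _ := event["operationType"].(string); op != "insert" {
		return false
	}
	doc, ok := event["fullDocument"].(map[string]any)
	if !ok {
		return false
	}
	duration, ok := toFloat(doc["duration"])
	return ok && duration > 30
}

func main() {
	long := map[string]any{
		"operationType": "insert",
		"fullDocument":  map[string]any{"duration": 35},
	}
	short := map[string]any{
		"operationType": "insert",
		"fullDocument":  map[string]any{"duration": 30},
	}
	fmt.Println(matchesFilter(long))  // prints: true
	fmt.Println(matchesFilter(short)) // prints: false
}
```

Note that a duration of exactly 30 is rejected, because $gt is a strict comparison.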

Conclusion

You just saw how to use MongoDB change streams in a Golang application using the MongoDB Go driver. As previously pointed out, change streams make it very easy to react to collection, database, and deployment changes without having to constantly query the cluster. With a well-planned aggregation pipeline, you can respond to only the changes you care about as they happen in real time.
If you're looking to catch up on the other tutorials in the MongoDB with Go quick start series, you can find them below:
To bring the series to a close, the next tutorial will focus on transactions with the MongoDB Go driver.
