MongoDB Change Streams with Python

Naomi Pentrel • 9 min read • Published Feb 05, 2022 • Updated Sep 23, 2022

Introduction

Change streams allow you to listen to changes that occur in your MongoDB database. On MongoDB 3.6 or above, this functionality allows you to build applications that can respond immediately to real-time data changes. In this tutorial, we'll show you how to use change streams with Python. In particular, you will:
  • Learn about change streams
  • Create a program that listens to inserts
  • Change the program to listen to other event types
  • Change the program to listen to specific notifications
To follow along, you can create a test environment using the steps below. This is optional but highly encouraged as it will allow you to test usage of the change stream functionality with the examples provided. You will be given all commands, but some familiarity with MongoDB is needed.

Learn about Change Streams

The ability to listen to specific changes in the data allows an application to be much faster in responding to change. If a user of your system updates their information, the system can listen to and propagate these changes right away. For example, this could mean users no longer have to click refresh to see when changes have been applied. Or if a user's changes in one system need approval by someone, another system could listen to changes and send notifications requesting approvals instantaneously.
Before change streams, applications that needed to know about the addition of new data in real-time had to continuously poll data or rely on other update mechanisms. One common, if complex, technique for monitoring changes was tailing MongoDB's Operation Log (Oplog). The Oplog is part of the replication system of MongoDB and as such already tracks modifications to the database but is not easy to use for business logic. Change streams are built on top of the Oplog but they provide a native API that improves efficiency and usability. Note that you cannot open a change stream against a collection in a standalone MongoDB server because the feature relies on the Oplog which is only used on replica sets.
When registering a change stream, you need to specify the collection and what types of changes you want to listen to. You can do this by using the $match stage and a few other aggregation pipeline stages, which limit the amount of data you will receive. If your database enforces authentication and authorization, change streams provide the same access control as for normal queries.

Test the Change Stream Features

The best way to understand how change streams operate is to work with them. In the next section, we'll show you how to set up a server and scripts. After completing the setup, you will have two scripts. One Python script will listen to notifications from the change stream and print them. The other will mimic an application by performing insert, update, replace, and delete operations so that you can see the notifications in the output of the first script. You will also learn how to limit the notifications to the ones you are interested in.

Set up PyMongo

To get started, set up a virtual environment using Virtualenv. Virtualenv allows you to isolate dependencies of your project from other projects. Create a directory for this project and copy the following into a file called requirements.txt in your new directory:
pymongo==3.8.0
dnspython
To create and activate your virtual environment, run the following commands in your terminal:
virtualenv venv # sets up the environment
source venv/bin/activate # activates the environment
pip3 install -r requirements.txt # installs our dependencies
For ease of reading, we assume you are running Python 3 with the python3 and pip3 commands. If you are running Python 2.7, substitute python and pip for those commands.

Set up your Cluster

We will go through two options for setting up a test MongoDB replica set to connect to. If you have MongoDB 3.6 or later installed and are comfortable making changes to your local setup, follow the guide in the appendix and then skip to the next section.
If you do not have MongoDB installed, would prefer not to change your local setup, or are fairly new to MongoDB, we recommend that you set up a MongoDB Atlas cluster. There's a free tier that gives you a three-node replica set, which is ideal for experimenting and learning. Simply follow these steps until you get the URI connection string in step 8. Take that URI connection string, insert the password where it says <password>, and add it to your environment by running
export CHANGE_STREAM_DB="mongodb+srv://user:<password>@example-xkfzv.mongodb.net/test?retryWrites=true"
in your terminal. The string you use as a value will be different.
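If your password contains characters such as @, : or /, it needs to be percent-escaped before it goes into the URI. The sketch below shows one way to do that with the standard library's quote_plus. The function name build_srv_uri and the sample password are made up for illustration, and the host is the placeholder from the export command above.

```python
from urllib.parse import quote_plus


def build_srv_uri(user, password, host, database="test"):
    """Return a mongodb+srv URI with the credentials percent-escaped."""
    return "mongodb+srv://{}:{}@{}/{}?retryWrites=true".format(
        quote_plus(user), quote_plus(password), host, database
    )


if __name__ == "__main__":
    # Hypothetical credentials; replace them with your own.
    print(build_srv_uri("user", "p@ss/word:1", "example-xkfzv.mongodb.net"))
```

You can then export the printed value as CHANGE_STREAM_DB instead of editing the string by hand.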

Listen to Inserts from an Application

Before continuing, quickly test your setup. Create a file test.py with the following contents:
import os
import pymongo

client = pymongo.MongoClient(os.environ['CHANGE_STREAM_DB'])
print(client.changestream.collection.insert_one({"hello": "world"}).inserted_id)
When you run python3 test.py you should see an ObjectId being printed.
Now that you've confirmed your setup, let's create the small program that will listen to changes in the database using a change stream. Create a different file change_streams.py with the following content:
import os
import pymongo
from bson.json_util import dumps

client = pymongo.MongoClient(os.environ['CHANGE_STREAM_DB'])
change_stream = client.changestream.collection.watch()
for change in change_stream:
    print(dumps(change))
    print('') # for readability only
Go ahead and run python3 change_streams.py. You will notice that the program doesn't print anything and just waits for operations to happen on the specified collection. While keeping the change_streams program running, open another terminal window and run python3 test.py. You will first have to run the same export command you ran in the Set up your Cluster section to add the environment variable to the new terminal window.
Checking the terminal window that is running the change_streams program, you will see that the insert operation was logged. It should look like the output below but with a different ObjectId and with a different value for $binary.
➜ python3 change_streams.py
{"_id": {"_data": {"$binary": "glsIjGUAAAABRmRfaWQAZFsIjGXiJuWPOIv2PgBaEAQIaEd7r8VFkazelcuRgfgeBA==", "$type": "00"}}, "operationType": "insert", "fullDocument": {"_id": {"$oid": "5b088c65e226e58f388bf63e"}, "hello": "world"}, "ns": {"db": "changestream", "coll": "collection"}, "documentKey": {"_id": {"$oid": "5b088c65e226e58f388bf63e"}}}

Listen to Different Event Types

You can listen to four types of document-based events:
  • Insert
  • Update
  • Replace
  • Delete
Depending on the type of event, the document structure you receive will differ slightly, but you will always receive the following:
{
  _id: <resume_token>,
  operationType: "<type>",
  ns: {db: "<db name>", coll: "<collection name>"},
  documentKey: { <unique identifier> }
}
For insert and replace operations, the fullDocument field is provided by default as well. For update operations, the extra field is updateDescription, which gives you the document delta (i.e. the difference between the document before and after the operation). To get the full document with each update, you can pass "updateLookup" to the full document option (full_document='updateLookup' in PyMongo's watch()). If an update operation ends up changing multiple documents, there will be one notification for each updated document. This transformation occurs to ensure that statements in the oplog are idempotent.
There is one further type of event that can be received which is the invalidate event. This tells the driver that the change stream is no longer valid. The driver will then close the stream. Potential reasons for this include the collection being dropped or renamed.
To see this in action, update your test.py and run it while the change_streams program is also running:
import os
import pymongo

client = pymongo.MongoClient(os.environ['CHANGE_STREAM_DB'])
client.changestream.collection.insert_one({"_id": 1, "hello": "world"})
client.changestream.collection.update_one({"_id": 1}, {"$set": {"hello": "mars"}})
client.changestream.collection.replace_one({"_id": 1}, {"bye": "world"})
client.changestream.collection.delete_one({"_id": 1})
client.changestream.collection.drop()
The output should be similar to:
➜ python3 change_streams.py
{"fullDocument": {"_id": 1, "hello": "world"}, "documentKey": {"_id": 1}, "_id": {"_data": {"$binary": "glsIjuEAAAABRh5faWQAKwIAWhAECGhHe6/FRZGs3pXLkYH4HgQ=", "$type": "00"}}, "ns": {"coll": "collection", "db": "changestream"}, "operationType": "insert"}

{"documentKey": {"_id": 1}, "_id": {"_data": {"$binary": "glsIjuEAAAACRh5faWQAKwIAWhAECGhHe6/FRZGs3pXLkYH4HgQ=", "$type": "00"}}, "updateDescription": {"removedFields": [], "updatedFields": {"hello": "mars"}}, "ns": {"coll": "collection", "db": "changestream"}, "operationType": "update"}

{"fullDocument": {"bye": "world", "_id": 1}, "documentKey": {"_id": 1}, "_id": {"_data": {"$binary": "glsIjuEAAAADRh5faWQAKwIAWhAECGhHe6/FRZGs3pXLkYH4HgQ=", "$type": "00"}}, "ns": {"coll": "collection", "db": "changestream"}, "operationType": "replace"}

{"documentKey": {"_id": 1}, "_id": {"_data": {"$binary": "glsIjuEAAAAERh5faWQAKwIAWhAECGhHe6/FRZGs3pXLkYH4HgQ=", "$type": "00"}}, "ns": {"coll": "collection", "db": "changestream"}, "operationType": "delete"}

{"_id": {"_data": {"$binary": "glsIjuEAAAAFFFoQBAhoR3uvxUWRrN6Vy5GB+B4E", "$type": "00"}}, "operationType": "invalidate"}
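In an application you will usually branch on operationType rather than print the raw event. The following is a minimal sketch of such a handler for the event shapes shown above. The function name describe_change and the summary strings are made up for illustration, and the f-strings assume Python 3.6 or later.

```python
import os


def describe_change(change):
    """Return a one-line summary of a change event, based on its operationType."""
    op = change["operationType"]
    if op == "invalidate":
        # Invalidate events carry no documentKey.
        return "stream invalidated"
    key = change["documentKey"]["_id"]
    if op in ("insert", "replace"):
        return f"{op} {key}: {change['fullDocument']}"
    if op == "update":
        delta = change["updateDescription"]
        return f"update {key}: set {delta['updatedFields']}, removed {delta['removedFields']}"
    if op == "delete":
        return f"delete {key}"
    return f"unhandled event type {op}"


def main():
    # Imported here so describe_change can be tried out without a cluster.
    import pymongo

    client = pymongo.MongoClient(os.environ["CHANGE_STREAM_DB"])
    for change in client.changestream.collection.watch():
        print(describe_change(change))


# Only connect when the connection string has been exported.
if __name__ == "__main__" and os.environ.get("CHANGE_STREAM_DB"):
    main()
```

Running this in place of change_streams.py while executing test.py should print one summary line per event instead of the full JSON.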

Listen to Specific Notifications

So far, your program has been listening to all operations. In a real application this would be overwhelming and often unnecessary, as each part of your application will generally want to listen only to specific operations. To limit the number of notifications, you can use certain aggregation stages when setting up the stream. These stages are: $match, $project, $addFields, $replaceRoot, and $redact. All other aggregation stages are not available.
You can test this functionality by replacing the contents of your change_streams.py file with the code below and running the test.py script. The output should now only contain insert notifications.
import os
import pymongo
from bson.json_util import dumps

client = pymongo.MongoClient(os.environ['CHANGE_STREAM_DB'])
change_stream = client.changestream.collection.watch([{
    '$match': {
        'operationType': { '$in': ['insert'] }
    }
}])

for change in change_stream:
    print(dumps(change))
    print('')
You can also match on document fields and thus limit the stream to certain document IDs, to documents that have a certain field, and so on.
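As a rough illustration of matching on document fields, the sketch below builds a $match pipeline that filters both on the operation type and on fields of fullDocument. The helper name build_pipeline and its parameters are assumptions made for this example. Note that a fullDocument filter can only match events that carry that field, such as inserts and replaces.

```python
def build_pipeline(operation_types=None, document_filter=None):
    """Build a change stream pipeline with a single $match stage.

    operation_types: iterable of operationType values to keep, e.g. ["insert"].
    document_filter: mapping of document field name to required value.
    """
    match = {}
    if operation_types:
        match["operationType"] = {"$in": list(operation_types)}
    for field, value in (document_filter or {}).items():
        # Document fields live under fullDocument in the change event.
        match["fullDocument." + field] = value
    return [{"$match": match}] if match else []


if __name__ == "__main__":
    print(build_pipeline(["insert"], {"hello": "world"}))
```

The returned list can be passed directly to watch(), for example client.changestream.collection.watch(build_pipeline(["insert"], {"hello": "world"})).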

Resume your Change Streams

No matter how good your network, there will be situations when connections fail. To make sure that no changes are missed in such cases, you need to add some code for storing and handling resumeTokens. Each event contains a resumeToken in its _id field, for example:
"_id": {"_data": {"$binary": "glsIj84AAAACRh5faWQAKwIAWhAEvyfcy4djS8CUKRZ8tvWuOgQ=", "$type": "00"}}
When a failure occurs, the driver should automatically make one attempt to reconnect. The application has to handle further retries as needed. This means that the application should take care of always persisting the resumeToken.
To retry connecting, the resumeToken has to be passed as the optional resumeAfter option (resume_after in PyMongo) when creating the new change stream. This does not guarantee that we can always resume the change stream. MongoDB's oplog is a capped collection that keeps a rolling record of the most recent operations. Resuming a change stream is only possible if the oplog has not rolled over yet (that is, if the changes we are interested in are still in the oplog).
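One possible way to structure the retry logic is sketched below. It is not a definitive implementation: the helper watch_with_resume, the file name resume_token.json, and the retry count and delay are all assumptions made for this example. The helper stores the token of each event after it has been handled and reopens the stream from the last stored token when an error occurs.

```python
import os
import time
from pathlib import Path

TOKEN_FILE = Path("resume_token.json")  # hypothetical location for the stored token


def watch_with_resume(open_stream, handle, load_token, save_token,
                      retry_on, max_retries=3, delay_seconds=1.0):
    """Consume a change stream, persisting resume tokens and reopening on failure."""
    failures = 0
    while True:
        try:
            for change in open_stream(load_token()):
                handle(change)
                save_token(change["_id"])  # persist only after handling succeeded
                failures = 0
            return  # the stream ended, e.g. it was closed after an invalidate event
        except retry_on:
            failures += 1
            if failures > max_retries:
                raise
            time.sleep(delay_seconds)


def main():
    # Imported here so the helper above can be tried out without a cluster.
    import pymongo
    from bson.json_util import dumps, loads
    from pymongo.errors import PyMongoError

    collection = pymongo.MongoClient(os.environ["CHANGE_STREAM_DB"]).changestream.collection

    def load_token():
        return loads(TOKEN_FILE.read_text()) if TOKEN_FILE.exists() else None

    def save_token(token):
        TOKEN_FILE.write_text(dumps(token))

    watch_with_resume(
        open_stream=lambda token: collection.watch(resume_after=token),
        handle=lambda change: print(dumps(change)),
        load_token=load_token,
        save_token=save_token,
        retry_on=(PyMongoError,),
    )


# Only connect when the connection string has been exported.
if __name__ == "__main__" and os.environ.get("CHANGE_STREAM_DB"):
    main()
```

If the stored token is no longer in the oplog, reopening the stream keeps failing and the error is raised again once max_retries is exceeded, so the application still has to decide what to do in that case.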

Caveats

  • Change Streams in Production: If you plan to use change streams in production, please read MongoDB's recommendations.
  • Ordering and Rollbacks: MongoDB guarantees that the received events will be in the order they occurred (thus providing a total ordering of changes across shards if you use shards). On top of that only durable, i.e. majority committed changes will be sent to listeners. This means that listeners do not have to consider rollbacks in their applications.
  • Reading from Secondaries: Change streams can be opened against any data-bearing node in a cluster, regardless of whether it's primary or secondary. However, it is generally not recommended to read from secondaries, as failovers can lead to increased load and failures in this setup.
  • Updates with the fullDocument Option: With updateLookup, the fullDocument returned for an update operation is not guaranteed to represent the document exactly as it was after that operation. The document deltas are guaranteed to be sent in order with the update notifications, but updateLookup fetches the current version of the document. If changes happen in quick succession, the document may have changed again before the lookup finished. The fullDocument might therefore include later changes, potentially giving the impression that events took place in a different order.
  • Impact on Performance: Up to 1,000 concurrent change streams to each node are supported with negligible impact on the overall performance. However, on sharded clusters, the guarantee of total ordering could cause response times of the change stream to be slower.
  • WiredTiger: Change streams are available in MongoDB 3.6 and later. They are not available on older versions, with the MMAPv1 storage engine, or with replica set protocol versions earlier than pv1.
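Regarding the fullDocument caveat above: because the document deltas arrive in order, one alternative is to keep a local copy of a document and apply each updateDescription to it yourself. The following is a minimal sketch under stated assumptions. The helper names are made up for illustration, and it only handles top-level fields and dotted paths into embedded documents, not paths that index into arrays.

```python
import copy


def _walk(document, path, create):
    """Return (parent_dict, leaf_key) for a dotted path, or (None, leaf_key) if absent."""
    *parents, leaf = path.split(".")
    node = document
    for name in parents:
        if name not in node:
            if not create:
                return None, leaf
            node[name] = {}
        node = node[name]
    return node, leaf


def apply_update_description(document, update_description):
    """Return a copy of document with the updatedFields and removedFields applied."""
    result = copy.deepcopy(document)
    for path, value in update_description.get("updatedFields", {}).items():
        parent, key = _walk(result, path, create=True)
        parent[key] = value
    for path in update_description.get("removedFields", []):
        parent, key = _walk(result, path, create=False)
        if parent is not None:
            parent.pop(key, None)
    return result


if __name__ == "__main__":
    before = {"_id": 1, "hello": "world"}
    delta = {"removedFields": [], "updatedFields": {"hello": "mars"}}
    print(apply_update_description(before, delta))
```

Applied to the update event from the earlier output, this turns the inserted document into the state it had right after that update, independent of any later changes.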

Learn More

To read more about this check out the Change Streams documentation.
If you're interested in more MongoDB tips, follow us on Twitter @mongodb.

Appendix

How to set up a Cluster in the Cloud

If you haven't yet set up your free cluster on MongoDB Atlas, now is a great time to do so. You have all the instructions in this blog post.

How to set up a Local Cluster

Before setting up the instances, please confirm that you are running version 3.6 or later of the MongoDB Server (mongod) and the MongoDB shell (mongo). You can do this by running mongod --version and mongo --version. If either of these does not satisfy our requirements, please upgrade to a more recent version before continuing.
In the following you will set up a single-node replica-set named test-change-streams. For a production replica-set, at least three nodes are recommended.
  1. Run the following commands in your terminal to create a directory for the database files and start the mongod process on port 27017:
    mkdir -p /data/test-change-streams
    mongod --replSet test-change-streams --logpath "mongodb.log" --dbpath /data/test-change-streams --port 27017 --fork
  2. Now open up a mongo shell on port 27017:
    mongo --port 27017
  3. Within the mongo shell you just opened, configure your replica set:
    config = {
      _id: "test-change-streams",
      members: [{ _id : 0, host : "localhost:27017"}]
    };
    rs.initiate(config);
  4. Still within the mongo shell, you can now check that your replica set is working by running rs.status(). The output should indicate that your node has become primary. This may take a few seconds, so if you do not see it immediately, run the command again.
  5. Run
    export CHANGE_STREAM_DB=mongodb://localhost:27017
    in your shell and continue.
