Subscribe to MongoDB Change Streams Via WebSockets

Aaron Bassett • 3 min read • Published Jan 28, 2022 • Updated Sep 23, 2022
MongoDB • Change Streams • Python
Change streams allow applications to access real-time data changes without the complexity and risk of tailing the oplog. Applications can use change streams to subscribe to all data changes on a single collection, a database, or an entire deployment, and immediately react to them. Because change streams use the aggregation framework, applications can also filter for specific changes or transform the notifications at will.

Prerequisites

The example code in this article will assume you have a cluster running in MongoDB Atlas, but it will work with any MongoDB version from 3.6 onwards.
You will also need Python 3.6+ with Motor and Tornado installed.
pip install motor
pip install tornado
pip install dnspython
pip install logzero

Creating a WebSocket Server

To allow clients to subscribe to your change stream via WebSockets, you must first create a WebSocket server. This WebSocket server, written in Python and using Tornado, proxies any new data from the change stream to your connected clients.
import tornado.websocket


class ChangesHandler(tornado.websocket.WebSocketHandler):

    # Shared by all handler instances: every open WebSocket connection.
    connected_clients = set()

    def open(self):
        ChangesHandler.connected_clients.add(self)

    def on_close(self):
        ChangesHandler.connected_clients.remove(self)

    @classmethod
    def send_updates(cls, message):
        # Push the message to every connected client.
        for connected_client in cls.connected_clients:
            connected_client.write_message(message)

    @classmethod
    def on_change(cls, change):
        # Format the change event as "<operation>: <document name>".
        message = f"{change['operationType']}: {change['fullDocument']['name']}"
        ChangesHandler.send_updates(message)

Opening and Closing WebSocket Connections

As clients connect and disconnect from the WebSocket server, they trigger the open and on_close methods.
connected_clients = set()

def open(self):
    ChangesHandler.connected_clients.add(self)

def on_close(self):
    ChangesHandler.connected_clients.remove(self)
When a client connects, your server stores a reference to it in the connected_clients set. This allows it to push new data to the client when it is received from the change stream. Likewise, when a client disconnects from the server, it is removed from the set of connected clients, so your server does not try to push updates on a connection which no longer exists.
It is worth noting that the server does not have an on_message handler. Because WebSockets are bidirectional, a WebSocket server typically has an on_message method, which is invoked to handle incoming messages when a client sends data to the server. However, as you are only using the WebSocket connection to push change stream data to the connected clients, your WebSocket connection is effectively one-way, and your server does not need a method for handling inbound data.
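The registry pattern described above can be sketched without Tornado. One detail to be aware of: set.remove raises KeyError when the item is not in the set, so this sketch uses set.discard, which stays silent if a connection is ever closed without having been registered. ClientRegistry and FakeClient are hypothetical names used only for illustration; they are not part of Tornado or of the example repository.

```python
class FakeClient:
    """Stand-in for a WebSocket connection (hypothetical, for illustration)."""

    def __init__(self, name):
        self.name = name
        self.received = []

    def write_message(self, message):
        # A real connection would send the message over the socket.
        self.received.append(message)


class ClientRegistry:
    """Tracks connected clients, mirroring ChangesHandler.connected_clients."""

    def __init__(self):
        self.connected_clients = set()

    def open(self, client):
        self.connected_clients.add(client)

    def on_close(self, client):
        # discard() is a no-op for unknown clients; remove() would raise KeyError.
        self.connected_clients.discard(client)

    def send_updates(self, message):
        # Iterate over a copy so the set may change while messages are sent.
        for client in list(self.connected_clients):
            client.write_message(message)


registry = ClientRegistry()
alice, bob = FakeClient("alice"), FakeClient("bob")
registry.open(alice)
registry.open(bob)
registry.send_updates("insert: Cosy loft")
registry.on_close(bob)
registry.on_close(bob)  # closing twice does not raise
registry.send_updates("update: Cosy loft")
print(alice.received)  # ['insert: Cosy loft', 'update: Cosy loft']
print(bob.received)    # ['insert: Cosy loft']
```

Whether to swap remove for discard in the real handler is a judgment call: remove surfaces bookkeeping bugs loudly, while discard keeps a stray close event from raising.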

Pushing Messages to Connected Clients

@classmethod
def send_updates(cls, message):
    for connected_client in cls.connected_clients:
        connected_client.write_message(message)

@classmethod
def on_change(cls, change):
    message = f"{change['operationType']}: {change['fullDocument']['name']}"
    ChangesHandler.send_updates(message)
When you have new data from the change stream, you pass it to the WebSocket server using the on_change method. This method formats the change stream data into a string ready to be pushed out to the connected clients. The push itself happens in the send_updates method, which loops over all clients in the connected_clients set and calls each client's write_message method to write the data to its WebSocket connection.
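The formatting step in on_change assumes every change event carries a fullDocument with a name field. Delete events do not include fullDocument, so that expression would raise a KeyError for them. A minimal sketch of a more tolerant formatter follows; format_change is a hypothetical helper, not part of the original example, and the placeholder text is an arbitrary choice.

```python
def format_change(change):
    """Build the message sent to clients, tolerating events without a document."""
    operation = change.get("operationType", "unknown")
    # Delete events have no fullDocument; with updateLookup it may also be None
    # if the document was removed before the lookup ran.
    document = change.get("fullDocument") or {}
    name = document.get("name", "<no name available>")
    return f"{operation}: {name}"


insert_event = {"operationType": "insert", "fullDocument": {"name": "Cosy loft"}}
delete_event = {"operationType": "delete", "documentKey": {"_id": 1}}

print(format_change(insert_event))  # insert: Cosy loft
print(format_change(delete_event))  # delete: <no name available>
```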

Monitoring a Change Stream for Changes

In Motor, collections have a watch() method which you can use to monitor the collection for any changes. Then, whenever there is a new change on the stream, you can use the WebSocket server's on_change method to proxy the data to the WebSocket clients.
change_stream = None

async def watch(collection):
    global change_stream

    async with collection.watch(full_document='updateLookup') as change_stream:
        async for change in change_stream:
            ChangesHandler.on_change(change)
This watch function is attached to your Tornado loop as a callback.
import os

import tornado.ioloop
from motor.motor_tornado import MotorClient


def main():
    # MONGO_SRV holds the connection string for your cluster.
    client = MotorClient(os.environ["MONGO_SRV"])
    collection = client["sample_airbnb"]["listingsAndReviews"]

    # Schedule watch(collection) to run once the loop starts.
    loop = tornado.ioloop.IOLoop.current()
    loop.add_callback(watch, collection)

    try:
        loop.start()
    except KeyboardInterrupt:
        pass
    finally:
        if change_stream is not None:
            change_stream.close()
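To see the shape of the watch loop without a cluster, here is a minimal sketch that uses only the standard library. It swaps the real change stream for a hypothetical async generator, fake_change_stream, and runs on asyncio rather than the Tornado loop; the document names are made up. It illustrates the control flow only, not Motor's behaviour.

```python
import asyncio


async def fake_change_stream():
    """Hypothetical stand-in for collection.watch(); yields change-like dicts."""
    for name in ("Cosy loft", "Beach house"):
        await asyncio.sleep(0)  # hand control back to the event loop, as real I/O would
        yield {"operationType": "insert", "fullDocument": {"name": name}}


async def watch(stream, on_change):
    # Same shape as the watch() function above: forward every change to a callback.
    async for change in stream:
        on_change(change)


received = []
asyncio.run(watch(fake_change_stream(), received.append))
print([change["fullDocument"]["name"] for change in received])
# ['Cosy loft', 'Beach house']
```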

Subscribing to Changes in the Browser Via WebSockets

For this example, your WebSocket client is a simple web page, and it logs any WebSocket messages to the browser's JavaScript console.
<html>
  <head>
    <script>
      const websocket = new WebSocket('ws://127.0.0.1:8000/socket')
      websocket.onmessage = function(evt) {
        console.log(evt.data)
      }
    </script>
  </head>
  <body></body>
</html>
You can use Tornado to serve this web page as well.
class WebpageHandler(tornado.web.RequestHandler):
    def get(self):
        self.render("templates/index.html")

Putting it All Together

[Screencast: a change being sent in real time via a WebSocket]
To try the example for yourself:
  • clone the example code from GitHub.
  • install the requirements.
  • set the required environment variables.
  • run the Python script.
git clone git@github.com:aaronbassett/mongodb-changestreams-tornado-example.git
cd mongodb-changestreams-tornado-example
pip install -r requirements.txt
export MONGO_SRV=
python server.py
Once the WebSocket server is running in your terminal, open a browser window to localhost:8000 and open your JavaScript console. Then, make some changes to your collection via Compass or the MongoDB Shell; each change should be logged in the console as it arrives.

Wrap-Up

In this article, you have subscribed to all changes on a single collection. However, you can also use change streams to subscribe to all data changes on a database, or even an entire deployment. And, because change streams use the aggregation framework, applications can also filter for specific changes or transform the notifications.
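As a rough illustration of that filtering, a change stream accepts an aggregation pipeline made of stages such as $match and $project. The sketch below only builds the pipeline as plain Python data; build_pipeline is a hypothetical helper, and the choice of operation types and fields is an assumption made for the example.

```python
def build_pipeline(operation_types, projected_fields=()):
    """Return an aggregation pipeline for filtering a change stream."""
    pipeline = [{"$match": {"operationType": {"$in": list(operation_types)}}}]
    if projected_fields:
        # Keep only the named fields of each change event (_id is kept by default).
        pipeline.append({"$project": {field: 1 for field in projected_fields}})
    return pipeline


pipeline = build_pipeline(["insert", "update"], ["operationType", "fullDocument.name"])
print(pipeline)
# With Motor, the pipeline would be passed as the first argument to watch():
#   collection.watch(pipeline, full_document="updateLookup")
```

Filtering on the server this way means the WebSocket server only receives the events it intends to forward, rather than discarding unwanted ones in Python.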
For more information, see the MotorChangeStream documentation.
