MongoDB & Node.js: Change Streams & Triggers (Part 4 of 4)

29 min • Published May 21, 2021
Node.js • MongoDB • Change Streams • JavaScript
00:00:00 Introduction to Change Streams and Triggers
00:01:45 Exploring Change Streams in Node.js
00:11:43 Using Event Emitters with Change Streams
00:19:58 Monitoring Change Streams with hasNext
00:29:14 Leveraging Node's Stream API
00:38:21 Setting Up MongoDB Atlas Triggers
00:58:41 Conclusion and Community Engagement
The primary focus of the video is teaching how to use MongoDB's change streams and triggers to react in real time to database changes within a Node.js environment.
🔑 Key Points
  • Change streams allow for real-time monitoring of database changes.
  • Triggers in MongoDB Atlas can execute functions based on database events or scheduled intervals.
  • The video demonstrates four ways to react to database changes: three change stream approaches in Node.js, plus MongoDB Atlas triggers.
  • Aggregation pipelines can be used to filter and transform change stream events.
  • MongoDB Atlas handles server management for triggers, simplifying the process for developers.
🔗 Related Links
All MongoDB Videos

Full Video Transcript
Sometimes you need to react immediately to changes in your database. Perhaps you want to place an order with a distributor whenever an item's inventory drops below a given threshold, or perhaps you want to send an email whenever the status of an order changes. Regardless of your use case, whenever you want to react immediately to changes in your database, change streams and triggers can be a fantastic option.

If you're just joining us in this Quick Start with MongoDB and Node.js series, welcome, so glad you're here. We began by walking through how to connect to a MongoDB database and then how to perform each of the CRUD (create, read, update, and delete) operations. Then we moved on to more advanced topics like the aggregation framework and transactions. Today we're going to cover change streams and triggers. For each one, I'll explain what it is and then walk through an example of how to use it.

Let's kick things off with change streams. Change streams allow you to receive notifications about changes made to your MongoDB databases and collections. When you use change streams, you can choose to program actions that will be automatically taken whenever a change event occurs. Change streams utilize the aggregation framework, so you can choose to filter for specific change events or transform the change event documents. For example, let's say I want to be notified whenever a new listing in the Sydney, Australia market is added to the listingsAndReviews collection. I could create a change stream that monitors the listingsAndReviews collection and use an aggregation pipeline to match on the listings I'm interested in. Today I'll walk you through three different ways to implement this change stream.

I'm going to begin working in a basic template file. This code is structured very similarly to the code you saw in the previous videos, so if you have any questions about what the template code is doing, head back to the first video in the series. I've already connected to a database in MongoDB Atlas that has the sample data set loaded, and today I'm going to continue working in the sample_airbnb database.

I find it helpful to have a script that will generate sample data for me when I'm testing change streams, so I wrote a script named change-streams-test-data that will quickly generate sample data. You can grab a copy of the script in my GitHub repo if you want to follow along. The script does the following: it creates three new listings (Opera House Views, Private Room in London, and Beautiful Beach House), then it updates two of those listings (Opera House Views and Beautiful Beach House), then it creates two more listings (Italian Villa and Sydney Harbour Home), and finally it deletes the Sydney Harbour Home listing. This gives us a variety of CRUD operations we can use to test our change stream.

Now that we're set up, let's explore three different ways to work with a change stream in Node.js. Regardless of how I monitor changes in the change stream, I want to close the change stream after a certain amount of time, so I've written a helper function to do just that. This function has two parameters: timeInMs, the time in milliseconds after which the change stream should be closed (it defaults to 60,000 milliseconds, which is one minute), and the change stream to be closed. After the given amount of time, the function calls changeStream.close(), which is what actually closes the change stream.
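Based on that description, a minimal sketch of the helper might look like the following (the exact code in the accompanying repo may differ slightly):

```javascript
// Close the given change stream after timeInMs milliseconds (default: one minute).
// Returns a Promise so callers can await the timer if they want to.
function closeChangeStream(timeInMs = 60000, changeStream) {
  return new Promise((resolve) => {
    setTimeout(() => {
      console.log("Closing the change stream");
      changeStream.close();
      resolve();
    }, timeInMs);
  });
}
```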
Now we are ready to start using change streams. The first way we will monitor the change stream is using the on() function. The MongoDB Node.js driver's ChangeStream class inherits from Node's built-in EventEmitter class. As a result, we can use EventEmitter's on() function to add a listener that will be called whenever a change occurs in the change stream. If that was a bit confusing, don't worry, it should become clearer as I start writing code.

Let's write a function that will monitor changes in the change stream using EventEmitter's on(). I'm going to create an asynchronous function named monitorListingsUsingEventEmitter. For parameters, let's use a connected MongoClient, a time in milliseconds that indicates how long the change stream should be monitored (with a default of 60,000), and an aggregation pipeline that the change stream will use.

Now we need to access the collection we will monitor for changes. We want to monitor the listingsAndReviews collection, so we'll say const collection = client.db("sample_airbnb").collection("listingsAndReviews"). Now we're ready to create our change stream: we'll call collection.watch() and pass the pipeline. watch() returns a change stream, so we'll store what's returned in a constant named changeStream.

Once we've got our change stream, we can add a listener to it. I'll call changeStream.on(), listen for the "change" event, and define the listener as an anonymous function whose parameter I'll name next, and then simply log next. Really, we aren't doing much here, we're just logging, but this is where you could do the exciting stuff: fire an email, place an order, or take whatever intelligent action you need based on the change in the collection.

Now, I could choose to leave the change stream open indefinitely. Instead, I'm going to call the helper function to close the change stream after the given amount of time, so I'll say await closeChangeStream() and pass the time in milliseconds and the change stream.

Now that I've implemented the function, let's try it out. I'm going to go up to main() and call the function with await monitorListingsUsingEventEmitter(). For now, I'm just going to pass the client and set the time in milliseconds to 15,000, so 15 seconds.
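Putting that together, here's a hedged sketch of the event emitter approach as described in the video (names follow the video's naming, and closeChangeStream is the helper defined earlier):

```javascript
// Watch a collection using the ChangeStream's EventEmitter interface.
async function monitorListingsUsingEventEmitter(client, timeInMs = 60000, pipeline = []) {
  const collection = client.db("sample_airbnb").collection("listingsAndReviews");

  // watch() returns a ChangeStream, optionally filtered/transformed by the pipeline
  const changeStream = collection.watch(pipeline);

  // Log every change event; this is where real actions (emails, orders, ...) would go
  changeStream.on("change", (next) => {
    console.log(next);
  });

  // Close the change stream after the given amount of time
  await closeChangeStream(timeInMs, changeStream);
}

// In main(): await monitorListingsUsingEventEmitter(client, 15000);
```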
Let me save this file. I've opened two terminals at the bottom of VS Code, so let me expand the terminal pane so we can see more. The left terminal is where I'm going to run the script that will open the change stream and watch for changes, so let me kick that off; the change stream will stay open for 15 seconds. In the right terminal, I'm going to kick off change-streams-test-data, which will create, update, and delete documents.

We're getting a lot of output here, so let me scroll up to the top. Recall that my code is printing out each change event it receives. Looking at that first change, we can see that change-streams-test-data is creating a new Airbnb listing. The change stream gives us a lot of information about this change to the database, so let's walk through each piece. Each change event has an _id; this _id can be used when you want to resume a change stream, and we'll talk a bit more about that later. We can see that the operation type is insert, and we can also see when the event occurred. Next up we have the full document; here we can see the document that was inserted into the database. This can be super helpful when you're writing custom code. For example, maybe you're sending an email to notify users of new listings in the areas they're interested in; you might want to include the name and address of the listing in that email, and you can get that information right here in the change event without having to write a separate query. Next we have ns, which stands for namespace; this is the database and collection affected by the event. Finally, we have the document key, which is the _id of the document that was inserted, and you can see the document key in the change event matches the _id printed by change-streams-test-data.

I'll scroll quickly through the other change events. We can see another change event for the second listing that was created, and another for the third. After the inserts, change-streams-test-data updates one of the listings. We see much of the same information that we saw for inserts, plus an update description that shows which fields were updated and which fields were removed. If you want to see the full document and not just the updated fields, you can configure the change stream to include that as well. Next we see another update, then two more inserts; these look pretty similar to what we saw previously, so I'm just going to keep scrolling. The last thing change-streams-test-data does is delete a document. Here we see information similar to what we've seen before: the event _id, the operation type (delete), the time the event occurred, the namespace for the event, and the _id of the document that was deleted. At the very bottom, we can see the change stream was closed.

In this example, I captured and logged all of the change events. In some cases you're not going to care about every change that occurs in a collection; instead, you'll want to limit which changes you're monitoring. You can use an aggregation pipeline to filter the changes or transform the change stream event documents. So let's think back to my objective: I want to be notified whenever a new listing is created in the Sydney, Australia market. I'm going to create an aggregation pipeline to filter for only those changes in the listingsAndReviews collection. If you're not familiar with aggregation pipelines, check out my earlier video in this series.

I'm going to hop back to the code and create a constant named pipeline. I'll use $match to filter the change events. I'm looking for new listings in the collection, so I want the operation type to be insert, and then I want to check where the listing is located, so I'll say fullDocument.address.country is Australia, and let's narrow that down a bit more to Sydney, so fullDocument.address.market is Sydney. I want to use this pipeline to filter the changes in the change stream, so I'm going to pass it to the function that creates our change stream.
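A sketch of that pipeline, passed to the same function as before (the field paths follow the sample_airbnb schema described in the video):

```javascript
// Only pass along insert events for listings in the Sydney, Australia market.
const pipeline = [
  {
    $match: {
      operationType: "insert",
      "fullDocument.address.country": "Australia",
      "fullDocument.address.market": "Sydney"
    }
  }
];

// In main(): hand the pipeline to the change stream when opening it
await monitorListingsUsingEventEmitter(client, 15000, pipeline);
```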
Okay, I'm going to save these changes, expand the terminal again, clear the output, and kick off both scripts once again. All right, this time we got a lot less output. Let me scroll up to the top: the first event is for an operation of type insert in Sydney, Australia, and so is the second. The other inserts that were not in Sydney were automatically filtered out, as were the other change events like updates and deletes. As you can see, the aggregation pipeline is a really powerful and easy way to filter and transform your change events.

The first way we monitored the change stream was using EventEmitter's on() function. Let's take a look at another way to work with change streams: we can create a while loop that waits for the next element in the change stream by using hasNext() from the MongoDB Node.js driver's ChangeStream class.

Let's create a function that will monitor changes in the change stream using hasNext(). I'm going to create an asynchronous function named monitorListingsUsingHasNext and use the same parameters as the last function: a connected MongoClient, a time in milliseconds that indicates how long the change stream should be open (defaulting to 60,000), and an aggregation pipeline that the change stream will use. The first thing I want to do in this function is access the collection I want to monitor for changes, so I'll say const collection = client.db("sample_airbnb").collection("listingsAndReviews"). Now I'm ready to create the change stream, the same way I did in the previous function: const changeStream = collection.watch(pipeline). I could choose to leave this change stream open indefinitely; instead, I'm going to call the helper function that will set a timer and close the change stream, so I'll say closeChangeStream() and pass the time in milliseconds and the change stream.

Now let's actually monitor the change stream for changes. I'm going to create a while loop that waits for new changes by saying await changeStream.hasNext(). hasNext() will wait to return true until a new change arrives in the change stream. I can then get the change stream event by saying await changeStream.next(). I could store this event in a variable and do something really valuable and exciting with it; instead, I'm just going to log it for now. hasNext() will throw an error as soon as the change stream is closed, so I'm going to wrap this section in a try/catch. If an error is thrown, I'll check whether the change stream is closed. If it's closed, I'll log that information, saying the change stream is closed and we will not wait on any more changes. If the change stream is not closed, something unexpected happened, so I'm just going to rethrow the error.
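A sketch of the hasNext() approach as described. Note that the helper is called without await here, so its timer runs while the loop waits; the check for a closed stream is written against newer drivers, and older 3.x drivers expose isClosed() instead:

```javascript
// Poll the change stream in a loop; hasNext() resolves when a new change arrives
// and throws once the stream has been closed.
async function monitorListingsUsingHasNext(client, timeInMs = 60000, pipeline = []) {
  const collection = client.db("sample_airbnb").collection("listingsAndReviews");
  const changeStream = collection.watch(pipeline);

  // Start the timer that will close the change stream after timeInMs
  closeChangeStream(timeInMs, changeStream);

  try {
    while (await changeStream.hasNext()) {
      // This is where real actions would go; we simply log the event
      console.log(await changeStream.next());
    }
  } catch (error) {
    if (changeStream.closed) {
      console.log("The change stream is closed. Will not wait on any more changes.");
    } else {
      throw error;
    }
  }
}
```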
Now that I've implemented this function, let's try it out. I'm going to go back up to main(), replace the previous call, and instead say await monitorListingsUsingHasNext(), passing the client and setting the time to 15 seconds. Let me save this file, expand the terminal, clear the output, and run both scripts again. Okay, a lot of output here, so let me scroll up a bit. We can see we're getting change stream events just like we did before: insert, insert, insert, update, update, insert, insert, and delete, so it looks like we got them all. Just as I did with the last function, I could choose to pass a pipeline to this function in order to filter the events; I'll leave that as an exercise for you to try on your own.

So far we've covered two ways to monitor change streams in Node.js. Let's examine a third way: Node's Stream API. In order to use the stream module, we need to load it, so all the way at the top of the file I'm going to say const stream = require("stream"). Now let's create an asynchronous function named monitorListingsUsingStreamAPI. I'm going to stick with the same parameters I used in the first two functions: a connected MongoClient, a time in milliseconds that indicates how long the change stream should be monitored (defaulting to 60 seconds, as always), and an aggregation pipeline that the change stream will use.

Let's start implementing this function. Just as I did with the other functions, I'll create a constant for the listingsAndReviews collection: const collection = client.db("sample_airbnb").collection("listingsAndReviews"). Then I'll create the change stream the same way as before: const changeStream = collection.watch(pipeline). Now I'm ready to monitor the change stream, so I'll call changeStream.stream(), which returns a Node Readable stream. I'm going to use the Readable's pipe() to pull the data out of the stream and write it to the console. Just like the other functions, I'm simply logging, but once again this is where you can do the exciting work that gets you real business value; you can do so much more here than just log the change event. I could leave the change stream open indefinitely, but instead I'm going to call the helper function that sets a timer and closes the change stream: await closeChangeStream() with the time in milliseconds and the change stream. That's all we need to do for this function, so let's call it. Going back up to main(), I'm going to replace the previous call with await monitorListingsUsingStreamAPI(), pass the client, and this time stick with the default time and the default empty pipeline. Let me save this, clear the terminals, and run the scripts. As you can see, we're getting a lot of output for all of those change stream events, and if I run change-streams-test-data again, we get even more. Just as a reminder, I could choose to pass a pipeline here and filter those change events.
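A sketch of the Stream API approach as described, piping the change stream's Readable into a simple object-mode Writable that logs each event:

```javascript
const stream = require("stream");

// Monitor a change stream using Node's Stream API.
async function monitorListingsUsingStreamAPI(client, timeInMs = 60000, pipeline = []) {
  const collection = client.db("sample_airbnb").collection("listingsAndReviews");
  const changeStream = collection.watch(pipeline);

  // changeStream.stream() returns a Readable; pipe each change event document
  // into a Writable that just logs it.
  changeStream.stream().pipe(
    new stream.Writable({
      objectMode: true,
      write: function (doc, _, callback) {
        console.log(doc);
        callback();
      }
    })
  );

  // Close the change stream after the given amount of time
  await closeChangeStream(timeInMs, changeStream);
}
```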
At some point, your application will likely lose its connection to the change stream. Perhaps a network event will occur and the connection between the application and the database will be dropped, or perhaps your application will crash and need to be restarted. But you're a 10x developer, right? That's never going to happen to you. Okay, moving on. In any of those cases, you may want to resume the change stream where you previously left off so you don't lose any change events.

Each change stream event document contains a resume token. The Node.js driver automatically stores the resume token in the _id of the change event document, which we saw a few minutes ago when we were looking at the change event documents. The application can pass the resume token when creating a new change stream, and the change stream will include all events that happened after the event associated with the given resume token. The MongoDB Node.js driver will automatically attempt to re-establish connections in the event of transient network errors or elections; in those cases, the driver uses its cached copy of the most recent resume token so that no change stream events are lost. In the event of an application failure or restart, however, the application will need to pass the resume token when creating the change stream in order to ensure no change stream events are lost. Keep in mind that the driver loses its cached copy of the most recent resume token when the application restarts, so your application should store the resume token itself. For more information and sample code for resuming change streams, check out the official documentation.
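As a rough illustration (not shown in the video), resuming after a restart might look like the sketch below, where savedResumeToken is whatever token your application persisted; the resumeAfter option is passed to watch():

```javascript
// Reopen a change stream after a restart, replaying events that occurred after
// the event associated with the persisted resume token. savedResumeToken is a
// hypothetical value your application stored (the _id of the last processed event).
async function resumeMonitoring(client, savedResumeToken, pipeline = []) {
  const collection = client.db("sample_airbnb").collection("listingsAndReviews");

  const changeStream = collection.watch(pipeline, { resumeAfter: savedResumeToken });

  changeStream.on("change", (next) => {
    console.log(next);
    // Persist next._id somewhere durable so a future restart can resume from here
  });
}
```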
So far we've talked about different ways to monitor change streams, which allow you to react immediately to changes in your database. When you use change streams, you probably want to ensure your application is always up and running so you aren't missing any change events. Keeping an application up and running is certainly possible, but it can be challenging. This is where MongoDB Atlas triggers come in. Atlas triggers allow you to execute functions in real time based on database events (just like change streams) or on scheduled intervals (just like a cron job). Atlas triggers have a few big advantages. First, you don't have to worry about programming the change stream; you simply program the function that will be executed when the database event fires. Second, you don't have to worry about managing the server where your change stream code runs; Atlas takes care of the server management for you. And third, you get a handy UI to configure your trigger, which means you've got less code to write.

I'm going to create a trigger to do the same thing I did with change streams earlier: I want to be alerted when new listings are created in the Sydney, Australia market. Instead of working locally in a code editor to create and monitor a change stream, I'm going to create a trigger in the Atlas web UI. I'm here on the Triggers page in Atlas, and I'm going to click Add Trigger. I can choose to create a database trigger or a scheduled trigger; I'm going to stick with the database trigger. I'm going to name this trigger New Listing in Sydney, leave the trigger enabled, and leave event ordering enabled as well. In the Link Data Source(s) selection box, I'll choose Cluster0, which is where my sample_airbnb database is stored, and then click Link. This will take just a moment, so I'm going to skip ahead through the magic of editing. All right, my cluster is now linked.

Now we're ready for the trigger source details. For Cluster, I'll select Cluster0. For Database, I'll select sample_airbnb. For Collection Name, I'll select listingsAndReviews. For Operation Type, I can choose which operations I want to watch: insert, update, delete, and/or replace. I'm going to select Insert. I want the full document to be included in the change event, so I'm going to enable that option.

For Event Type, I'm going to stick with Function. The default function code includes a lot of boilerplate to help you get started; I'm just going to delete all of it and simply log the document. I'll call console.log, get the change event's fullDocument, and stringify it so it prints nicely. Just like the functions I wrote in the Node script, I'm not really doing anything exciting here, I'm just logging, but this is where you could implement exciting and useful actions based on the change events.

I'm going to expand the Advanced section. I want to limit the change events to just those in the Sydney, Australia market, so I'll say fullDocument.address.country is Australia and fullDocument.address.market is Sydney. Keep in mind that this $match statement is just like the aggregation pipeline I passed to the change stream back when I was working in my Node.js script.
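A sketch of the trigger function and match expression described above (Atlas invokes the function with the change event as its argument):

```javascript
// Atlas trigger function: log the full document of the inserted listing.
exports = function (changeEvent) {
  console.log(JSON.stringify(changeEvent.fullDocument));
};

// Match expression entered in the trigger's Advanced section to limit events
// to the Sydney, Australia market:
// {
//   "fullDocument.address.country": "Australia",
//   "fullDocument.address.market": "Sydney"
// }
```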
All right, this all looks good, so I'm going to click Save. The trigger is now enabled. From this point on, the function to log the change event will be called whenever a new document in the Sydney, Australia market is inserted into the listingsAndReviews collection.

Now that the trigger is enabled, let's create sample data that will fire it. I'm going to jump back to VS Code and run change-streams-test-data, just like I did when testing the change stream code in the Node.js script. Then I'll go back to Atlas and take a look at the results. When I created the trigger, MongoDB Atlas automatically created a Realm application for me for triggers; let me open that up. The function associated with the trigger doesn't currently do much, it simply logs the change event documents, so let's take a look at the logs. Here we can see two entries; let me expand those. Both listings are in the Sydney market. So even though our script inserts five documents, updates two documents, and then deletes another document, our function is only called for the two new listings in Sydney. Perfect. Triggers give you so much power and so many possibilities with very little programming.

Let's wrap up this video. Today we explored four different ways to accomplish the same task of reacting immediately to changes in a MongoDB database. I showed you three different ways to work with change streams in Node.js: using Node.js's built-in EventEmitter class, using the MongoDB Node.js driver's ChangeStream class directly with hasNext(), and using the Stream API. Finally, I showed you how to create an Atlas trigger to monitor changes. In all four cases, we were able to use $match to filter the change stream events. The examples we explored today all did relatively simple things whenever an event fired: they simply logged the change events. But change streams and triggers become really powerful when you start doing more in response to change events. For example, you might want to fire alarms, send emails, place orders, update other systems, or do a hundred other amazing, incredible things. The sky's the limit.

If you want a quick reference for what I showed you today, check out the blog series I wrote that covers the same topics. I also have a GitHub repo that contains the code I wrote today, and I've included links to both in the description below. This is, unfortunately, the final video in the Node.js and MongoDB Quick Start series, at least for now. I've had a blast, so if you have ideas for other topics you'd like to see covered, or you have any questions about things you saw in this video series, please join me in the MongoDB Community. My teammates and I are there every day chatting with members of our developer community: they're asking questions, we're answering them, we're working together, and we're discussing best practices. So come join the community. I would love to see you there.

Related
Tutorial: How to Develop a Web App With Netlify Serverless Functions and MongoDB (Jan 13, 2025 | 6 min read)
Tutorial: MongoDB Atlas Authentication Using Service Accounts (OAuth) (Jan 23, 2025 | 9 min read)
Tutorial: Type Safety With Prisma & MongoDB (Aug 09, 2024 | 4 min read)
Tutorial: Real Time Data in a React JavaScript Front-End with Change Streams (Sep 09, 2024 | 6 min read)