Introduction to Atlas Stream Processing Development
Welcome to this MongoDB Stream Processing tutorial! In this guide, we will quickly set up a coding workflow and have you write and run your first stream processor. We'll learn how to create a new stream processing instance, code and execute stream processors from Visual Studio Code, and aggregate stream data, opening the door to a whole new field of the MongoDB Atlas developer data platform.
- Prerequisites
- Setup
- Create a stream processor instance
- Set up Visual Studio Code
- The anatomy of a stream processor
- Let's execute a stream processor!
- Hard-coded data in $source declaration
- Simplest stream processor
- Stream processing aggregation
- Add time stamps to the data
To follow along, we need:
- A live MongoDB Atlas cluster that supports stream processing
- Visual Studio Code + MongoDB for VS Code extension
We need to have an Atlas Stream Processing Instance (SPI) ready. Follow the steps in the tutorial Get Started with Atlas Stream Processing: Creating Your First Stream Processor until we have our connection string and username/password, then come back here.
Don't forget to add your IP address to the Atlas Network Access to allow the client to access the instance.
Thanks to the MongoDB for VS Code extension, we can rapidly develop stream processing (SP) aggregation pipelines and run them directly from inside a VS Code MongoDB playground. This provides a much better developer experience. In the rest of this article, we'll be using VS Code.
Such a playground is a Node.js environment where we can execute JavaScript code that interacts with a live stream processor on MongoDB Atlas. To get started, install VS Code and the MongoDB for VS Code extension.
Below is a great tutorial about installing the extension. It also lists some shell commands we'll need later.
- Goal: If everything works, we should see our live SP connection in the MongoDB Extension tab.
Within a stream processing instance (=cluster), it is possible to create and run multiple stream processors (=stream processing pipelines) simultaneously.
A single stream processor (SP) is very similar to a MongoDB aggregation pipeline. It is described by an array of processing stages. However, there are some differences. The most basic SP can be created using only its data source (we'll have executable examples next).
```javascript
// our array of stages
// source is defined earlier
sp_aggregation_pipeline = [source]
sp.createStreamProcessor("SP_NAME", sp_aggregation_pipeline, <OPTIONS>)
```
A more realistic stream processor would contain at least one aggregation stage, and there can be a large number of stages performing various operations to the incoming data stream. There's a generous limit of 16MB for the total processor size.
```javascript
sp_aggregation_pipeline = [source, stage_1, stage_2...]
sp.createStreamProcessor("SP_NAME", sp_aggregation_pipeline, <OPTIONS>)
```
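Because a pipeline is just an array of stage objects, its shape can be checked locally before anything is sent to Atlas. The sketch below is a minimal illustration, not part of mongosh or the VS Code extension: `checkPipelineShape` is a hypothetical helper, and the `$match` stage is only a stand-in for `stage_1`. It relies on the documented rule that `$source` must be the first stage and may appear only once.

```javascript
// Hypothetical helper (not a MongoDB API): sanity-check a stream processor
// pipeline before submitting it.
function checkPipelineShape(pipeline) {
  if (!Array.isArray(pipeline) || pipeline.length === 0) {
    return { ok: false, reason: 'pipeline must be a non-empty array of stages' };
  }
  // Read the first key of each stage object as its operator name.
  const operators = pipeline.map((stage) => Object.keys(stage)[0]);
  if (operators[0] !== '$source') {
    return { ok: false, reason: 'the first stage must be $source' };
  }
  if (operators.filter((op) => op === '$source').length > 1) {
    return { ok: false, reason: 'only one $source stage is allowed' };
  }
  return { ok: true, operators };
}

// Illustrative stages; the names mirror the placeholders used above.
const source = { $source: { documents: [{ id: 'entity_1', value: 1 }] } };
const stage_1 = { $match: { value: { $gt: 0 } } };

console.log(checkPipelineShape([source, stage_1]));
console.log(checkPipelineShape([stage_1, source]));
```

A check like this only catches ordering mistakes early; Atlas remains the authority on whether a pipeline is valid.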
To increase the development loop velocity, there's an sp.process() function which starts an ephemeral stream processor that won't persist in your stream processing instance.
Let's create basic stream processors and build our way up. First, we need to have some data! Atlas Stream Processing supports several data sources for incoming streaming events. These sources include:
- Hard-coded data declaration in $source.
- Kafka streams.
- MongoDB Atlas databases.
For quick testing or self-contained examples, having a small set of hard-coded data is a very convenient way to produce events. We can declare an array of events. Here's an extremely simple example, and note that we'll make some tweaks later to cover different use cases.
In VS Code, we run an ephemeral stream processor with sp.process(). This way, we don't have to call sp.createStreamProcessor() and sp.SP_NAME.drop() constantly, as we would for SPs meant to be saved permanently in the instance.
```javascript
src_hard_coded = {
  $source: {
    // our hard-coded dataset
    documents: [
      {'id': 'entity_1', 'value': 1},
      {'id': 'entity_1', 'value': 3},
      {'id': 'entity_2', 'value': 7},
      {'id': 'entity_1', 'value': 4},
      {'id': 'entity_2', 'value': 1}
    ]
  }
}
sp.process( [src_hard_coded] );
```
Upon running this playground, we should see data coming out in the VS Code "OUTPUT" tab (press CTRL+SHIFT+U to make it appear).
Note: It can take a few seconds for the SP to be uploaded and executed, so don't expect an immediate output.
```
{
  id: 'entity_1',
  value: 1,
  _ts: 2024-02-14T18:52:33.704Z,
  _stream_meta: { timestamp: 2024-02-14T18:52:33.704Z }
}
{
  id: 'entity_1',
  value: 3,
  _ts: 2024-02-14T18:52:33.704Z,
  _stream_meta: { timestamp: 2024-02-14T18:52:33.704Z }
}
...
```
This simple SP can be used to ensure that data is coming into the SP and that there are no problems upstream with our source. Note that the _ts and _stream_meta.timestamp values were assigned at ingestion time, since our documents carry no timestamp of their own.
Building on what we have, adding a simple aggregation pipeline to our SP is easy. Below, we're adding a $group stage to aggregate/accumulate incoming messages' "value" field into an array for the requested interval.
Note that the "w" stage (w stands for "Window") of the SP pipeline contains an aggregation pipeline inside. With Stream Processing, we have aggregation pipelines in the stream processing pipeline.
This stage features a $tumblingWindow which defines the time length the aggregation will be running against. Remember that streams are supposed to be continuous, so a window is similar to a buffer.
interval defines the time length of a window. Since the stream is continuous, we can only aggregate over one slice of it at a time.
idleTimeout defines how long the $source can remain idle before closing the window. This is useful if the stream is not sustained.
```javascript
src_hard_coded = {
  $source: {
    // our hard-coded dataset
    documents: [
      {'id': 'entity_1', 'value': 1},
      {'id': 'entity_1', 'value': 3},
      {'id': 'entity_2', 'value': 7},
      {'id': 'entity_1', 'value': 4},
      {'id': 'entity_2', 'value': 1}
    ]
  }
}

w = {
  $tumblingWindow: {
    // This is the slice of time we want to look at every iteration
    interval: {size: NumberInt(2), unit: "second"},
    // If no additional data is coming in, idleTimeout defines when the window is forced to close
    idleTimeout : {size: NumberInt(2), unit: "second"},
    "pipeline": [
      {
        '$group': {
          '_id': '$id',
          'values': { '$push': "$value" }
        }
      }
    ]
  }
}
sp_pipeline = [src_hard_coded, w];
sp.process( sp_pipeline );
```
Let it run for a few seconds, and we should get an output similar to the following. $group will create one document per incoming "id" field and aggregate the relevant values into a new array field, "values."
```
{
  _id: 'entity_2',
  values: [ 7, 1 ],
  _stream_meta: {
    windowStartTimestamp: 2024-02-14T19:29:46.000Z,
    windowEndTimestamp: 2024-02-14T19:29:48.000Z
  }
}
{
  _id: 'entity_1',
  values: [ 1, 3, 4 ],
  _stream_meta: {
    windowStartTimestamp: 2024-02-14T19:29:46.000Z,
    windowEndTimestamp: 2024-02-14T19:29:48.000Z
  }
}
```
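Since the window stage is a plain object, a small factory makes it easy to experiment with different window lengths. This is only a sketch under a few assumptions: `makeTumblingWindow` and `toInt` are hypothetical helpers, the same value is used for both interval and idleTimeout (as in the example above), and the NumberInt fallback exists solely so the snippet also runs outside a playground.

```javascript
// NumberInt is available in mongosh and VS Code playgrounds; fall back to a
// plain number so this sketch also runs in ordinary Node.js.
const toInt = (n) => (typeof NumberInt === 'function' ? NumberInt(n) : n);

// Hypothetical factory (not a MongoDB API): builds a $tumblingWindow stage
// whose interval and idleTimeout are both `seconds` long.
function makeTumblingWindow(seconds, innerPipeline) {
  return {
    $tumblingWindow: {
      interval: { size: toInt(seconds), unit: 'second' },
      idleTimeout: { size: toInt(seconds), unit: 'second' },
      pipeline: innerPipeline,
    },
  };
}

// Same $group stage as in the example above.
const groupValuesById = { $group: { _id: '$id', values: { $push: '$value' } } };

const w2 = makeTumblingWindow(2, [groupValuesById]);
const w10 = makeTumblingWindow(10, [groupValuesById]);

// In a playground connected to the instance:
// sp.process([src_hard_coded, w10]);
console.log(JSON.stringify(w10, null, 2));
```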
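Depending on the $tumblingWindow settings, the aggregation can output several documents per id, one for each window that the events' timestamps fall into. For example, when the events carry their own timestamps (see the timestamped dataset further below), these settings...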
```javascript
...
$tumblingWindow: {
  interval: {size: NumberInt(10), unit: "second"},
  idleTimeout : {size: NumberInt(10), unit: "second"},
...
```
...will yield the following aggregation output:
```
{
  _id: 'entity_1',
  values: [ 1 ],
  _stream_meta: {
    windowStartTimestamp: 2024-02-13T14:51:30.000Z,
    windowEndTimestamp: 2024-02-13T14:51:40.000Z
  }
}
{
  _id: 'entity_1',
  values: [ 3, 4 ],
  _stream_meta: {
    windowStartTimestamp: 2024-02-13T14:51:40.000Z,
    windowEndTimestamp: 2024-02-13T14:51:50.000Z
  }
}
{
  _id: 'entity_2',
  values: [ 7, 1 ],
  _stream_meta: {
    windowStartTimestamp: 2024-02-13T14:51:40.000Z,
    windowEndTimestamp: 2024-02-13T14:51:50.000Z
  }
}
```
See how the windowStartTimestamp and windowEndTimestamp fields show the 10-second intervals as requested (14:51:30 to 14:51:40 etc.).
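The window boundaries in this output are consistent with windows aligned to whole multiples of the interval. As a rough illustration, the arithmetic can be sketched in plain JavaScript. `windowBounds` is a hypothetical helper, and the epoch-aligned rounding is an assumption inferred from the output above rather than a statement about Atlas internals.

```javascript
// Hypothetical helper: compute the tumbling window that contains an event,
// assuming windows are aligned to multiples of the interval since the epoch.
function windowBounds(eventTime, sizeSeconds) {
  const sizeMs = sizeSeconds * 1000;
  const startMs = Math.floor(eventTime.getTime() / sizeMs) * sizeMs;
  return {
    windowStartTimestamp: new Date(startMs),
    windowEndTimestamp: new Date(startMs + sizeMs),
  };
}

// An event stamped 14:51:39.402 UTC with a 10-second interval.
const bounds = windowBounds(new Date('2024-02-13T14:51:39.402Z'), 10);
console.log(bounds.windowStartTimestamp.toISOString()); // 2024-02-13T14:51:30.000Z
console.log(bounds.windowEndTimestamp.toISOString());   // 2024-02-13T14:51:40.000Z
```

An event stamped two seconds later (14:51:41.402) would land in the next window, 14:51:40 to 14:51:50.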
Atlas Stream Processing uses the MongoDB Query API. You can learn more about the MongoDB Query API with the official Query API documentation, [free] interactive course, and tutorial.
Important: Stream Processing aggregation pipelines do not support all database aggregation operations and have additional operators specific to streaming, like $tumblingWindow. Check the official Stream Processing aggregation documentation.
Even when we hard-code data, there's an opportunity to provide a timestamp in case we want to perform $sort operations and better mimic a real use case. This would be the equivalent of an event-time timestamp embedded in the message.
There are many other types of timestamps if we use a live Kafka stream (producer-assigned, server-side, ingestion-time, and more). Let's add a timestamp to our messages and use the $source stage's "timeField" property to make it the authoritative stream timestamp.
```javascript
src_hard_coded = {
  $source: {
    // define our event "timestamp_msg" as the _ts
    timeField: { '$dateFromString': { dateString: '$timestamp_msg' } },
    // our hard-coded dataset
    documents: [
      {'id': 'entity_1', 'value': 1, 'timestamp_msg': '2024-02-13T14:51:39.402336'},
      {'id': 'entity_1', 'value': 3, 'timestamp_msg': '2024-02-13T14:51:41.402674'},
      {'id': 'entity_2', 'value': 7, 'timestamp_msg': '2024-02-13T14:51:43.402933'},
      {'id': 'entity_1', 'value': 4, 'timestamp_msg': '2024-02-13T14:51:45.403352'},
      {'id': 'entity_2', 'value': 1, 'timestamp_msg': '2024-02-13T14:51:47.403752'}
    ]
  }
}
```
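To build intuition for how these event timestamps drive the 10-second window output shown earlier, here is a purely local simulation in plain JavaScript. It does not call Atlas at all: `simulateTumblingGroup` is a hypothetical function that imitates a tumbling window with a `$group`/`$push` inside, and the timestamps are rewritten as UTC with millisecond precision, which is an assumption made so that `Date.parse` handles them predictably.

```javascript
// Same events as above, with timestamps truncated to milliseconds and marked
// as UTC (an assumption made for this local simulation only).
const events = [
  { id: 'entity_1', value: 1, timestamp_msg: '2024-02-13T14:51:39.402Z' },
  { id: 'entity_1', value: 3, timestamp_msg: '2024-02-13T14:51:41.402Z' },
  { id: 'entity_2', value: 7, timestamp_msg: '2024-02-13T14:51:43.402Z' },
  { id: 'entity_1', value: 4, timestamp_msg: '2024-02-13T14:51:45.403Z' },
  { id: 'entity_2', value: 1, timestamp_msg: '2024-02-13T14:51:47.403Z' },
];

// Hypothetical local stand-in for $tumblingWindow + $group: bucket each event
// by (window start, id) and push its value into that bucket's array.
function simulateTumblingGroup(docs, sizeSeconds) {
  const sizeMs = sizeSeconds * 1000;
  const groups = new Map();
  for (const doc of docs) {
    const eventMs = Date.parse(doc.timestamp_msg);
    const startMs = Math.floor(eventMs / sizeMs) * sizeMs;
    const key = `${startMs}|${doc.id}`;
    if (!groups.has(key)) {
      groups.set(key, {
        _id: doc.id,
        values: [],
        _stream_meta: {
          windowStartTimestamp: new Date(startMs).toISOString(),
          windowEndTimestamp: new Date(startMs + sizeMs).toISOString(),
        },
      });
    }
    groups.get(key).values.push(doc.value);
  }
  return [...groups.values()];
}

console.log(JSON.stringify(simulateTumblingGroup(events, 10), null, 2));
```

The first event falls just before 14:51:40 and ends up alone in its window, while the other four share the next one. A real stream processor additionally deals with late data, idle timeouts, and output ordering, none of which this sketch models.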
At this point, we have everything we need to test new pipelines and create proofs of concept in a convenient and self-contained form. In a subsequent article, we will demonstrate how to connect to various streaming sources.
At the time of publishing, Atlas Stream Processing is in public preview and there are a number of known Stream Processing limitations that you should be aware of, such as regional data center availability, connectivity with other Atlas projects, and user privileges.
When running an ephemeral stream processor via sp.process(), many errors (JSON serialization issue, late data, divide by zero, $validate errors) that might have gone to a dead letter queue (DLQ) are sent to the default output to help you debug.
For SPs created with sp.createStreamProcessor(), you'll have to configure your DLQ manually. Consult the documentation for this. On the "Manage Stream Processor" documentation page, search for "Define a DLQ."
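As a rough sketch of what that configuration looks like, the DLQ is passed in the options argument of sp.createStreamProcessor(). The connection, database, and collection names below are hypothetical placeholders; the connection must already exist in your instance's connection registry, and the documentation remains the reference for the exact options.

```javascript
// Hypothetical names: replace them with a connection from your connection
// registry and the database/collection that should receive failed documents.
const options = {
  dlq: {
    connectionName: 'my_atlas_connection',
    db: 'stream_errors',
    coll: 'dlq',
  },
};

// In a playground connected to the instance, with `pipeline` being your
// array of stages:
// sp.createStreamProcessor('SP_NAME', pipeline, options);
console.log(options);
```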
After merging data into an Atlas database, it is possible to use existing aggregation pipeline building tools, such as the builder in the Atlas GUI or MongoDB Compass, to create and debug pipelines. Since these tools are meant for the core database API, remember that some operators are not supported by stream processors, and streaming features like windowing are not currently available in them.
With that, you should have everything you need to get your first stream processor up and running. In a future post, we will dive deeper into connecting to different sources of data for your stream processors.
If you have any questions, share them in our community forum, meet us during local MongoDB User Groups (MUGs), or come check out one of our MongoDB .local events.