Exploring Window Operators in Atlas Stream Processing
Rate this tutorial
In our previous post on windowing, we introduced window operators available in Atlas Stream Processing. Window operators are one of the most commonly used operations to effectively process streaming data. Atlas Stream Processing provides two window operators: $tumblingWindow and $hoppingWindow. In this tutorial, we will explore both of these operators using the sample solar data generator provided within Atlas Stream Processing.
Before we begin creating stream processors, make sure you have a database user who has “atlasAdmin” access to the Atlas Project. Also, if you do not already have a Stream Processing Instance created with a connection to the sample_stream_solar data generator, please follow the instructions in Get Started with Atlas Stream Processing: Creating Your First Stream Processor and then continue on.
First, confirm sample_stream_solar is added as a connection by issuing
sp.listConnections()
.1 AtlasStreamProcessing> sp.listConnections() 2 { 3 ok: 1, 4 connections: [ 5 { 6 name: 'sample_stream_solar', 7 type: 'inmemory', 8 createdAt: ISODate("2023-08-26T18:42:48.357Z") 9 } 10 ] 11 }
Next, let’s define a $source stage to describe where Atlas Stream Processing will read the stream data from.
1 var solarstream={ $source: { "connectionName": "sample_stream_solar" } }
Then, issue a .process command to view the contents of the stream on the console.
1 sp.process([solarstream])
You will see the stream of solar data printed on the console. A sample of this data is as follows:
1 { 2 device_id: 'device_2', 3 group_id: 3, 4 timestamp: '2023-08-27T13:51:53.375+00:00', 5 max_watts: 250, 6 event_type: 0, 7 obs: { 8 watts: 168, 9 temp: 15 10 }, 11 _ts: ISODate("2023-08-27T13:51:53.375Z"), 12 _stream_meta: { 13 sourceType: 'sampleData', 14 timestamp: ISODate("2023-08-27T13:51:53.375Z") 15 } 16 }
A tumbling window is a fixed-size window that moves forward in time at regular intervals. In Atlas Stream Processing, you use the $tumblingWindow operator. In this example, let’s use the operator to compute the average watts over one-minute intervals.
Refer back to the schema from the sample stream solar data. To create a tumbling window, let’s create a variable and define our tumbling window stage.
1 var Twindow= { 2 $tumblingWindow: { 3 interval: { size: NumberInt(1), unit: "minute" }, 4 pipeline: [ 5 { 6 $group: { 7 _id: "$device_id", 8 max: { $max: "$obs.watts" }, 9 avg: { $avg: "$obs.watts" } 10 } 11 } 12 ] 13 } 14 }
We are calculating the maximum value and average over the span of one-minute, non-overlapping intervals. Let’s use the
.process
command to run the streaming query in the foreground and view our results in the console.1 sp.process([solarstream,Twindow])
Here is an example output of the statement:
1 { 2 _id: 'device_4', 3 max: 236, 4 avg: 95, 5 _stream_meta: { 6 sourceType: 'sampleData', 7 windowStartTimestamp: ISODate("2023-08-27T13:59:00.000Z"), 8 windowEndTimestamp: ISODate("2023-08-27T14:00:00.000Z") 9 } 10 } 11 { 12 _id: 'device_2', 13 max: 211, 14 avg: 117.25, 15 _stream_meta: { 16 sourceType: 'sampleData', 17 windowStartTimestamp: ISODate("2023-08-27T13:59:00.000Z"), 18 windowEndTimestamp: ISODate("2023-08-27T14:00:00.000Z") 19 } 20 }
The pipeline that is used within a window function can include blocking stages and non-blocking stages.
Accumulator operators such as
$avg
, $count
, $sort
, and $limit
can be used within blocking stages. Meaningful data returned from these operators are obtained when run over a series of data versus a single data point. This is why they are considered blocking.Non-blocking stages do not require multiple data points to be meaningful, and they include operators such as
$addFields
, $match
, $project
, $set
, $unset
, and $unwind
, to name a few. You can use non-blocking before, after, or within the blocking stages. To illustrate this, let’s create a query that shows the average, maximum, and delta (the difference between the maximum and average). We will use a non-blocking $match to show only the results from device_1, calculate the tumblingWindow showing maximum and average, and then include another non-blocking $addFields
.1 var m= { '$match': { device_id: 'device_1' } }
1 var Twindow= { 2 '$tumblingWindow': { 3 interval: { size: Int32(1), unit: 'minute' }, 4 pipeline: [ 5 { 6 '$group': { 7 _id: '$device_id', 8 max: { '$max': '$obs.watts' }, 9 avg: { '$avg': '$obs.watts' } 10 } 11 } 12 ] 13 } 14 } 15 16 var delta = { '$addFields': { delta: { '$subtract': ['$max', '$avg'] } } }
Now we can use the .process command to run the stream processor in the foreground and view our results in the console.
1 sp.process([solarstream,m,Twindow,delta])
The results of this query will be similar to the following:
1 { 2 _id: 'device_1', 3 max: 238, 4 avg: 75.3, 5 _stream_meta: { 6 sourceType: 'sampleData', 7 windowStartTimestamp: ISODate("2023-08-27T19:11:00.000Z"), 8 windowEndTimestamp: ISODate("2023-08-27T19:12:00.000Z") 9 }, 10 delta: 162.7 11 } 12 { 13 _id: 'device_1', 14 max: 220, 15 avg: 125.08333333333333, 16 _stream_meta: { 17 sourceType: 'sampleData', 18 windowStartTimestamp: ISODate("2023-08-27T19:12:00.000Z"), 19 windowEndTimestamp: ISODate("2023-08-27T19:13:00.000Z") 20 }, 21 delta: 94.91666666666667 22 } 23 { 24 _id: 'device_1', 25 max: 238, 26 avg: 119.91666666666667, 27 _stream_meta: { 28 sourceType: 'sampleData', 29 windowStartTimestamp: ISODate("2023-08-27T19:13:00.000Z"), 30 windowEndTimestamp: ISODate("2023-08-27T19:14:00.000Z") 31 }, 32 delta: 118.08333333333333 33 }
Notice the time segments and how they align on the minute.
Additionally, notice that the output includes the difference between the calculated values of maximum and average for each window.
A hopping window, sometimes referred to as a sliding window, is a fixed-size window that moves forward in time at overlapping intervals. In Atlas Stream Processing, you use the
$hoppingWindow
operator. In this example, let’s use the operator to see the average.1 var Hwindow = { 2 '$hoppingWindow': { 3 interval: { size: 1, unit: 'minute' }, 4 hopSize: { size: 30, unit: 'second' }, 5 pipeline: [ 6 { 7 '$group': { 8 _id: '$device_id', 9 max: { '$max': '$obs.watts' }, 10 avg: { '$avg': '$obs.watts' } 11 } 12 } 13 ] 14 } 15 }
To help illustrate the start and end time segments, let's create a filter to only return device_1.
1 var m = { '$match': { device_id: 'device_1' } }
Now let’s issue the
.process
command to view the results in the console.1 sp.process([solarstream,m,Hwindow])
An example result is as follows:
1 { 2 _id: 'device_1', 3 max: 238, 4 avg: 76.625, 5 _stream_meta: { 6 sourceType: 'sampleData', 7 windowStartTimestamp: ISODate("2023-08-27T19:37:30.000Z"), 8 windowEndTimestamp: ISODate("2023-08-27T19:38:30.000Z") 9 } 10 } 11 { 12 _id: 'device_1', 13 max: 238, 14 avg: 82.71428571428571, 15 _stream_meta: { 16 sourceType: 'sampleData', 17 windowStartTimestamp: ISODate("2023-08-27T19:38:00.000Z"), 18 windowEndTimestamp: ISODate("2023-08-27T19:39:00.000Z") 19 } 20 } 21 { 22 _id: 'device_1', 23 max: 220, 24 avg: 105.54545454545455, 25 _stream_meta: { 26 sourceType: 'sampleData', 27 windowStartTimestamp: ISODate("2023-08-27T19:38:30.000Z"), 28 windowEndTimestamp: ISODate("2023-08-27T19:39:30.000Z") 29 } 30 }
Notice the time segments.
The time segments are overlapping by 30 seconds as was defined by the hopSize option. Hopping windows are useful to capture short-term patterns in data.
By continuously processing data within time windows, you can generate real-time insights and metrics, which can be crucial for applications like monitoring, fraud detection, and operational analytics. Atlas Stream Processing provides both tumbling and hopping window operators. Together these operators enable you to perform various aggregation operations such as sum, average, min, and max over a specific window of data. In this tutorial, you learned how to use both of these operators with solar sample data.
Check out the MongoDB Atlas Stream Processing announcement blog post. For more on window operators in Atlas Stream Processing, learn more in our documentation.
Log in today to get started. Atlas Stream Processing is available to all developers in Atlas. Give it a try today!
Related
Tutorial
Movie Score Prediction with BigQuery, Vertex AI, and MongoDB Atlas
Jul 11, 2023 | 11 min read
Quickstart
Quickstart Guide to RAG Application Using LangChain and LlamaIndex
Sep 18, 2024 | 10 min read