Exploring Window Operators in Atlas Stream Processing

Robert Walters • 4 min read • Published Aug 13, 2024 • Updated Aug 13, 2024
Atlas Stream Processing is now generally available! Learn more about it here.
In our previous post on windowing, we introduced window operators available in Atlas Stream Processing. Window operators are one of the most commonly used operations to effectively process streaming data. Atlas Stream Processing provides two window operators: $tumblingWindow and $hoppingWindow. In this tutorial, we will explore both of these operators using the sample solar data generator provided within Atlas Stream Processing.

Getting started

Before we begin creating stream processors, make sure you have a database user who has “atlasAdmin” access to the Atlas Project. Also, if you do not already have a Stream Processing Instance created with a connection to the sample_stream_solar data generator, please follow the instructions in Get Started with Atlas Stream Processing: Creating Your First Stream Processor and then continue on.

View the solar stream sample data

For this tutorial, we will be using the MongoDB shell.
First, confirm sample_stream_solar is added as a connection by issuing sp.listConnections().
AtlasStreamProcessing> sp.listConnections()
{
  ok: 1,
  connections: [
    {
      name: 'sample_stream_solar',
      type: 'inmemory',
      createdAt: ISODate("2023-08-26T18:42:48.357Z")
    }
  ]
}
Next, let’s define a $source stage to describe where Atlas Stream Processing will read the stream data from.
var solarstream = { $source: { "connectionName": "sample_stream_solar" } }
Then, issue a .process command to view the contents of the stream on the console.
sp.process([solarstream])
You will see the stream of solar data printed on the console. A sample of this data is as follows:
{
  device_id: 'device_2',
  group_id: 3,
  timestamp: '2023-08-27T13:51:53.375+00:00',
  max_watts: 250,
  event_type: 0,
  obs: {
    watts: 168,
    temp: 15
  },
  _ts: ISODate("2023-08-27T13:51:53.375Z"),
  _stream_meta: {
    sourceType: 'sampleData',
    timestamp: ISODate("2023-08-27T13:51:53.375Z")
  }
}

Create a tumbling window query

A tumbling window is a fixed-size, non-overlapping window that moves forward in time at regular intervals. In Atlas Stream Processing, you use the $tumblingWindow operator. In this example, let’s use the operator to compute the maximum and average watts over one-minute intervals.
Refer back to the schema of the sample solar stream data; the wattage reading is stored in obs.watts. To create a tumbling window, let’s create a variable and define our tumbling window stage.
var Twindow = {
  $tumblingWindow: {
    interval: { size: NumberInt(1), unit: "minute" },
    pipeline: [
      {
        $group: {
          _id: "$device_id",
          max: { $max: "$obs.watts" },
          avg: { $avg: "$obs.watts" }
        }
      }
    ]
  }
}
We are calculating the maximum value and average over the span of one-minute, non-overlapping intervals. Let’s use the .process command to run the streaming query in the foreground and view our results in the console.
sp.process([solarstream, Twindow])
Here is an example output of the statement:
{
  _id: 'device_4',
  max: 236,
  avg: 95,
  _stream_meta: {
    sourceType: 'sampleData',
    windowStartTimestamp: ISODate("2023-08-27T13:59:00.000Z"),
    windowEndTimestamp: ISODate("2023-08-27T14:00:00.000Z")
  }
}
{
  _id: 'device_2',
  max: 211,
  avg: 117.25,
  _stream_meta: {
    sourceType: 'sampleData',
    windowStartTimestamp: ISODate("2023-08-27T13:59:00.000Z"),
    windowEndTimestamp: ISODate("2023-08-27T14:00:00.000Z")
  }
}
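To make the tumbling window semantics concrete, here is a minimal plain-JavaScript sketch that buckets events into one-minute windows and computes the same max and avg per device. This is an illustration only, not Atlas Stream Processing code: the helper name tumblingGroup and the sample events are made up, and the sketch assumes windows are aligned to whole minutes and ignores everything the real service does beyond simple bucketing.

```javascript
// Illustration only: a plain-JavaScript model of a one-minute tumbling window.
// tumblingGroup is a hypothetical helper, not part of any MongoDB API.
const WINDOW_MS = 60 * 1000; // one-minute window, matching the tutorial

function tumblingGroup(events, windowMs = WINDOW_MS) {
  const buckets = new Map();
  for (const e of events) {
    const ts = Date.parse(e.timestamp);
    // Each timestamp belongs to exactly one window: round down to the window size.
    const start = Math.floor(ts / windowMs) * windowMs;
    const key = `${start}|${e.device_id}`;
    if (!buckets.has(key)) {
      buckets.set(key, { start, device: e.device_id, watts: [] });
    }
    buckets.get(key).watts.push(e.obs.watts);
  }
  // Emit one result per (window, device) pair, like the $group stage above.
  return [...buckets.values()].map((b) => ({
    _id: b.device,
    max: Math.max(...b.watts),
    avg: b.watts.reduce((sum, w) => sum + w, 0) / b.watts.length,
    windowStart: new Date(b.start).toISOString(),
    windowEnd: new Date(b.start + windowMs).toISOString(),
  }));
}

// Made-up events shaped like the solar sample data.
const sampleEvents = [
  { device_id: 'device_2', timestamp: '2023-08-27T13:59:05.000Z', obs: { watts: 100 } },
  { device_id: 'device_2', timestamp: '2023-08-27T13:59:40.000Z', obs: { watts: 200 } },
  { device_id: 'device_2', timestamp: '2023-08-27T14:00:10.000Z', obs: { watts: 50 } },
];

const tumblingResults = tumblingGroup(sampleEvents);
console.log(tumblingResults);
```

The first two events land in the 13:59 window and the third in the 14:00 window, so no event is ever counted twice.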

Exploring the window operator pipeline

The pipeline that is used within a window function can include blocking stages and non-blocking stages.
Blocking stages include $group, $sort, $count, and $limit; accumulator operators such as $avg and $max are used within $group. These return meaningful results only when run over a series of documents rather than a single data point, which is why they are considered blocking.
Non-blocking stages do not require multiple data points to be meaningful; they include $addFields, $match, $project, $set, $unset, and $unwind, to name a few. You can place non-blocking stages before or after the window stage, or within its pipeline. To illustrate this, let’s create a query that shows the average, the maximum, and the delta (the difference between the maximum and the average). We will use a non-blocking $match to keep only the documents from device_1, apply a $tumblingWindow that computes the maximum and average, and then add the delta with another non-blocking $addFields.
var m = { '$match': { device_id: 'device_1' } }

var Twindow = {
  '$tumblingWindow': {
    interval: { size: Int32(1), unit: 'minute' },
    pipeline: [
      {
        '$group': {
          _id: '$device_id',
          max: { '$max': '$obs.watts' },
          avg: { '$avg': '$obs.watts' }
        }
      }
    ]
  }
}

var delta = { '$addFields': { delta: { '$subtract': ['$max', '$avg'] } } }
Now we can use the .process command to run the stream processor in the foreground and view our results in the console.
sp.process([solarstream, m, Twindow, delta])
The results of this query will be similar to the following:
{
  _id: 'device_1',
  max: 238,
  avg: 75.3,
  _stream_meta: {
    sourceType: 'sampleData',
    windowStartTimestamp: ISODate("2023-08-27T19:11:00.000Z"),
    windowEndTimestamp: ISODate("2023-08-27T19:12:00.000Z")
  },
  delta: 162.7
}
{
  _id: 'device_1',
  max: 220,
  avg: 125.08333333333333,
  _stream_meta: {
    sourceType: 'sampleData',
    windowStartTimestamp: ISODate("2023-08-27T19:12:00.000Z"),
    windowEndTimestamp: ISODate("2023-08-27T19:13:00.000Z")
  },
  delta: 94.91666666666667
}
{
  _id: 'device_1',
  max: 238,
  avg: 119.91666666666667,
  _stream_meta: {
    sourceType: 'sampleData',
    windowStartTimestamp: ISODate("2023-08-27T19:13:00.000Z"),
    windowEndTimestamp: ISODate("2023-08-27T19:14:00.000Z")
  },
  delta: 118.08333333333333
}
Notice the time segments and how they align on the minute.
[Figure: Time segments aligned on the minute]
Additionally, notice that the output includes the difference between the calculated values of maximum and average for each window.
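The difference between blocking and non-blocking stages can be sketched in plain JavaScript. The helpers below (matchDevice, groupMaxAvg, addDelta) are hypothetical names for illustration, not MongoDB APIs, and the documents are made up. The two non-blocking steps look at one document at a time, while the grouping step needs every document in the window before it can return anything.

```javascript
// Illustration only: plain-JavaScript stand-ins for the three stages.
// All helper names are hypothetical.

// Non-blocking: decides per document, like $match.
const matchDevice = (id) => (doc) => doc.device_id === id;

// Blocking: needs all documents in the window, like $group with $max and $avg.
// Assumes the window holds at least one document from a single device.
function groupMaxAvg(docs) {
  const watts = docs.map((d) => d.obs.watts);
  return {
    _id: docs[0].device_id,
    max: Math.max(...watts),
    avg: watts.reduce((sum, w) => sum + w, 0) / watts.length,
  };
}

// Non-blocking: transforms a single document, like $addFields with $subtract.
const addDelta = (doc) => ({ ...doc, delta: doc.max - doc.avg });

// Made-up documents that all fall inside one window.
const windowDocs = [
  { device_id: 'device_1', obs: { watts: 238 } },
  { device_id: 'device_3', obs: { watts: 90 } },
  { device_id: 'device_1', obs: { watts: 62 } },
];

const matched = windowDocs.filter(matchDevice('device_1'));
const withDelta = addDelta(groupMaxAvg(matched));
console.log(withDelta);
```

Here the filter drops the device_3 reading before grouping, and the delta is computed once per window result rather than once per input document.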

Create a hopping window

A hopping window, sometimes referred to as a sliding window, is a fixed-size window that moves forward in time at intervals shorter than the window itself, so consecutive windows overlap. In Atlas Stream Processing, you use the $hoppingWindow operator. In this example, let’s use the operator to compute the maximum and average watts over one-minute windows that advance every 30 seconds.
var Hwindow = {
  '$hoppingWindow': {
    interval: { size: 1, unit: 'minute' },
    hopSize: { size: 30, unit: 'second' },
    pipeline: [
      {
        '$group': {
          _id: '$device_id',
          max: { '$max': '$obs.watts' },
          avg: { '$avg': '$obs.watts' }
        }
      }
    ]
  }
}
To help illustrate the start and end time segments, let's create a filter to only return device_1.
var m = { '$match': { device_id: 'device_1' } }
Now let’s issue the .process command to view the results in the console.
sp.process([solarstream, m, Hwindow])
An example result is as follows:
{
  _id: 'device_1',
  max: 238,
  avg: 76.625,
  _stream_meta: {
    sourceType: 'sampleData',
    windowStartTimestamp: ISODate("2023-08-27T19:37:30.000Z"),
    windowEndTimestamp: ISODate("2023-08-27T19:38:30.000Z")
  }
}
{
  _id: 'device_1',
  max: 238,
  avg: 82.71428571428571,
  _stream_meta: {
    sourceType: 'sampleData',
    windowStartTimestamp: ISODate("2023-08-27T19:38:00.000Z"),
    windowEndTimestamp: ISODate("2023-08-27T19:39:00.000Z")
  }
}
{
  _id: 'device_1',
  max: 220,
  avg: 105.54545454545455,
  _stream_meta: {
    sourceType: 'sampleData',
    windowStartTimestamp: ISODate("2023-08-27T19:38:30.000Z"),
    windowEndTimestamp: ISODate("2023-08-27T19:39:30.000Z")
  }
}
Notice the time segments.
[Figure: Overlapping time segments]
Each window starts 30 seconds after the previous one, as defined by the hopSize option, so consecutive one-minute windows overlap by 30 seconds. Hopping windows are useful for capturing short-term patterns in data.
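To see why a single event can contribute to more than one result, here is a small plain-JavaScript sketch that lists every hopping window containing a given timestamp. It is an illustration only: hoppingWindowsFor is a hypothetical helper, and the sketch assumes windows are half-open intervals (start inclusive, end exclusive) whose start times are aligned to multiples of the hop size, which matches the timestamps in the sample output above.

```javascript
// Illustration only: which hopping windows contain a given event time?
// hoppingWindowsFor is a hypothetical helper, not part of any MongoDB API.
function hoppingWindowsFor(timestamp, sizeMs = 60 * 1000, hopMs = 30 * 1000) {
  const ts = Date.parse(timestamp);
  const windows = [];
  // Latest window start at or before the event, aligned to the hop size.
  let start = Math.floor(ts / hopMs) * hopMs;
  // Walk backwards while the window [start, start + size) still covers the event.
  while (start + sizeMs > ts) {
    windows.unshift({
      start: new Date(start).toISOString(),
      end: new Date(start + sizeMs).toISOString(),
    });
    start -= hopMs;
  }
  return windows;
}

// Made-up event time that falls inside the sample output's time range.
const hopWindows = hoppingWindowsFor('2023-08-27T19:38:10.000Z');
console.log(hopWindows);
```

With a one-minute interval and a 30-second hop, the event at 19:38:10 belongs to both the 19:37:30 and the 19:38:00 windows, which is why the same reading can influence two consecutive results.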

Summary

By continuously processing data within time windows, you can generate real-time insights and metrics, which can be crucial for applications like monitoring, fraud detection, and operational analytics. Atlas Stream Processing provides both tumbling and hopping window operators. Together these operators enable you to perform various aggregation operations such as sum, average, min, and max over a specific window of data. In this tutorial, you learned how to use both of these operators with solar sample data.

Learn more about MongoDB Atlas Stream Processing

Check out the MongoDB Atlas Stream Processing announcement blog post. For more on window operators in Atlas Stream Processing, see our documentation.
Log in today to get started. Atlas Stream Processing is available to all developers in Atlas. Give it a try today!
