$hoppingWindow
Definition
The $hoppingWindow stage specifies a hopping window for aggregation of data.
Atlas Stream Processing windows are stateful, can be recovered if interrupted,
and have mechanisms for processing late-arriving data. You must apply
all other aggregation queries to your streaming data within this
window stage.
Syntax

A $hoppingWindow pipeline stage has the following prototype form:

```json
{
  "$hoppingWindow": {
    "interval": {
      "size": <int>,
      "unit": "<unit-of-time>"
    },
    "hopSize": {
      "size": <int>,
      "unit": "<unit-of-time>"
    },
    "pipeline": [
      <aggregation-stage-array>
    ],
    "offset": {
      "offsetFromUtc": <int>,
      "unit": "<unit-of-time>"
    },
    "idleTimeout": {
      "size": <int>,
      "unit": "<unit-of-time>"
    },
    "allowedLateness": {
      "size": <int>,
      "unit": "<unit-of-time>"
    }
  }
}
```

The $hoppingWindow stage takes a document with the following fields:
Field | Type | Necessity | Description
---|---|---|---
`interval` | document | Required | Document specifying the interval of a hopping window as a combination of a `size` and a `unit` of time, where `size` is a positive integer and `unit` is one of `"ms"`, `"second"`, `"minute"`, `"hour"`, or `"day"`. For example, a `size` of `100` and a `unit` of `"second"` keeps each window open for 100 seconds.
`hopSize` | document | Required | Document that specifies the length of the hop between window start times as a combination of a `size` and a `unit` of time, where `size` is a positive integer and `unit` is one of `"ms"`, `"second"`, `"minute"`, `"hour"`, or `"day"`. For example, a `size` of `20` and a `unit` of `"second"` starts a new window every 20 seconds.
`pipeline` | array | Required | Nested aggregation pipeline evaluated against the messages within the window.
`offset` | document | Optional | Document specifying a time offset for window boundaries relative to UTC. The document is a combination of the size field `offsetFromUtc`, which must be a positive integer, and a `unit` field, which is one of `"ms"`, `"second"`, `"minute"`, or `"hour"`. For example, an `offsetFromUtc` of `8` and a `unit` of `"hour"` shifts window boundaries eight hours ahead of UTC. If omitted, window boundaries align with UTC.
`idleTimeout` | document | Optional | Document specifying how long to wait before closing windows if `$source` is idle, as a combination of a `size` and a `unit` of time. If you set `idleTimeout`, Atlas Stream Processing closes an open window only after `$source` has been idle for longer than the greater of the remaining window time and the `idleTimeout` time. For example, consider a 12:00 pm to 1:00 pm window with an `idleTimeout` of 2 hours: if the last event arrives at 12:02 pm and `$source` then goes idle, the remaining window time is 58 minutes, and the window closes at 2:02 pm, after 2 hours of idleness.
`allowedLateness` | document | Optional | Document that specifies how long to keep windows generated from the source open to accept late-arriving data after processing documents for the window end time. If omitted, defaults to 3 seconds.
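To make the hopping behavior concrete: because each window stays open for `interval` and a new window starts every `hopSize`, a single event is counted in `interval / hopSize` overlapping windows (when the interval is a multiple of the hop size). The following standalone JavaScript sketch is illustrative only, not part of Atlas Stream Processing; it assumes window boundaries aligned to the Unix epoch with no `offset`, and windows that include their start time and exclude their end time.

```javascript
// Illustrative helper (not an Atlas Stream Processing API): lists the start
// times, in milliseconds since the Unix epoch, of every hopping window that
// contains a given event time. Windows span [start, start + interval).
function windowStartsContaining(eventTimeMs, intervalMs, hopMs) {
  const starts = [];
  // The earliest qualifying start is the first hop boundary strictly greater
  // than eventTime - interval; walk forward one hop at a time from there.
  let s = Math.ceil((eventTimeMs - intervalMs + 1) / hopMs) * hopMs;
  for (; s <= eventTimeMs; s += hopMs) {
    starts.push(s);
  }
  return starts;
}

// With an interval of 100 seconds and a hopSize of 20 seconds, every event
// falls into 100 / 20 = 5 overlapping windows:
windowStartsContaining(130000, 100000, 20000)
// => [ 40000, 60000, 80000, 100000, 120000 ]
```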
Behavior
Atlas Stream Processing supports only one window stage per pipeline.
When you apply the $group stage to your window stage, a single group key has a limit of 100 megabytes of RAM.
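The practical consequence is that the state a window keeps per group key must stay bounded. The following sketch contrasts two window pipelines; the field names `position.coordinates` and `pressure.value` are assumptions drawn from the weather example below, not requirements:

```javascript
// Bounded per-key state: $avg maintains only a running sum and count for
// each group key, so memory use stays small regardless of stream volume.
{ $group: {
    _id: "$position.coordinates",
    averagePressure: { $avg: "$pressure.value" }
} }

// Unbounded per-key state: $push retains every matching document in the
// group key's state, which can approach the 100 MB per-key limit on
// high-volume streams.
{ $group: {
    _id: "$position.coordinates",
    allReadings: { $push: "$$ROOT" }
} }
```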
Support for certain aggregation stages might be limited or unavailable within windows. To learn more, see Supported Aggregation Pipeline Stages.
In the event of a service interruption, you can resume the internal pipeline of a window from its state at the point of interruption. To learn more, see Checkpoints.
Examples
A streaming data source generates detailed weather reports from various locations, conformant to the schema of the Sample Weather Dataset. The following aggregation has three stages:
- The `$source` stage establishes a connection with the Apache Kafka broker collecting these reports in a topic named `my_weatherdata`, exposing each record as it is ingested to the subsequent aggregation stages.
- The `$hoppingWindow` stage defines overlapping windows of time that are 100 seconds in duration and begin every 20 seconds. Each window executes an internal `pipeline`, which finds the average `liquidPrecipitation.depth`, as defined in the `sample_weatherdata` documents streamed from the Apache Kafka broker, for the duration of a given window. The `pipeline` then outputs a single document with an `_id` equivalent to the start timestamp of the window it represents and the `averagePrecipitation` for that window.
- The `$merge` stage writes the output to an Atlas collection named `stream` in the `sample_weatherstream` database. If no such database or collection exist, Atlas creates them.
```javascript
pipeline = [
  {
    $source: {
      "connectionName": "streamsExampleConnectionToKafka",
      "topic": "my_weatherdata"
    }
  },
  {
    $hoppingWindow: {
      "interval": {
        "size": 100,
        "unit": "second"
      },
      "hopSize": {
        "size": 20,
        "unit": "second"
      },
      "pipeline": [
        {
          $group: {
            // The resulting document's _id is the $hoppingWindow's start timestamp
            _id: "$_stream_meta.window.start",
            averagePrecipitation: { $avg: "$liquidPrecipitation.depth" }
          }
        }
      ]
    }
  },
  {
    $merge: {
      "into": {
        "connectionName": "streamsExampleConnectionToAtlas",
        "db": "sample_weatherstream",
        "coll": "stream"
      }
    }
  }
]
```
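If you want to try the pipeline, one approach is to run it from mongosh. The following is a sketch that assumes mongosh is connected to your stream processing instance, the two connection names exist in your connection registry, and `weatherHoppingExample` is a processor name of your choosing:

```javascript
// Run the pipeline as an ephemeral stream processor that lasts only as long
// as the shell session.
sp.process(pipeline)

// Or register it as a named stream processor, then start it.
sp.createStreamProcessor("weatherHoppingExample", pipeline)
sp.weatherHoppingExample.start()
```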
To view the documents in the resulting `sample_weatherstream.stream` collection, connect to your Atlas cluster and run the following command:
```javascript
db.getSiblingDB("sample_weatherstream").stream.find()
```
```javascript
{
  _id: ISODate('2024-08-28T19:30:20.000Z'),
  _stream_meta: {
    source: { type: 'kafka' },
    window: {
      start: ISODate('2024-08-28T19:30:20.000Z'),
      end: ISODate('2024-08-28T19:32:00.000Z')
    }
  },
  averagePrecipitation: 2264.3973214285716
},
{
  _id: ISODate('2024-08-28T19:30:40.000Z'),
  _stream_meta: {
    source: { type: 'kafka' },
    window: {
      start: ISODate('2024-08-28T19:30:40.000Z'),
      end: ISODate('2024-08-28T19:32:20.000Z')
    }
  },
  averagePrecipitation: 2285.7061611374406
},
{
  _id: ISODate('2024-08-28T19:31:00.000Z'),
  _stream_meta: {
    source: { type: 'kafka' },
    window: {
      start: ISODate('2024-08-28T19:31:00.000Z'),
      end: ISODate('2024-08-28T19:32:40.000Z')
    }
  },
  averagePrecipitation: 2357.6940154440153
},
{
  _id: ISODate('2024-08-28T19:31:20.000Z'),
  _stream_meta: {
    source: { type: 'kafka' },
    window: {
      start: ISODate('2024-08-28T19:31:20.000Z'),
      end: ISODate('2024-08-28T19:33:00.000Z')
    }
  },
  averagePrecipitation: 2378.374061433447
}
```
Note
The preceding is a representative example. Streaming data are not static, and each user sees distinct documents.