Manage Stream Processors
On this page
- Prerequisites
- Considerations
- Create a Stream Processor Interactively
- Connect to your stream processing instance.
- Define a pipeline.
- Create a stream processor.
- Create a Stream Processor
- Start a Stream Processor
- Stop a Stream Processor
- Modify a Stream Processor
- Limitations
- To modify a stream processor:
- Drop a Stream Processor
- List Available Stream Processors
- Sample from a Stream Processor
- View Statistics of a Stream Processor
An Atlas Stream Processing stream processor applies the logic of a uniquely named stream aggregation pipeline to your streaming data. Atlas Stream Processing saves each stream processor definition to persistent storage so that it can be reused. You can only use a given stream processor in the stream processing instance its definition is stored in. Atlas Stream Processing supports up to 4 stream processors per worker. For additional processors that exceed this limit, Atlas Stream Processing allocates a new resource.
Prerequisites
To create and manage a stream processor, you must have:
mongosh
version 2.0 or higherA database user with the
atlasAdmin
role to create and run stream processorsAn Atlas cluster
Considerations
Many stream processor commands require you to specify the name of the
relevant stream processor in the method invocation. The syntax
described in the following sections assumes strictly alphanumeric
names. If your stream processor's name includes non-alphanumeric
characters such as hyphens (-
) or full stops (.
), you must
enclose the name in square brackets ([]
) and double quotes
(""
) in the method invocation, as in
sp.["special-name-stream"].stats()
.
Create a Stream Processor Interactively
You can create a stream processor interactively with the
sp.process()
method. Stream processors that you create
interactively exhibit the following behavior:
Write output and dead letter queue documents to the shell
Begin running immediately upon creation
Run for either 10 minutes or until the user stops them
Don't persist after stopping
Stream processors that you create interactively are intended for prototyping. To create a persistent stream processor, see Create a Stream Processor.
sp.process()
has the following syntax:
sp.process(<pipeline>)
Field | Type | Necessity | Description |
---|---|---|---|
| array | Required | Stream aggregation pipeline you want to apply to your streaming data. |
Connect to your stream processing instance.
Use the connection string associated with your stream processing instance
to connect using mongosh
.
Example
The following command connects to a stream processing instance as a user named
streamOwner
using x.059
authentication:
mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\ --tls --authenticationDatabase admin --username streamOwner
Provide your user password when prompted.
Define a pipeline.
In the mongosh
prompt, assign an array containing the
aggregation stages you want to apply to a variable named
pipeline
.
The following example uses the stuff
topic in
the myKafka
connection in the connection registry as the
$source
, matches records where the temperature
field has a value of 46
and emits the processed messages to
the output
topic of the mySink
connection in
the connection registry:
pipeline = [ {$source: {"connectionName": "myKafka", "topic": "stuff"}}, {$match: { temperature: 46 }}, { "$emit": { "connectionName": "mySink", "topic" : "output", } } ]
Create a Stream Processor
To create a stream processor:
The Atlas Administration API provides an endpoint for creating a stream processor.
To create a new stream processor with mongosh
, use the
sp.createStreamProcessor()
method. It has the following syntax:
sp.createStreamProcessor(<name>, <pipeline>, <options>)
Argument | Type | Necessity | Description |
---|---|---|---|
| string | Required | Logical name for the stream processor. This must be unique within the stream processing instance. This name should contain only alphanumeric characters. |
| array | Required | Stream aggregation pipeline you want to apply to your streaming data. |
| object | Optional | Object defining various optional settings for your stream processor. |
| object | Conditional | Object assigning a
dead letter queue for your stream processing instance. This field is
necessary if you define the |
| string | Conditional | Human-readable label that identifies a connection in your
connection registry. This connection must reference an
Atlas cluster. This field is necessary if you define the
|
| string | Conditional | Name of an Atlas database on the cluster specified
in |
| string | Conditional | Name of a collection in the database specified in
|
Connect to your stream processing instance.
Use the connection string associated with your stream processing instance
to connect using mongosh
.
Example
The following command connects to a stream processing instance as a user named
streamOwner
using x.059
authentication.
mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\ --tls --authenticationDatabase admin --username streamOwner
Provide your user password when prompted.
Define a pipeline.
In the mongosh
prompt, assign an array containing the
aggregation stages you want to apply to a variable named
pipeline
.
The following example uses the stuff
topic in
the myKafka
connection in the connection registry as the
$source
, matches records where the temperature
field has a value of 46
and emits the processed messages to
the output
topic of the mySink
connection in
the connection registry:
pipeline = [ {$source: {"connectionName": "myKafka", "topic": "stuff"}}, {$match: { temperature: 46 }}, { "$emit": { "connectionName": "mySink", "topic" : "output", } } ]
(Optional) Define a DLQ.
In the mongosh
prompt, assign an object containing the
following properties of your DLQ:
Connection name
Database name
Collection name
The following example defines a DLQ over the cluster01
connection, in the metadata.dlq
database collection.
deadLetter = { dlq: { connectionName: "cluster01", db: "metadata", coll: "dlq" } }
Start a Stream Processor
Note
Atlas Stream Processing discards the internal state of stream processors which
have been stopped
for 45 days or more. When you start such a
processor, it operates and reports statistics identically to its initial run.
To start a stream processor:
The Atlas Administration API provides an endpoint for starting a stream processor.
To start an existing stream processor with mongosh
, use the
sp.<streamprocessor>.start()
method. <streamprocessor>
must be
the name of a stream processor defined for the current stream processing instance.
For example, to start a stream processor named proc01
, run the
following command:
sp.proc01.start()
This method returns:
true
if the stream processor exists and isn't currently running.false
if you try to start a stream processor that doesn't exist, or exists and is currently running.
Stop a Stream Processor
Note
Atlas Stream Processing discards the internal state of stream processors which
have been stopped
for 45 days or more. When you start such a
processor, it operates and reports statistics identically to its initial run.
To stop a stream processor:
The Atlas Administration API provides an endpoint for stopping a stream processor.
To stop an existing stream processor with mongosh
, use the
sp.<streamprocessor>.stop()
method. <streamprocessor>
must be
the name of a currently running stream processor defined for the
current stream processing instance.
For example, to stop a stream processor named proc01
, run the
following command:
sp.proc01.stop()
This method returns:
true
if the stream processor exists and is currently running.false
if the stream processor doesn't exist, or if the stream processor isn't currently running.
Modify a Stream Processor
You can modify the following elements of an existing stream processor:
To modify a stream processor, do the following:
Apply your update to the stream processor.
By default, modified processors restore from the last checkpoint. Alternatively,
you can set resumeFromCheckpoint=false
, in which case the processor only
retains summary stats. When you modify a processor with open windows, the windows
are entirely recomputed on the updated pipeline.
Limitations
When the default setting resumeFromCheckpoint=true
is enabled, the following
limitations apply:
You can't modify the
$source
stage.You can't modify the interval of your window.
You can't remove a window.
You can only modify a pipeline with a window if that window has either a
$group
or$sort
stage in its inner pipeline.You can't change an existing window type. For example, you can't change from a
$tumblingWindow
to a$hoppingWindow
or vice versa.Processors with windows may reprocess some data as a product of recalculating the windows.
To modify a stream processor:
Requires mongosh
v2.3.4+.
Use the sp.<streamprocessor>.modify()
command to modify an existing
stream processor. <streamprocessor>
must be the name of a stopped stream
processor defined for the current stream processing instance.
For example, to modify a stream processor named proc01
, run the
following command:
sp.proc1.modify(<pipeline>, { resumeFromCheckpoint: bool, // optional name: string, // optional dlq: string, // optional }})
Add a Stage to an Existing Pipeline
sp.createStreamProcessor("foo", [ {$source: { connectionName: "StreamsAtlasConnection", db: "test", coll: "test" }}, {$merge: { into: { connectionName: "StreamsAtlasConnection", db: "testout", coll: "testout" } }} ]) sp.foo.start();
sp.foo.stop(); sp.foo.modify([ {$source: { connectionName: "StreamsAtlasConnection", db: "test", coll: "test" }}, {$match: { operationType: "insert" }}, {$merge: { into: { connectionName: "StreamsAtlasConnection", db: "testout", coll: "testout2" } }} ]); sp.foo.start();
Modify the Input Source of a Stream Processor
sp.foo.modify([ {$source: { connectionName: "StreamsAtlasConnection", db: "test", coll: "test", config: { startAtOperationTime: new Date(now.getTime() - 5 * 60 * 1000) } }}, {$match: { operationType: "insert" }}, {$merge: { into: { connectionName: "StreamsAtlasConnection", db: "testout", coll: "testout2" } }} ], {resumeFromCheckpoint: false});
Remove a Dead Letter Queue from a Stream Processor
sp.foo.stop(); sp.foo.modify({dlq: {}}) sp.foo.start();
Modify a Stream Processor with a Window
sp.foo.stop(); sp.foo.modify([ {$source: { connectionName: "StreamsAtlasConnection", db: "test", coll: "test" }}, {$replaceRoot: {newRoot: "$fullDocument"}}, {$match: {cost: {$gt: 500}}}, {$tumblingWindow: { interval: {unit: "day", size: 1}, pipeline: [ {$group: {_id: "$customerId", sum: {$sum: "$cost"}, avg: {$avg: "$cost"}}} ] }}, {$merge: { into: { connectionName: "StreamsAtlasConnection", db: "testout", coll: "testout" } }} ], {resumeFromCheckpoint: false}); sp.foo.start();
The Atlas Administration API provides an endpoint for modifying a stream processor.
Drop a Stream Processor
To drop a stream processor:
The Atlas Administration API provides an endpoint for deleting a stream processor.
To delete an existing stream processor with mongosh
, use the
sp.<streamprocessor>.drop()
method. <streamprocessor>
must be
the name of a stream processor defined for the current stream processing instance.
For example, to drop a stream processor named proc01
, run the
following command:
sp.proc01.drop()
This method returns:
true
if the stream processor exists.false
if the stream processor doesn't exist.
When you drop a stream processor, all resources that Atlas Stream Processing provisioned for it are destroyed, along with all saved state.
List Available Stream Processors
To list all available stream processors:
The Atlas Administration API provides an endpoint for listing all available stream processors.
To list all available stream processors on the current stream processing instance with
mongosh
, use the sp.listStreamProcessors()
method. It returns
a list of documents containing the name, start time, current state, and
pipeline associated with each stream processor. It has the following
syntax:
sp.listStreamProcessors(<filter>)
<filter>
is a document specifying which field(s) to filter the list
by.
Example
The following example shows a return value for an unfiltered request:
sp.listStreamProcessors()
1 { 2 id: '0135', 3 name: "proc01", 4 last_modified: ISODate("2023-03-20T20:15:54.601Z"), 5 state: "RUNNING", 6 error_msg: '', 7 pipeline: [ 8 { 9 $source: { 10 connectionName: "myKafka", 11 topic: "stuff" 12 } 13 }, 14 { 15 $match: { 16 temperature: 46 17 } 18 }, 19 { 20 $emit: { 21 connectionName: "mySink", 22 topic: "output", 23 } 24 } 25 ], 26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z") 27 }, 28 { 29 id: '0218', 30 name: "proc02", 31 last_modified: ISODate("2023-03-21T20:17:33.601Z"), 32 state: "STOPPED", 33 error_msg: '', 34 pipeline: [ 35 { 36 $source: { 37 connectionName: "myKafka", 38 topic: "things" 39 } 40 }, 41 { 42 $match: { 43 temperature: 41 44 } 45 }, 46 { 47 $emit: { 48 connectionName: "mySink", 49 topic: "results", 50 } 51 } 52 ], 53 lastStateChange: ISODate("2023-03-21T20:18:26.139Z") 54 }
If you run the command again on the same stream processing instance, filtering for a
"state"
of "running"
, you see the following output:
sp.listStreamProcessors({"state": "running"})
1 { 2 id: '0135', 3 name: "proc01", 4 last_modified: ISODate("2023-03-20T20:15:54.601Z"), 5 state: "RUNNING", 6 error_msg: '', 7 pipeline: [ 8 { 9 $source: { 10 connectionName: "myKafka", 11 topic: "stuff" 12 } 13 }, 14 { 15 $match: { 16 temperature: 46 17 } 18 }, 19 { 20 $emit: { 21 connectionName: "mySink", 22 topic: "output", 23 } 24 } 25 ], 26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z") 27 }
Sample from a Stream Processor
To return an array of sampled results from an existing stream processor
to STDOUT
with mongosh
, use the
sp.<streamprocessor>.sample()
method. <streamprocessor>
must be
the name of a currently running stream processor defined for the
current stream processing instance. For example, the following command samples from a
stream processor named proc01
.
sp.proc01.sample()
This command runs continuously until you cancel it by using
CTRL-C
, or until the returned samples cumulatively reach 40 MB in
size. The stream processor reports invalid documents in the sample in
a _dlqMessage
document of the following form:
{ _dlqMessage: { _stream_meta: { source: { type: "<type>" } }, errInfo: { reason: "<reasonForError>" }, doc: { _id: ObjectId('<group-id>'), ... }, processorName: '<procName>', instanceName: '<instanceName>', dlqTime: ISODate('2024-09-19T20:04:34.263+00:00') } }
You can use these messages to diagnose data hygiene issues without defining a dead letter queue collection.
View Statistics of a Stream Processor
Note
Atlas Stream Processing discards the internal state of stream processors which
have been stopped
for 45 days or more. When you start such a
processor, it operates and reports statistics identically to its initial run.
To view statistics of a stream processor:
The Atlas Administration API provides an endpoint for viewing the statistics of a stream processor.
To return a document summarizing the current status of an existing
stream processor with mongosh
, use the
sp.<streamprocessor>.stats()
method. streamprocessor
must be
the name of a currently running stream processor defined for the
current stream processing instance. It has the following syntax:
sp.<streamprocessor>.stats({options: {<options>}})
Where options
is an optional document with the following fields:
Field | Type | Description |
---|---|---|
| integer | Unit to use for the size of items in the output. By default,
Atlas Stream Processing displays item size in bytes. To display in KB,
specify a |
| boolean | Flag that specifies the verbosity level of the output document.
If set to |
The output document has the following fields:
Field | Type | Description |
---|---|---|
| string | The namespace the stream processor is defined in. |
| object | A document describing the operational state of the stream processor. |
| string | The name of the stream processor. |
| string | The status of the stream processor. This field can have the following values:
|
| integer | The scale in which the size field displays. If set to |
| integer | The number of documents published to the stream. A document
is considered 'published' to the stream once it passes
through the |
| integer | The number of bytes or kilobytes published to the stream.
Bytes are considered 'published' to the stream once they pass
through the |
| integer | The number of documents processed by the stream. A document is considered 'processed' by the stream once it passes through the entire pipeline. |
| integer | The number of bytes or kilobytes processed by the stream. Bytes are considered 'processed' by the stream once they pass through the entire pipeline. |
| integer | The number of documents sent to the Dead Letter Queue. |
| integer | The number of bytes or kilobytes sent to the Dead Letter Queue. |
| integer | The difference, in seconds, between the event time represented by the most recent change stream resume token and the latest event in the oplog. |
| token | The most recent change stream resume token. Only applies to stream processors with a change stream source. |
| integer | The number of bytes used by windows to store processor state. |
| integer | The timestamp of the current watermark. |
| array | The statistics for each operator in the processor pipeline.
Atlas Stream Processing returns this field only if you pass in the
Additionally,
|
| integer | The maximum memory usage of the operator in bytes or kilobytes. |
| integer | The total execution time of the operator in seconds. |
| array | Offset information for an Apache Kafka broker's partitions.
|
| integer | The Apache Kafka topic partition number. |
| integer | The offset that the stream processor is on for the
specified partition. This value equals the previous offset
that the stream processor processed plus |
| integer | The offset that the stream processor last committed to the Apache Kafka broker and the checkpoint for the specified partition. All messages through this offset are recorded in the last checkpoint. |
For example, the following shows the status of a stream processor named
proc01
on a stream processing instance named inst01
with item sizes displayed in
KB:
sp.proc01.stats(1024) { ok: 1, ns: 'inst01', stats: { name: 'proc01', status: 'running', scaleFactor: Long("1"), inputMessageCount: Long("706028"), inputMessageSize: 958685236, outputMessageCount: Long("46322"), outputMessageSize: 85666332, dlqMessageCount: Long("0"), dlqMessageSize: Long("0"), stateSize: Long("2747968"), watermark: ISODate("2023-12-14T14:35:32.417Z"), ok: 1 }, }