sp.process()
Definition
New in version 7.0: Creates an ephemeral stream processor on the current stream processing instance.
Compatibility
This method is supported in Atlas Stream Processing instances.
Syntax
The sp.process() method has the following syntax:
sp.process( [ <pipeline> ], { <options> } )
Command Fields
sp.process() takes these fields:
Field | Type | Necessity | Description |
---|---|---|---|
pipeline | array | Required | Stream aggregation pipeline you want to apply to your streaming data. |
options | object | Optional | Object defining various optional settings for your stream processor. |
options.dlq | object | Conditional | Object assigning a dead letter queue for your stream processing instance. This field is necessary if you define the options field. |
options.dlq.connectionName | string | Conditional | Label that identifies a connection in your connection registry. This connection must reference an Atlas cluster. This field is necessary if you define the options.dlq field. |
options.dlq.db | string | Conditional | Name of an Atlas database on the cluster specified in options.dlq.connectionName. This field is necessary if you define the options.dlq field. |
options.dlq.coll | string | Conditional | Name of a collection in the database specified in options.dlq.db. This field is necessary if you define the options.dlq field. |
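A minimal JavaScript sketch, assuming you assemble the arguments as variables before calling sp.process() from mongosh: it builds a pipeline and an options object and checks them against the conditional rules in the field table. The helper validateProcessArgs and the solar_dlq collection name are hypothetical. The helper runs in plain Node.js, performs only these local checks, and cannot confirm that a connection exists in your connection registry or references an Atlas cluster.

```javascript
// Hypothetical helper, not part of mongosh: checks sp.process() arguments
// against the field rules listed in the Command Fields table.
function validateProcessArgs(pipeline, options) {
  const errors = [];
  if (!Array.isArray(pipeline)) {
    errors.push("pipeline must be an array of stages");
  }
  if (options === undefined) {
    return errors; // options is optional, so omitting it is valid
  }
  if (options === null || typeof options !== "object") {
    errors.push("options must be an object");
    return errors;
  }
  // The table marks options.dlq as necessary whenever options is defined.
  const dlq = options.dlq;
  if (dlq === null || typeof dlq !== "object") {
    errors.push("options.dlq must be an object when options is defined");
    return errors;
  }
  // connectionName, db, and coll are all necessary once options.dlq is defined.
  // This local check cannot verify that the connection references an Atlas cluster.
  for (const key of ["connectionName", "db", "coll"]) {
    if (typeof dlq[key] !== "string" || dlq[key].length === 0) {
      errors.push(`options.dlq.${key} must be a non-empty string`);
    }
  }
  return errors;
}

// Stages borrowed from the example on this page.
const pipeline = [
  { $source: { connectionName: "sample_stream_solar" } },
  { $match: { $expr: { $ne: ["$device_id", "device_8"] } } },
  { $merge: { into: { connectionName: "mongodb1", db: "solar_db", coll: "solar_coll" }, on: ["_id"] } }
];

// "solar_dlq" is a hypothetical dead letter queue collection name.
const options = {
  dlq: { connectionName: "mongodb1", db: "solar_db", coll: "solar_dlq" }
};

const problems = validateProcessArgs(pipeline, options);
if (problems.length > 0) {
  console.log(problems.join("\n"));
} else {
  console.log("arguments look valid");
  // In mongosh, connected to a stream processing instance, you would then run:
  // sp.process(pipeline, options);
}
```

Checking locally first only catches shape mistakes such as a dlq object missing its coll field; the stream processing instance still performs its own validation when you call sp.process().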
Behavior
sp.process() creates an ephemeral, unnamed stream processor on the current stream processing instance and immediately initializes it. This stream processor persists only as long as it runs. If you terminate an ephemeral stream processor, you must create it again in order to use it.
Access Control
The user running sp.process() must have the atlasAdmin role.
Example
The following example creates an ephemeral stream processor that ingests data from the sample_stream_solar connection. The $source stage converts each document's timestamp string to a date with $dateFromString and uses it as the document's authoritative timestamp. The processor excludes all documents where the value of the device_id field is device_8 and passes the rest to a tumbling window with a 10-second duration. Each window groups the documents it receives by device_id, then computes the maximum temperature and the maximum, minimum, average, and median wattage for each group. The stream processor then merges these records into the solar_db.solar_coll collection over the mongodb1 connection.
sp.process( [
  {
    $source: {
      connectionName: 'sample_stream_solar',
      timeField: { $dateFromString: { dateString: '$timestamp' } }
    }
  },
  { $match: { $expr: { $ne: [ "$device_id", "device_8" ] } } },
  {
    $tumblingWindow: {
      interval: { size: NumberInt(10), unit: "second" },
      "pipeline": [
        {
          $group: {
            "_id": { "device_id": "$device_id" },
            "max_temp": { $max: "$obs.temp" },
            "max_watts": { $max: "$obs.watts" },
            "min_watts": { $min: "$obs.watts" },
            "avg_watts": { $avg: "$obs.watts" },
            "median_watts": {
              $median: { input: "$obs.watts", method: "approximate" }
            }
          }
        }
      ]
    }
  },
  {
    $merge: {
      into: { connectionName: "mongodb1", db: "solar_db", coll: "solar_coll" },
      on: ["_id"]
    }
  }
] )
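To make the window output more concrete, the following plain JavaScript sketch mirrors, for a small in-memory batch, what the $match, $tumblingWindow, and $group stages above compute. It is only a local model under stated assumptions: the helper names windowStats and median, the windowStart output field, and the sample readings are hypothetical; it assumes windows are aligned to 10-second boundaries since the Unix epoch; it ignores how the stream processor handles late-arriving data; and it computes an exact median, whereas the pipeline asks $median for method: "approximate", so real results may differ slightly.

```javascript
// Hypothetical local model of the example pipeline; not part of mongosh.
const WINDOW_MS = 10 * 1000; // $tumblingWindow size: 10 seconds

function median(values) {
  // Exact median; the pipeline itself requests method: "approximate".
  const sorted = [...values].sort((a, b) => a - b);
  const mid = Math.floor(sorted.length / 2);
  return sorted.length % 2 === 1 ? sorted[mid] : (sorted[mid - 1] + sorted[mid]) / 2;
}

function windowStats(docs) {
  const groups = new Map(); // key: window start + device_id
  for (const doc of docs) {
    if (doc.device_id === "device_8") continue; // the $match stage
    const t = Date.parse(doc.timestamp); // like $dateFromString on $timestamp
    // Assumption: windows are aligned to multiples of 10 s since the epoch.
    const windowStart = Math.floor(t / WINDOW_MS) * WINDOW_MS;
    const key = `${windowStart}|${doc.device_id}`;
    if (!groups.has(key)) {
      groups.set(key, { windowStart, device_id: doc.device_id, temps: [], watts: [] });
    }
    const g = groups.get(key);
    g.temps.push(doc.obs.temp);
    g.watts.push(doc.obs.watts);
  }
  // One result per window and device, mirroring the $group accumulators.
  return [...groups.values()].map((g) => ({
    windowStart: new Date(g.windowStart).toISOString(), // added by this sketch only
    _id: { device_id: g.device_id },
    max_temp: Math.max(...g.temps),
    max_watts: Math.max(...g.watts),
    min_watts: Math.min(...g.watts),
    avg_watts: g.watts.reduce((sum, w) => sum + w, 0) / g.watts.length,
    median_watts: median(g.watts),
  }));
}

// Hypothetical sample readings shaped like the fields the pipeline reads.
const readings = [
  { device_id: "device_1", timestamp: "2024-01-01T00:00:01Z", obs: { temp: 20, watts: 100 } },
  { device_id: "device_1", timestamp: "2024-01-01T00:00:04Z", obs: { temp: 23, watts: 300 } },
  { device_id: "device_1", timestamp: "2024-01-01T00:00:07Z", obs: { temp: 21, watts: 200 } },
  { device_id: "device_8", timestamp: "2024-01-01T00:00:05Z", obs: { temp: 99, watts: 999 } },
  { device_id: "device_1", timestamp: "2024-01-01T00:00:12Z", obs: { temp: 22, watts: 150 } },
];

console.log(windowStats(readings));
```

With these readings, the device_8 document is dropped, the first three device_1 readings fall into the window starting at 00:00:00, and the reading at 00:00:12 forms a separate window starting at 00:00:10.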