$emit
Definition
The $emit stage specifies a connection in the Connection Registry to emit messages to. The connection must be either an Apache Kafka broker or a time series collection.
Syntax
Apache Kafka Broker
To write processed data to an Apache Kafka broker, use the $emit pipeline stage with the following prototype form:
{
  "$emit": {
    "connectionName": "<registered-connection>",
    "topic": "<target-topic>" | <expression>,
    "config": {
      "headers": "<expression>",
      "key": "<key-string>" | { key-document },
      "keyFormat": "<deserialization-type>",
      "outputFormat": "<json-format>"
    }
  }
}
The $emit stage takes a document with the following fields:
Field | Type | Necessity | Description |
---|---|---|---|
connectionName | string | Required | Name, as it appears in the Connection Registry, of the connection to emit messages to. |
topic | string or expression | Required | Name of the Apache Kafka topic to emit messages to. |
config | document | Optional | Document containing fields that override various default values. |
config.headers | expression | Optional | Headers to add to the output message. The expression must evaluate to either an object or an array. If the expression evaluates to an object, Atlas Stream Processing constructs a header from each key-value pair in that object, where the key is the header name and the value is the header value. If the expression evaluates to an array, it must take the form of an array of key-value pair objects. For example:
Atlas Stream Processing constructs a header from each object in the array, where the key is the header name and the value is the header value. Atlas Stream Processing supports header values of the following types:
 |
config.key | object or string | Optional | Expression that evaluates to an Apache Kafka message key. If you specify |
config.keyFormat | string | Conditional | Data type used to deserialize Apache Kafka key data. Must be one of the following values:
Defaults to |
config.outputFormat | string | Optional | JSON format to use when emitting messages to Apache Kafka. Must be one of the following values:
Defaults to |
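The two accepted shapes for config.headers can be illustrated with a small local sketch. This is not Atlas Stream Processing code: toHeaderList is a hypothetical helper, and the k and v field names used for the array form are an assumption, since the description above only says "key-value pair objects". The header names and values are made up for illustration.

```javascript
// Hypothetical helper, not an Atlas API: mimics how an already-evaluated
// config.headers value could be turned into a list of Kafka headers.
function toHeaderList(evaluated) {
  if (Array.isArray(evaluated)) {
    // Array form: one header per key-value pair object.
    // The field names `k` and `v` are an assumption.
    return evaluated.map(({ k, v }) => ({ name: k, value: v }));
  }
  if (evaluated !== null && typeof evaluated === "object") {
    // Object form: one header per key-value pair of the object.
    return Object.entries(evaluated).map(([name, value]) => ({ name, value }));
  }
  throw new TypeError("config.headers must evaluate to an object or an array");
}

// Both shapes describe the same two headers.
const fromObject = toHeaderList({ region: "us-east", source: "sensor-7" });
const fromArray = toHeaderList([
  { k: "region", v: "us-east" },
  { k: "source", v: "sensor-7" },
]);

console.log(fromObject);
console.log(fromArray);
```

In this sketch both calls yield the same header list, which mirrors the description above: the object form and the array form are two ways of expressing the same set of name-value headers.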
Atlas Time Series Collection
To write processed data to an Atlas time series collection, use the $emit pipeline stage with the following prototype form:
{
  "$emit": {
    "connectionName": "<registered-connection>",
    "db": "<target-db>",
    "coll": "<target-coll>",
    "timeseries": { <options> }
  }
}
The $emit stage takes a document with the following fields:
Field | Type | Necessity | Description |
---|---|---|---|
connectionName | string | Required | Name, as it appears in the Connection Registry, of the connection to write data to. |
db | string | Required | Name of the Atlas database that contains the target time series collection. |
coll | string | Required | Name of the Atlas time series collection to write to. |
timeseries | document | Required | Document defining the time series fields for the collection. |
Note
The maximum size for documents within a time series collection is 4 MB. To learn more, see Time Series Collection Limitations.
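As a concrete illustration of the prototype above, the following is a minimal sketch of a filled-in stage. The connection, database, and collection names are hypothetical, and the timeseries document uses the standard MongoDB time series options timeField and metaField; the choice of ingestionTime and callLetters as field names is an assumption made only for this example.

```javascript
// Illustrative values only: "atlasCluster1", "weather", and "readings"
// are hypothetical names chosen for this sketch.
const emitToTimeSeries = {
  $emit: {
    connectionName: "atlasCluster1", // as registered in the Connection Registry
    db: "weather",
    coll: "readings",
    timeseries: {
      timeField: "ingestionTime", // field holding each document's timestamp
      metaField: "callLetters",   // optional field identifying the series
    },
  },
};

console.log(JSON.stringify(emitToTimeSeries, null, 2));
```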
Behavior
$emit must be the last stage of any pipeline it appears in. You can use only one $emit stage per pipeline.
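The two placement rules can be checked locally before a pipeline is submitted. The function below is a hypothetical pre-flight check written for illustration; it is not part of Atlas Stream Processing, which performs its own validation.

```javascript
// Hypothetical local check of the $emit placement rules.
// Returns null when no problem is found, otherwise a short message.
function checkEmitPlacement(pipeline) {
  // Collect the index of every stage that is an $emit stage.
  const emitIndexes = pipeline
    .map((stage, i) => ("$emit" in stage ? i : -1))
    .filter((i) => i !== -1);

  if (emitIndexes.length > 1) {
    return "only one $emit stage is allowed per pipeline";
  }
  if (emitIndexes.length === 1 && emitIndexes[0] !== pipeline.length - 1) {
    return "$emit must be the last stage";
  }
  return null;
}

const valid = [{ $source: {} }, { $match: {} }, { $emit: {} }];
const misplaced = [{ $source: {} }, { $emit: {} }, { $match: {} }];

console.log(checkEmitPlacement(valid));
console.log(checkEmitPlacement(misplaced));
```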
You can only write to a single Atlas time series collection per stream processor. If you specify a collection that doesn't exist, Atlas creates the collection with the time series fields you specified. You must specify an existing database.
You can use a dynamic expression as the value of the topic field to enable your stream processor to write to different target Apache Kafka topics on a message-by-message basis. The expression must evaluate to a string.
Example
You have a stream of transaction events that generates messages of the following form:
{ "customer": "Very Important Industries", "customerStatus": "VIP", "tenantId": 1, "transactionType": "subscription" }
{ "customer": "N. E. Buddy", "customerStatus": "employee", "tenantId": 5, "transactionType": "requisition" }
{ "customer": "Khan Traktor", "customerStatus": "contractor", "tenantId": 11, "transactionType": "billableHours" }
To sort each of these into a distinct Apache Kafka topic, you can write the following $emit stage:
{
  $emit: {
    connectionName: "kafka1",
    topic: "$customerStatus"
  }
}
This $emit stage:
- Writes the Very Important Industries message to a topic named VIP.
- Writes the N. E. Buddy message to a topic named employee.
- Writes the Khan Traktor message to a topic named contractor.
For more information on dynamic expressions, see expression operators.
If you specify a topic that doesn't already exist, Apache Kafka automatically creates the topic when it receives the first message that targets it.
If you specify a topic with a dynamic expression, but Atlas Stream Processing cannot evaluate the expression for a given message, Atlas Stream Processing sends that message to the dead letter queue, if one is configured, and continues processing subsequent messages. If no dead letter queue is configured, Atlas Stream Processing skips the message entirely and continues processing subsequent messages.
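The routing and fallback behavior described above can be simulated locally. The sketch below is only an illustration under simplifying assumptions: resolveTopic and route are hypothetical functions, the "expression" is limited to a plain field path such as "$customerStatus", and the third sample message (with no customerStatus field) is invented to exercise the fallback.

```javascript
// Local simulation only; not how Atlas Stream Processing is implemented.
// Supports a literal topic name or a plain field path like "$customerStatus".
function resolveTopic(topicExpr, message) {
  if (!topicExpr.startsWith("$")) return topicExpr; // literal topic name
  const value = topicExpr
    .slice(1)
    .split(".")
    .reduce((doc, key) => (doc == null ? undefined : doc[key]), message);
  // The expression must evaluate to a string; anything else is unresolved.
  return typeof value === "string" ? value : null;
}

function route(messages, topicExpr, dlqConfigured) {
  const byTopic = new Map();
  const dlq = [];
  for (const message of messages) {
    const topic = resolveTopic(topicExpr, message);
    if (topic !== null) {
      if (!byTopic.has(topic)) byTopic.set(topic, []);
      byTopic.get(topic).push(message);
    } else if (dlqConfigured) {
      dlq.push(message); // unresolved topic: goes to the dead letter queue
    }
    // With no dead letter queue configured, the message is skipped.
  }
  return { byTopic, dlq };
}

const messages = [
  { customer: "Very Important Industries", customerStatus: "VIP" },
  { customer: "N. E. Buddy", customerStatus: "employee" },
  { customer: "No Status Ltd." }, // hypothetical message without customerStatus
];

const result = route(messages, "$customerStatus", true);
console.log([...result.byTopic.keys()], result.dlq.length);
```

Calling route with dlqConfigured set to false leaves the dlq array empty and simply drops the third message, while the first two messages are routed the same way in both cases.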
Examples
A streaming data source generates detailed weather reports from various locations, conformant to the schema of the Sample Weather Dataset. The following aggregation has three stages:
- The $source stage establishes a connection with the Apache Kafka broker collecting these reports in a topic named my_weatherdata, exposing each record as it is ingested to the subsequent aggregation stages. This stage also overrides the name of the timestamp field it projects, setting it to ingestionTime.
- The $match stage excludes documents that have an airTemperature.value of greater than or equal to 30.0 and passes along the documents with an airTemperature.value less than 30.0 to the next stage.
- The $emit stage writes the output to a topic named stream over the weatherStreamOutput Kafka broker connection.
{
  '$source': {
    connectionName: 'sample_weatherdata',
    topic: 'my_weatherdata',
    tsFieldName: 'ingestionTime'
  }
},
{ '$match': { 'airTemperature.value': { '$lt': 30 } } },
{
  '$emit': {
    connectionName: 'weatherStreamOutput',
    topic: 'stream'
  }
}
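One way to use these stages is to collect them into an array and register that array as a stream processor. The sketch below assumes a mongosh session connected to a stream processing instance, where the sp helper is available; the processor name weatherFilter is hypothetical. The guard keeps the snippet harmless when it runs outside mongosh.

```javascript
// The three stages from the example, collected into a pipeline array.
const pipeline = [
  {
    $source: {
      connectionName: "sample_weatherdata",
      topic: "my_weatherdata",
      tsFieldName: "ingestionTime",
    },
  },
  { $match: { "airTemperature.value": { $lt: 30 } } },
  { $emit: { connectionName: "weatherStreamOutput", topic: "stream" } },
];

// Assumption: `sp` exists only in a mongosh session connected to a
// stream processing instance. "weatherFilter" is a hypothetical name.
if (typeof sp !== "undefined") {
  sp.createStreamProcessor("weatherFilter", pipeline);
  sp.weatherFilter.start();
} else {
  console.log("Not running in mongosh; pipeline has", pipeline.length, "stages");
}
```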
Documents in the stream topic take the following form:
{
  "st": "x+34700+119500",
  "position": { "type": "Point", "coordinates": [122.8, 116.1] },
  "elevation": 9999,
  "callLetters": "6ZCM",
  "qualityControlProcess": "V020",
  "dataSource": "4",
  "type": "SAO",
  "airTemperature": { "value": 6.7, "quality": "9" },
  "dewPoint": { "value": 14.1, "quality": "1" },
  "pressure": { "value": 1022.2, "quality": "1" },
  "wind": { "direction": { "angle": 200, "quality": "9" }, "type": "C", "speed": { "rate": 35, "quality": "1" } },
  "visibility": { "distance": { "value": 700, "quality": "1" }, "variability": { "value": "N", "quality": "1" } },
  "skyCondition": { "ceilingHeight": { "value": 1800, "quality": "9", "determination": "9" }, "cavok": "N" },
  "sections": ["AA1", "AG1", "UG1", "SA1", "MW1"],
  "precipitationEstimatedObservation": { "discrepancy": "0", "estimatedWaterDepth": 999 },
  "atmosphericPressureChange": { "tendency": { "code": "4", "quality": "1" }, "quantity3Hours": { "value": 3.8, "quality": "1" }, "quantity24Hours": { "value": 99.9, "quality": "9" } },
  "seaSurfaceTemperature": { "value": 9.7, "quality": "9" },
  "waveMeasurement": { "method": "M", "waves": { "period": 8, "height": 3, "quality": "9" }, "seaState": { "code": "00", "quality": "9" } },
  "pastWeatherObservationManual": { "atmosphericCondition": { "value": "6", "quality": "1" }, "period": { "value": 3, "quality": "1" } },
  "skyConditionObservation": { "totalCoverage": { "value": "02", "opaque": "99", "quality": "9" }, "lowestCloudCoverage": { "value": "00", "quality": "9" }, "lowCloudGenus": { "value": "00", "quality": "1" }, "lowestCloudBaseHeight": { "value": 1750, "quality": "1" }, "midCloudGenus": { "value": "99", "quality": "1" }, "highCloudGenus": { "value": "00", "quality": "1" } },
  "presentWeatherObservationManual": { "condition": "52", "quality": "1" },
  "atmosphericPressureObservation": { "altimeterSetting": { "value": 1015.9, "quality": "9" }, "stationPressure": { "value": 1026, "quality": "1" } },
  "skyCoverLayer": { "coverage": { "value": "08", "quality": "1" }, "baseHeight": { "value": 2700, "quality": "9" }, "cloudType": { "value": "99", "quality": "9" } },
  "liquidPrecipitation": { "period": 12, "depth": 20, "condition": "9", "quality": "9" },
  "extremeAirTemperature": { "period": 99.9, "code": "N", "value": -30.4, "quantity": "1" },
  "ingestionTime": { "$date": "2024-09-26T17:34:41.843Z" },
  "_stream_meta": { "source": { "type": "kafka", "topic": "my_weatherdata", "partition": 0, "offset": 4285 } }
}
Note
The preceding is a representative example. Streaming data are not static, and each user sees distinct documents.