$merge
Definition
The $merge stage specifies a connection in the Connection Registry to write messages to. The connection must be an Atlas connection.
A $merge pipeline stage has the following prototype form:
{
  "$merge": {
    "into": {
      "connectionName": "<registered-atlas-connection>",
      "db": "<registered-database-name>" | <expression>,
      "coll": "<atlas-collection-name>" | <expression>
    },
    "on": "<identifier field>" | [ "<identifier field1>", ...],
    "let": { <var_1>: <expression>, <var_2>: <expression>, …, <var_n>: <expression> },
    "whenMatched": "replace | keepExisting | merge",
    "whenNotMatched": "insert | discard"
  }
}
Syntax
The Atlas Stream Processing version of $merge uses most of the same fields as the Atlas Data Federation version. However, because Atlas Stream Processing only supports merging into an Atlas connection, the syntax of into is simplified. For more information, see this description of the Atlas Data Federation $merge fields.
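As a concrete illustration of the prototype form, the following sketch fills in each field. The connection name myAtlasCluster, the database sales, the collection orders, and the orderId field are hypothetical placeholders. The whenMatched and whenNotMatched values are taken from the options listed in the prototype.

```javascript
// Hypothetical $merge stage; every name below is a placeholder for illustration.
const mergeStage = {
  $merge: {
    into: {
      connectionName: "myAtlasCluster", // assumed Connection Registry entry
      db: "sales",                      // assumed database name
      coll: "orders"                    // assumed collection name
    },
    on: "orderId",            // identifier field (hypothetical)
    whenMatched: "merge",     // one of: replace | keepExisting | merge
    whenNotMatched: "insert"  // one of: insert | discard
  }
};

console.log(JSON.stringify(mergeStage, null, 2));
```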
Behavior
$merge must be the last stage of any pipeline it appears in. You can use only one $merge stage per pipeline.
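The two placement rules can be checked locally before a pipeline is submitted. The helper below is a minimal sketch written for this page. The function name checkMergePlacement and the sample pipelines are hypothetical, and the check is not part of any Atlas API.

```javascript
// Returns a list of placement problems for $merge in a pipeline (empty if none).
// Local illustration only; Atlas Stream Processing performs its own validation.
function checkMergePlacement(pipeline) {
  const problems = [];
  // Collect the index of every stage that is a $merge stage.
  const mergeIndexes = pipeline
    .map((stage, i) => (Object.keys(stage).includes("$merge") ? i : -1))
    .filter((i) => i !== -1);

  if (mergeIndexes.length > 1) {
    problems.push("pipeline contains more than one $merge stage");
  }
  if (mergeIndexes.some((i) => i !== pipeline.length - 1)) {
    problems.push("$merge must be the last stage");
  }
  return problems;
}

// Hypothetical pipelines used only to exercise the helper.
const merge = { $merge: { into: { connectionName: "c", db: "d", coll: "x" } } };
const validPipeline = [{ $match: { a: 1 } }, merge];
const invalidPipeline = [merge, { $match: { a: 1 } }];

console.log(checkMergePlacement(validPipeline));   // []
console.log(checkMergePlacement(invalidPipeline)); // [ '$merge must be the last stage' ]
```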
The on field has special requirements for $merge against sharded collections. To learn more, see $merge syntax.
You can use a dynamic expression as the value of the following fields:
into.db
into.coll
This enables your stream processor to write messages to different target Atlas collections on a message-by-message basis.
Example
You have a stream of transaction events that generates messages of the following form:
{ "customer": "Very Important Industries", "customerStatus": "VIP", "tenantId": 1, "transactionType": "subscription" }
{ "customer": "N. E. Buddy", "customerStatus": "employee", "tenantId": 5, "transactionType": "requisition" }
{ "customer": "Khan Traktor", "customerStatus": "contractor", "tenantId": 11, "transactionType": "billableHours" }
To sort each of these into a distinct Atlas database and collection, you can write the following $merge stage:
{ $merge: { into: { connectionName: "db1", db: "$customerStatus", coll: "$transactionType" } } }
This $merge stage:
Writes the Very Important Industries message to an Atlas collection named VIP.subscription.
Writes the N. E. Buddy message to an Atlas collection named employee.requisition.
Writes the Khan Traktor message to an Atlas collection named contractor.billableHours.
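The per-message routing described above can be simulated in plain JavaScript. This is a simplified stand-in that handles only top-level "$field" paths. It is not how Atlas Stream Processing evaluates expressions, and the helper names resolveName and targetNamespace are hypothetical.

```javascript
// Resolve a "$field" path against a message; other strings pass through as literals.
// Simplification: only top-level field paths are supported in this sketch.
function resolveName(spec, message) {
  if (typeof spec === "string" && spec.startsWith("$")) {
    return message[spec.slice(1)];
  }
  return spec;
}

// Build the "<db>.<coll>" namespace a message would be written to.
function targetNamespace(into, message) {
  return `${resolveName(into.db, message)}.${resolveName(into.coll, message)}`;
}

const into = { connectionName: "db1", db: "$customerStatus", coll: "$transactionType" };

const messages = [
  { customer: "Very Important Industries", customerStatus: "VIP", tenantId: 1, transactionType: "subscription" },
  { customer: "N. E. Buddy", customerStatus: "employee", tenantId: 5, transactionType: "requisition" },
  { customer: "Khan Traktor", customerStatus: "contractor", tenantId: 11, transactionType: "billableHours" }
];

for (const m of messages) {
  console.log(`${m.customer} -> ${targetNamespace(into, m)}`);
}
// Very Important Industries -> VIP.subscription
// N. E. Buddy -> employee.requisition
// Khan Traktor -> contractor.billableHours
```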
You can only use dynamic expressions that evaluate to strings. For more information on dynamic expressions, see expression operators.
If you specify a database or collection with a dynamic expression, but Atlas Stream Processing cannot evaluate the expression for a given message, Atlas Stream Processing sends that message to the dead letter queue, if one is configured, and continues processing subsequent messages. If no dead letter queue is configured, Atlas Stream Processing skips the message and continues processing subsequent messages.
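The failure handling described above can be sketched as a small simulation. Everything here is hypothetical: the arrays written and dlq stand in for the target collections and the dead letter queue, the function names are invented for this sketch, and treating a missing field or a non-string result as "cannot evaluate" is an assumption based on the string-only rule stated earlier.

```javascript
// Simplified expression evaluation: only top-level "$field" paths are handled.
function evaluateName(spec, message) {
  if (typeof spec === "string" && spec.startsWith("$")) {
    return message[spec.slice(1)];
  }
  return spec;
}

// Route one message. `dlq` is an array standing in for a dead letter queue,
// or null when none is configured. Returns what happened to the message.
function routeMessage(into, message, written, dlq) {
  const db = evaluateName(into.db, message);
  const coll = evaluateName(into.coll, message);

  // Assumption: a missing or non-string result counts as a failed evaluation.
  if (typeof db !== "string" || typeof coll !== "string") {
    if (dlq) {
      dlq.push(message);
      return "dlq";
    }
    return "skipped";
  }
  written.push({ namespace: `${db}.${coll}`, message });
  return "written";
}

const into = { connectionName: "db1", db: "$customerStatus", coll: "$transactionType" };
const written = [];
const dlq = [];

const good = { customerStatus: "VIP", transactionType: "subscription" };
const missingField = { customerStatus: "VIP" };                          // no transactionType
const wrongType = { customerStatus: 7, transactionType: "requisition" }; // not a string

console.log(routeMessage(into, good, written, dlq));         // written
console.log(routeMessage(into, missingField, written, dlq)); // dlq
console.log(routeMessage(into, wrongType, written, null));   // skipped
```

In every case the function returns, so the caller can move on to the next message, which mirrors the "processes subsequent messages" behavior.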
Examples
A streaming data source generates detailed weather reports from various locations, conformant to the schema of the Sample Weather Dataset. The following aggregation has three stages:
The $source stage establishes a connection with the Apache Kafka broker collecting these reports in a topic named my_weatherdata, exposing each record as it is ingested to the subsequent aggregation stages. This stage also overrides the name of the timestamp field it projects, setting it to ingestionTime.
The $match stage excludes documents that have a dewPoint.value of less than or equal to 5.0 and passes along the documents with dewPoint.value greater than 5.0 to the next stage.
The $merge stage writes the output to an Atlas collection named stream in the sample_weatherstream database. If no such database or collection exists, Atlas creates them.
{
  '$source': {
    connectionName: 'sample_weatherdata',
    topic: 'my_weatherdata',
    tsFieldName: 'ingestionTime'
  }
},
{
  '$match': { 'dewPoint.value': { '$gt': 5 } }
},
{
  '$merge': {
    into: { connectionName: 'weatherStreamOutput', db: 'sample_weatherstream', coll: 'stream' }
  }
}
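In a mongosh session connected to a stream processing instance, a pipeline like this one is typically registered with sp.createStreamProcessor() and then started by name. The following is a minimal sketch under that assumption. The processor name weatherDewPoint is hypothetical, and the sp calls are guarded so that the pipeline definition can also be inspected outside mongosh.

```javascript
// The three stages from the example, held in an array so they can be registered together.
const pipeline = [
  { $source: { connectionName: "sample_weatherdata", topic: "my_weatherdata", tsFieldName: "ingestionTime" } },
  { $match: { "dewPoint.value": { $gt: 5 } } },
  { $merge: { into: { connectionName: "weatherStreamOutput", db: "sample_weatherstream", coll: "stream" } } }
];

// Hypothetical processor name, chosen for this sketch.
const processorName = "weatherDewPoint";

// `sp` exists only in mongosh when connected to a stream processing instance.
if (typeof sp !== "undefined") {
  sp.createStreamProcessor(processorName, pipeline);
  sp[processorName].start();
}
```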
To view the documents in the resulting sample_weatherstream.stream collection, connect to your Atlas cluster and run the following command:
db.getSiblingDB("sample_weatherstream").stream.find()
{
  _id: ObjectId('66ad2edfd4fcac13b1a28ce3'),
  _stream_meta: { source: { type: 'kafka', topic: 'my_weatherdata', partition: 0, offset: Long('165235') } },
  airTemperature: { quality: '1', value: 27.7 },
  atmosphericPressureChange: { quantity24Hours: { quality: '9', value: 99.9 }, quantity3Hours: { quality: '1' }, tendency: { code: '1', quality: '1' } },
  atmosphericPressureObservation: { altimeterSetting: { quality: '1', value: 1015.9 }, stationPressure: { quality: '1', value: 1021.9 } },
  callLetters: 'CGDS',
  dataSource: '4',
  dewPoint: { quality: '9', value: 25.7 },
  elevation: 9999,
  extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 },
  ingestionTime: ISODate('2024-08-02T19:09:18.071Z'),
  liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' },
  pastWeatherObservationManual: { atmosphericCondition: { quality: '1', value: '8' }, period: { quality: '9', value: 3 } },
  position: { coordinates: [ 153.3, 50.7 ], type: 'Point' },
  precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 },
  presentWeatherObservationManual: { condition: '53', quality: '1' },
  pressure: { quality: '1', value: 1016.3 },
  qualityControlProcess: 'V020',
  seaSurfaceTemperature: { quality: '9', value: 27.6 },
  sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ],
  skyCondition: { cavok: 'N', ceilingHeight: { determination: 'C', quality: '1', value: 6900 } },
  skyConditionObservation: { highCloudGenus: { quality: '1', value: '05' }, lowCloudGenus: { quality: '9', value: '03' }, lowestCloudBaseHeight: { quality: '9', value: 150 }, lowestCloudCoverage: { quality: '1', value: '05' }, midCloudGenus: { quality: '9', value: '08' }, totalCoverage: { opaque: '99', quality: '1', value: '06' } },
  skyCoverLayer: { baseHeight: { quality: '9', value: 99999 }, cloudType: { quality: '9', value: '05' }, coverage: { quality: '1', value: '04' } },
  st: 'x+35700-027900',
  type: 'SAO',
  visibility: { distance: { quality: '1', value: 4000 }, variability: { quality: '1', value: 'N' } },
  waveMeasurement: { method: 'I', seaState: { code: '99', quality: '9' }, waves: { height: 99.9, period: 14, quality: '9' } },
  wind: { direction: { angle: 280, quality: '9' }, speed: { quality: '1', rate: 30.3 }, type: '9' }
}
Note
The preceding is a representative example. Streaming data are not static, and each user sees distinct documents.