$validate

The $validate stage checks streaming documents for conformity to a schema of expected ranges, values, or datatypes.

A $validate pipeline stage has the following prototype form:

{
  "$validate": {
    "validator": { <filter> },
    "validationAction": "discard" | "dlq"
  }
}

The $validate stage takes a document with the following fields:

Field: validator
Type: document
Necessity: Required
Description: Document of expressions used to validate incoming messages against a user-defined schema. You can use all but the following query operators to define validation expressions:

  • $near

  • $nearSphere

  • $text

  • $where

Field: validationAction
Type: string
Necessity: Optional
Description: Specifies the action to take when a message violates the user-defined schema. You can specify one of the following values:

  • discard: Discards the message. If you do not specify a value for validationAction, this is the default behavior.

  • dlq: Logs the violation to the collection defined in your Stream Processor configuration and performs a best-effort discard without transactional guarantees.
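As a minimal sketch of the fields described above, the following defines a $validate stage that keeps only documents whose status field equals "ok" and discards the rest. The field name status and the variable name exampleValidateStage are hypothetical and serve only as illustration.

```javascript
// Hypothetical stage: the field name "status" is illustrative only.
const exampleValidateStage = {
  $validate: {
    // Any query filter is allowed here except $near, $nearSphere, $text, and $where.
    validator: { status: { $eq: "ok" } },
    // Optional. "discard" is also the behavior when validationAction is omitted.
    validationAction: "discard",
  },
};

console.log(JSON.stringify(exampleValidateStage, null, 2));
```

Switching validationAction to "dlq" would route failing messages to the dead letter queue configured for the stream processor instead of dropping them silently.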

You can use $validate at any point in a pipeline after the $source stage, and before the $emit or $merge stage.
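The placement rule above can be checked mechanically before you submit a pipeline. The following is a rough sketch, assuming the pipeline is held as a plain JavaScript array of stage documents. The helper isValidatePlacementOk is hypothetical and not part of Atlas Stream Processing.

```javascript
// Hypothetical helper, not part of Atlas Stream Processing.
// Returns true if every $validate stage comes after $source and
// before the first $emit or $merge stage (if one is present).
function isValidatePlacementOk(pipeline) {
  const names = pipeline.map((stage) => Object.keys(stage)[0]);
  const sourceIndex = names.indexOf("$source");
  const sinkIndex = names.findIndex((n) => n === "$emit" || n === "$merge");
  return names.every((name, i) => {
    if (name !== "$validate") return true;
    const afterSource = sourceIndex !== -1 && sourceIndex < i;
    const beforeSink = sinkIndex === -1 || i < sinkIndex;
    return afterSource && beforeSink;
  });
}

const good = [{ $source: {} }, { $validate: { validator: {} } }, { $merge: {} }];
const bad = [{ $source: {} }, { $merge: {} }, { $validate: { validator: {} } }];

console.log(isValidatePlacementOk(good)); // true
console.log(isValidatePlacementOk(bad)); // false
```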

If you specify either discard or dlq for the validationAction field, Atlas Stream Processing logs messages that fail validation in the following format:

{
  "t": <datetime>,
  "s": "<severity-level>",
  "c": "streams-<job-name>",
  "ctx": "<processed-pipeline>",
  "msg": "<message-body>",
  "attrs": {
    <result-of-logAttributes-evaluation>
  },
  "tags": <array-of-strings>,
  "truncated": {
    <truncation-description>
  },
  "size": <size-of-entry>
}

The following table describes the log entry fields:

Field: attrs
Type: document
Description: Document containing the results of evaluating the logAttributes field in the $validate definition. The result is a list of fields.

Field: c
Type: string
Description: Name of the specific stream processing job in which the failure occurred.

Field: ctx
Type: string
Description: Name of the streaming data pipeline being processed.

Field: msg
Type: string
Description: Body of the message that failed validation.

Atlas Stream Processing supports only JSON Schema Draft 4 or earlier.
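One practical consequence shows up when you port a schema written for a later JSON Schema draft. In Draft 4, exclusiveMinimum and exclusiveMaximum are booleans that modify minimum and maximum rather than numeric bounds of their own. The sketch below illustrates the Draft 4 form. The field name temperature and its bound are hypothetical.

```javascript
// Draft 4 style: exclusiveMinimum is a boolean that modifies `minimum`.
// The field "temperature" is a hypothetical example.
const draft4Validator = {
  $jsonSchema: {
    properties: {
      temperature: { bsonType: "double", minimum: -90, exclusiveMinimum: true },
    },
  },
};

// Draft 6 and later would instead write `exclusiveMinimum: -90` as a number,
// which falls outside the Draft 4 limit stated above.
console.log(JSON.stringify(draft4Validator));
```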

The following document shows an example validator expression using $and to perform a logical AND operation:

{
  $validate: {
    validator: {
      $and: [
        {
          $expr: {
            $ne: [ "$Racer_Name", "Pace Car" ]
          }
        },
        {
          $jsonSchema: {
            required: [ "Racer_Num", "Racer_Name", "lap", "Corner_Num", "timestamp" ],
            properties: {
              Racer_Num: {
                bsonType: "int",
                description: "'Racer_Num' is the integer number of the race car and is required"
              },
              Racer_Name: {
                bsonType: "string",
                description: "'Racer_Name' must be a string and is required"
              },
              lap: {
                bsonType: "int",
                minimum: 1,
                description: "'lap' must be an int and is required"
              },
              Corner_Num: {
                bsonType: "int",
                minimum: 1,
                maximum: 4,
                description: "'Corner_Num' must be an int between 1 and 4 and is required"
              },
              timestamp: {
                bsonType: "string",
                pattern: "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{6}$",
                description: "'timestamp' must be a string matching iso date pattern and is required"
              }
            }
          }
        }
      ]
    },
    validationAction: "dlq"
  }
}
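To see what the timestamp pattern in the validator above accepts, the following sketch tests the same regular expression in plain JavaScript. The sample strings are made up for illustration. The pattern requires exactly six fractional-second digits and no time zone suffix, so timestamps with millisecond precision or a trailing Z would fail validation.

```javascript
// Same pattern as the `timestamp` property in the validator above.
// In a regex literal each backslash is written once; in the schema string
// it is doubled because of string escaping.
const timestampPattern = /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}$/;

// Made-up sample values for illustration.
const samples = [
  "2024-08-06T16:44:50.322000", // six fractional digits: matches
  "2024-08-06T16:44:50.322", // three fractional digits: no match
  "2024-08-06T16:44:50.322000Z", // trailing zone designator: no match
];

const results = samples.map((s) => timestampPattern.test(s));
console.log(results); // [ true, false, false ]
```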

Consider a streaming data source that generates detailed weather reports from various locations. In the following example aggregation pipeline, you include a $validate stage to ensure the documents conform to the schema of the Sample Weather Dataset. The aggregation has four stages:

  1. The $source stage establishes a connection with the Apache Kafka broker collecting these reports in a topic named my_weatherdata, passing each record as it is ingested to the subsequent aggregation stages.

  2. The $validate stage checks whether a document has array values for the position.coordinates and sections fields, passing documents that do to the rest of the pipeline, and passing documents that don't to a DLQ.

  3. The $match stage excludes documents that have a wind.speed.rate value of greater than or equal to 30 and passes along the documents with a wind.speed.rate value of less than 30.

  4. The $merge stage writes the output to an Atlas collection named stream in the sample_weatherstream database. If no such database or collection exists, Atlas creates it.

{
  '$source': {
    connectionName: 'sample_weatherdata',
    topic: 'my_weatherdata',
    tsFieldName: 'ingestionTime'
  }
},
{
  '$validate': {
    validator: {
      '$jsonSchema': { properties: { position: [Object], sections: [Object] } }
    },
    validationAction: 'dlq'
  }
},
{ '$match': { 'wind.speed.rate': { '$lt': 30 } } },
{
  '$merge': {
    into: {
      connectionName: 'weatherStreamOutput',
      db: 'sample_weatherstream',
      coll: 'stream'
    }
  }
}
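The validator in the pipeline above is printed in collapsed form ([Object]). As a rough sketch of what a schema matching the description of the $validate stage could look like, the following spells out array checks for position.coordinates and sections. The expanded schema is an assumption, not the original definition. The DLQ connection name in processorOptions is likewise assumed, and the sp calls in the comments reflect typical mongosh usage rather than anything shown on this page.

```javascript
// Assumed expansion of the collapsed `[Object]` values; the original schema is not shown.
const weatherValidateStage = {
  $validate: {
    validator: {
      $jsonSchema: {
        properties: {
          position: {
            bsonType: "object",
            properties: { coordinates: { bsonType: "array" } },
          },
          sections: { bsonType: "array" },
        },
      },
    },
    validationAction: "dlq",
  },
};

const pipeline = [
  { $source: { connectionName: "sample_weatherdata", topic: "my_weatherdata", tsFieldName: "ingestionTime" } },
  weatherValidateStage,
  { $match: { "wind.speed.rate": { $lt: 30 } } },
  { $merge: { into: { connectionName: "weatherStreamOutput", db: "sample_weatherstream", coll: "stream" } } },
];

// Assumed DLQ location: the connection name is a guess; db and coll follow
// the dead letter queue named dlq that this example queries later.
const processorOptions = {
  dlq: { connectionName: "weatherStreamOutput", db: "sample_weatherstream", coll: "dlq" },
};

// In mongosh, connected to a stream processing instance, you would typically
// register and start the processor along these lines (not run here):
//   sp.createStreamProcessor("sampleWeather", pipeline, processorOptions)
//   sp.sampleWeather.start()
console.log(pipeline.map((stage) => Object.keys(stage)[0]));
```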

To view the documents in the resulting sample_weatherstream.stream collection, connect to your Atlas cluster using mongosh and run the following command:

Note

The following is a representative example. Streaming data are not static, and each user sees distinct documents.

db.getSiblingDB("sample_weatherstream").stream.find()
{
  _id: ObjectId('66b25302fe8bbac5f39dbdba'),
  _stream_meta: {
    source: {
      type: 'kafka',
      topic: 'my_weatherdata',
      partition: 0,
      offset: Long('168843')
    }
  },
  airTemperature: { quality: '9', value: 3.5 },
  atmosphericPressureChange: {
    quantity24Hours: { quality: '9', value: 99.9 },
    quantity3Hours: { quality: '1', value: 10.9 },
    tendency: { code: '3', quality: '1' }
  },
  atmosphericPressureObservation: {
    altimeterSetting: { quality: '1', value: 1015.9 },
    stationPressure: { quality: '9', value: 1022.5 }
  },
  callLetters: 'JIVX',
  dataSource: '4',
  dewPoint: { quality: '9', value: 20.5 },
  elevation: 9999,
  extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 },
  ingestionTime: ISODate('2024-08-06T16:44:50.322Z'),
  liquidPrecipitation: { condition: '9', depth: 7000, period: 12, quality: '9' },
  pastWeatherObservationManual: {
    atmosphericCondition: { quality: '1', value: '7' },
    period: { quality: '1', value: 3 }
  },
  position: { coordinates: [ 120.7, -98.2 ], type: 'Point' },
  precipitationEstimatedObservation: { discrepancy: '5', estimatedWaterDepth: 999 },
  presentWeatherObservationManual: { condition: '90', quality: '1' },
  pressure: { quality: '1', value: 1028.2 },
  qualityControlProcess: 'V020',
  seaSurfaceTemperature: { quality: '9', value: 11.1 },
  sections: [ 'UG1', 'MA1', 'GA3', 'KA1', 'UA1' ],
  skyCondition: {
    cavok: 'N',
    ceilingHeight: { determination: 'C', quality: '1', value: 390 }
  },
  skyConditionObservation: {
    highCloudGenus: { quality: '1', value: '06' },
    lowCloudGenus: { quality: '9', value: '07' },
    lowestCloudBaseHeight: { quality: '1', value: 800 },
    lowestCloudCoverage: { quality: '9', value: '06' },
    midCloudGenus: { quality: '9', value: '07' },
    totalCoverage: { opaque: '99', quality: '1', value: '99' }
  },
  skyCoverLayer: {
    baseHeight: { quality: '9', value: 1200 },
    cloudType: { quality: '9', value: '04' },
    coverage: { quality: '1', value: '09' }
  },
  st: 'x+36700+144300',
  type: 'FM-13',
  visibility: {
    distance: { quality: '9', value: 9000 },
    variability: { quality: '9', value: 'N' }
  },
  waveMeasurement: {
    method: 'I',
    seaState: { code: '00', quality: '9' },
    waves: { height: 9.5, period: 4, quality: '9' }
  },
  wind: {
    direction: { angle: 140, quality: '1' },
    speed: { quality: '2', rate: 15.9 },
    type: 'N'
  }
}

Observe that all documents in this collection have the expected array-type values for position.coordinates and sections. To view the documents that failed validation, given a dead letter queue named dlq, run the following command:

db.getSiblingDB("sample_weatherstream").dlq.find()
{
  _id: ObjectId('66b254d3a045fb1406047394'),
  _stream_meta: {
    source: {
      type: 'kafka',
      topic: 'my_weatherdata',
      partition: 0,
      offset: Long('168949'),
      key: Binary.createFromBase64('', 0),
      headers: []
    }
  },
  errInfo: { reason: 'Input document found to be invalid in $validate stage' },
  doc: {
    airTemperature: { quality: '9', value: 7.6 },
    atmosphericPressureChange: {
      quantity24Hours: { quality: '9', value: 99.9 },
      quantity3Hours: { quality: '1', value: 0.3 },
      tendency: { code: '8', quality: '1' }
    },
    atmosphericPressureObservation: {
      altimeterSetting: { quality: '9', value: 1015.9 },
      stationPressure: { quality: '1', value: 1017 }
    },
    callLetters: 'WRGL',
    dataSource: '4',
    dewPoint: { quality: '9', value: 25.3 },
    elevation: 9999,
    extremeAirTemperature: { code: 'M', period: 99.9, quantity: '1', value: -30.9 },
    liquidPrecipitation: { condition: '9', period: 99, quality: '9' },
    pastWeatherObservationManual: {
      atmosphericCondition: { quality: '1', value: '2' },
      period: { quality: '1', value: 6 }
    },
    position: { coordinates: -100.2, type: 'Point' },
    precipitationEstimatedObservation: { discrepancy: '5', estimatedWaterDepth: 17 },
    presentWeatherObservationManual: { condition: '08', quality: '1' },
    pressure: { quality: '9', value: 1001 },
    qualityControlProcess: 'V020',
    seaSurfaceTemperature: { quality: '9', value: 10.4 },
    sections: [ 'GA2', 'GA1', 'KA1', 'AA1', 'OA1' ],
    skyCondition: {
      cavok: 'N',
      ceilingHeight: { determination: 'C', quality: '1', value: 240 }
    },
    skyConditionObservation: {
      highCloudGenus: { quality: '1', value: '02' },
      lowCloudGenus: { quality: '9', value: '02' },
      lowestCloudBaseHeight: { quality: '1', value: 150 },
      lowestCloudCoverage: { quality: '1', value: '03' },
      midCloudGenus: { quality: '1', value: '06' },
      totalCoverage: { opaque: '99', quality: '1', value: '06' }
    },
    skyCoverLayer: {
      baseHeight: { quality: '9', value: 450 },
      cloudType: { quality: '9', value: '03' },
      coverage: { quality: '1', value: '07' }
    },
    st: 'x+20500-074300',
    type: 'SAO',
    visibility: {
      distance: { quality: '9', value: 3800 },
      variability: { quality: '9', value: 'N' }
    },
    waveMeasurement: {
      method: 'I',
      seaState: { code: '00', quality: '9' },
      waves: { height: 37.5, period: 7, quality: '9' }
    },
    wind: {
      direction: { angle: 230, quality: '1' },
      speed: { quality: '1', rate: 46.3 },
      type: 'N'
    },
    ingestionTime: ISODate('2024-08-06T16:52:35.287Z'),
    _stream_meta: {
      source: {
        type: 'kafka',
        topic: 'my_weatherdata',
        partition: 0,
        offset: Long('168949')
      }
    }
  },
  processorName: 'sampleWeather'
}

Observe that all documents in the dead letter queue have invalid values for either position.coordinates, sections, or both.
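To pin down which field caused a given entry to land in the dead letter queue, you can inspect the doc field of the entry. The following sketch does this in plain JavaScript for a trimmed copy of the DLQ document shown above. The helper failedArrayChecks is hypothetical and only mirrors the two array checks described for this example, not the server's actual validation logic.

```javascript
// Hypothetical helper: reports which of the two array checks the original
// document (`doc`) inside a DLQ entry would fail. Missing fields are not
// treated as failures, since the example schema lists no required fields.
function failedArrayChecks(dlqEntry) {
  const doc = dlqEntry.doc ?? {};
  const failures = [];
  const coordinates = doc.position?.coordinates;
  if (coordinates !== undefined && !Array.isArray(coordinates)) {
    failures.push("position.coordinates");
  }
  if (doc.sections !== undefined && !Array.isArray(doc.sections)) {
    failures.push("sections");
  }
  return failures;
}

// Trimmed copy of the DLQ document shown above.
const entry = {
  errInfo: { reason: "Input document found to be invalid in $validate stage" },
  doc: {
    position: { coordinates: -100.2, type: "Point" },
    sections: ["GA2", "GA1", "KA1", "AA1", "OA1"],
  },
  processorName: "sampleWeather",
};

console.log(failedArrayChecks(entry)); // [ 'position.coordinates' ]
```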
