$https

Definition

The $https stage specifies a connection in the Connection Registry to send HTTPS requests to. Each time a prior stage passes a document to $https, the stage sends a new request.

Syntax

To send an HTTPS request to a given connection:

{
  "$https": {
    "connectionName" : "<registered-connection>",
    "path" : "<subpath>" | <expression>,
    "parameters" : {
      "<key1>" : "<val1>",
      . . .
      "<keyn>" : "<valn>"
    },
    "method" : "<GET | POST | PUT | PATCH | DELETE>",
    "headers" : {
      "<key1>" : "<val1>",
      . . .
      "<keyn>" : "<valn>"
    },
    "as" : "response",
    "onError" : "<dlq | ignore | fail>",
    "payload" : [{
      <inner-pipeline>
    }],
    "config" : {
      "connectionTimeoutSec" : <integer>,
      "requestTimeoutSec" : <integer>
    }
  }
}

The $https stage takes a document with the following fields:

Field | Type | Necessity | Description

connectionName | string | Required

Label that identifies the connection in the Connection Registry to send an HTTPS request to.

path | string or expression | Optional

Path to append to the URL your connectionName resolves to.

For example, if you specify a connectionName that resolves to https://sample.com, you can specify a path of "endpoint" to have your stream processor send HTTPS requests to https://sample.com/endpoint.

If you define path as an expression, that expression must evaluate to a string.

The API endpoint that you call should be idempotent.
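The two forms of path can be sketched as follows. The joinPath helper is a hypothetical local model of how a path is appended to the connection URL; Atlas performs this step itself. The stations/ endpoint and the stationInfo field name are assumptions made for illustration only.

```javascript
// Hypothetical local model of appending `path` to the connection URL.
// Atlas does this internally; the helper only illustrates the result.
function joinPath(baseUrl, path) {
  return baseUrl.replace(/\/+$/, "") + "/" + path.replace(/^\/+/, "");
}

// Static path: every request goes to the same endpoint.
const staticStage = {
  $https: {
    connectionName: "https_weather",
    path: "endpoint",
    as: "response",
  },
};

// Expression path: must evaluate to a string for each document.
// The "stations/" endpoint is an assumption for illustration.
const dynamicStage = {
  $https: {
    connectionName: "https_weather",
    path: { $concat: ["stations/", "$callLetters"] },
    as: "stationInfo",
  },
};

console.log(joinPath("https://sample.com", staticStage.$https.path));
// prints "https://sample.com/endpoint"
```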

parameters | document | Optional

Document containing key-value pairs to pass as parameters to your API endpoint call. Each key must be a string, and each value must evaluate to either a numeric, string, or boolean value. This field supports expressions as values.
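As a rough illustration of the type rule above, the following sketch assumes that parameters are sent as URL query parameters and shows the values after any expressions have been evaluated. The buildQuery helper and the metric parameter are hypothetical and are not part of Atlas Stream Processing.

```javascript
// Assumption: `parameters` end up as URL query parameters.
// Values are shown after expression evaluation.
function buildQuery(baseUrl, parameters) {
  const url = new URL(baseUrl);
  for (const [key, value] of Object.entries(parameters)) {
    const type = typeof value;
    // Only numeric, string, and boolean values are allowed.
    if (type !== "number" && type !== "string" && type !== "boolean") {
      throw new TypeError(`parameter "${key}" has unsupported type ${type}`);
    }
    url.searchParams.append(key, String(value));
  }
  return url.toString();
}

const requestUrl = buildQuery("https://sample.com/forecast", {
  latitude: 30.27,
  longitude: -97.74,
  metric: true, // hypothetical parameter
});
console.log(requestUrl);
// prints "https://sample.com/forecast?latitude=30.27&longitude=-97.74&metric=true"
```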

method | string | Optional

HTTPS request method for your connection. Must be one of the following values:

  • "GET"

  • "POST"

  • "PUT"

  • "PATCH"

  • "DELETE"

Defaults to "GET".

headers | document | Optional

Document containing key-value pairs to pass as headers to the API endpoint. Each key must be a string, and each value must evaluate to a string. This field supports expressions as values.

If the API endpoint requires authentication, such as an API key or a Bearer access token, add the authentication details as headers when you define the connection. This avoids exposing credentials as plaintext in the operator definition.

Invalid HTTP header names and values are not sent to the API endpoint. Instead, these are ignored.

To learn more about invalid HTTP headers, see RFC 9110.

If an expression in a value fails or evaluates to a type other than string, the message is sent to the dead letter queue and the operator doesn't send this request to the API endpoint.
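The following sketch shows a stage with one static header and one expression header, plus a simplified local model of the string-only rule. The header names and the headerOutcome helper are hypothetical; the helper only mirrors the rule described above and is not how Atlas implements it.

```javascript
const stageWithHeaders = {
  $https: {
    connectionName: "https_weather",
    path: "forecast",
    headers: {
      Accept: "application/json",
      // Expression value; $toString ensures the result is a string.
      "X-Station-Elevation": { $toString: "$elevation" }, // hypothetical header
    },
    as: "airTemperatureForecast",
  },
};

// Simplified model: given header values after expression evaluation,
// report whether the request is sent or the message goes to the DLQ.
function headerOutcome(evaluatedHeaders) {
  const allStrings = Object.values(evaluatedHeaders).every(
    (value) => typeof value === "string"
  );
  return allStrings ? "send" : "dlq";
}

console.log(headerOutcome({ Accept: "application/json", "X-Station-Elevation": "9999" }));
// prints "send"
console.log(headerOutcome({ "X-Station-Elevation": 9999 }));
// prints "dlq"
```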

as | string | Required

Name of the field for the REST API response.

If the endpoint returns 0 bytes, the operator doesn't set the as field.

The operator supports responses with a Content-Type of application/json or text/plain. If the API endpoint returns a response with a different Content-Type, the operator handles the document based on the onError behavior you define.

If the API endpoint returns a response without a defined Content-Type, the operator assumes that the response is application/json.
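The response rules above can be modeled roughly as follows. This is a simplified local sketch, not the operator's implementation. The applyResponse helper is hypothetical, and stripping media type parameters such as charset is an assumption.

```javascript
// Simplified model of how the response is attached to the document.
function applyResponse(doc, asField, contentType, body) {
  // A 0-byte response leaves the `as` field unset.
  if (body.length === 0) {
    return { outcome: "ok", doc };
  }
  // A missing Content-Type is treated as application/json.
  // Assumption: parameters such as "; charset=utf-8" are ignored.
  const mediaType = (contentType ?? "application/json")
    .split(";")[0]
    .trim()
    .toLowerCase();
  if (mediaType === "application/json") {
    // Malformed JSON is not covered by the rules above; JSON.parse throws here.
    return { outcome: "ok", doc: { ...doc, [asField]: JSON.parse(body) } };
  }
  if (mediaType === "text/plain") {
    return { outcome: "ok", doc: { ...doc, [asField]: body } };
  }
  // Any other Content-Type is handled according to onError.
  return { outcome: "onError", doc };
}

const sampleDoc = { callLetters: "CGDS" };
console.log(applyResponse(sampleDoc, "forecast", undefined, "[22.3, 22.4]").doc.forecast.length);
// prints 2
console.log(applyResponse(sampleDoc, "forecast", "text/html", "<p>hi</p>").outcome);
// prints "onError"
```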

onError | string | Optional

Behavior when the operator encounters an HTTPS-related failure. Must be one of the following values:

  • "dlq" : Pass the affected document to the dead letter queue.

  • "ignore" : Do nothing with the affected document.

  • "fail" : Terminate the stream processor on error.

The operator considers all 2XX HTTP status codes as successes. If the operator receives any of the following HTTP status codes in response, the operator follows the behavior based on the value you provide in this field:

  • 400

  • 404

  • 410

  • 413

  • 414

  • 431

The operator considers any other HTTP status code as a "fail" error. For example, if the API endpoint returns a 500 HTTP status code, the processor enters a failed state and stops.

onError does not trigger on errors arising from incorrect configuration of the $https operator itself, such as invalid expressions.

Defaults to "dlq".
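The status code rules above can be summarized in a short decision function. This is a sketch for reasoning about outcomes only. The resolveOutcome name is hypothetical, and the function does not cover configuration errors, which onError does not handle.

```javascript
// Status codes for which the configured onError behavior applies.
const CONFIGURABLE_STATUS_CODES = new Set([400, 404, 410, 413, 414, 431]);

// Returns "success", or the behavior that applies to the response.
function resolveOutcome(statusCode, onError = "dlq") {
  if (statusCode >= 200 && statusCode < 300) {
    return "success";
  }
  if (CONFIGURABLE_STATUS_CODES.has(statusCode)) {
    return onError;
  }
  // Every other status code stops the processor.
  return "fail";
}

console.log(resolveOutcome(204));           // prints "success"
console.log(resolveOutcome(404));           // prints "dlq"
console.log(resolveOutcome(404, "ignore")); // prints "ignore"
console.log(resolveOutcome(500, "ignore")); // prints "fail"
```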

payload | array | Optional

Custom inner pipeline that lets you customize the request body sent to the API endpoint. payload supports the following stages:

  • $project

  • $addFields

  • $replaceRoot

  • $set

By default, the entire message is sent to the API endpoint. Atlas Stream Processing sends a relaxed mode JSON payload to the API endpoint.

Invalid HTTP request bodies are not sent to the API endpoint. Instead, these are sent to the dead letter queue.

To learn more about invalid HTTP request bodies, see RFC 9110.
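As an illustration, the following sketch defines a POST request whose body is trimmed with an inner pipeline, and checks that the inner pipeline uses only the four operators listed above. The observations endpoint, the postResult field, and the unsupportedPayloadStages helper are assumptions made for this example.

```javascript
const postStage = {
  $https: {
    connectionName: "https_weather",
    path: "observations", // hypothetical endpoint
    method: "POST",
    as: "postResult",
    onError: "dlq",
    // Send only a few fields instead of the entire message.
    payload: [
      {
        $project: {
          _id: 0,
          callLetters: 1,
          position: 1,
          temperature: "$airTemperature.value",
        },
      },
    ],
  },
};

const SUPPORTED_PAYLOAD_STAGES = new Set([
  "$project",
  "$addFields",
  "$replaceRoot",
  "$set",
]);

// Returns the names of inner-pipeline stages that payload does not support.
function unsupportedPayloadStages(payload) {
  return payload
    .map((stage) => Object.keys(stage)[0])
    .filter((name) => !SUPPORTED_PAYLOAD_STAGES.has(name));
}

console.log(unsupportedPayloadStages(postStage.$https.payload).length);
// prints 0
```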

config | document | Optional

Document containing fields that override various default values.

config.connectionTimeoutSec | integer | Optional

The time, in seconds, after which an HTTPS request times out if it cannot connect.

Defaults to 30.

config.requestTimeoutSec | integer | Optional

The time, in seconds, after which a successful HTTPS connection times out if it receives no response.

Defaults to 60.
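A minimal sketch of overriding the timeout defaults follows. The effectiveConfig helper is hypothetical and only models how an override combines with the documented defaults. The timeout values chosen are arbitrary.

```javascript
// Documented defaults for the config document.
const DEFAULT_CONFIG = { connectionTimeoutSec: 30, requestTimeoutSec: 60 };

// Local model: validate overrides and merge them over the defaults.
function effectiveConfig(overrides = {}) {
  for (const [key, value] of Object.entries(overrides)) {
    if (!(key in DEFAULT_CONFIG)) {
      throw new Error(`unknown config field: ${key}`);
    }
    if (!Number.isInteger(value)) {
      throw new TypeError(`${key} must be an integer`);
    }
  }
  return { ...DEFAULT_CONFIG, ...overrides };
}

const stageWithTimeouts = {
  $https: {
    connectionName: "https_weather",
    path: "forecast",
    as: "airTemperatureForecast",
    config: { connectionTimeoutSec: 10 }, // requestTimeoutSec keeps its default
  },
};

console.log(effectiveConfig(stageWithTimeouts.$https.config).requestTimeoutSec);
// prints 60
```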

Behavior

$https must come after the $source stage, and must come before the $emit or $merge stage. You can use $https in $hoppingWindow or $tumblingWindow internal pipelines.
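The placement rule can be checked locally before you create a processor. The following sketch inspects only top-level stages and does not look inside window pipelines. The checkHttpsPlacement helper is hypothetical and is not part of any MongoDB tool.

```javascript
const stageName = (stage) => Object.keys(stage)[0];

// Returns a list of placement problems for top-level $https stages.
function checkHttpsPlacement(pipeline) {
  const names = pipeline.map(stageName);
  const problems = [];
  names.forEach((name, index) => {
    if (name !== "$https") return;
    const earlier = names.slice(0, index);
    if (!earlier.includes("$source")) {
      problems.push(`$https at index ${index} has no preceding $source`);
    }
    if (earlier.includes("$emit") || earlier.includes("$merge")) {
      problems.push(`$https at index ${index} follows an output stage`);
    }
  });
  return problems;
}

const validOrder = [{ $source: {} }, { $https: {} }, { $merge: {} }];
const invalidOrder = [{ $source: {} }, { $merge: {} }, { $https: {} }];

console.log(checkHttpsPlacement(validOrder).length);   // prints 0
console.log(checkHttpsPlacement(invalidOrder).length); // prints 1
```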

Examples

A streaming data source generates detailed weather reports from various locations, conformant to the schema of the Sample Weather Dataset. The following aggregation has three stages:

  1. The $source stage establishes a connection with the Apache Kafka broker collecting these reports in a topic named my_weatherdata, exposing each record as it is ingested to the subsequent aggregation stages. This stage also overrides the name of the timestamp field it projects, setting it to ingestionTime.

  2. For each record from the Apache Kafka broker, the $https stage sends a request to an HTTPS weather source defined in the https_weather connection. The request passes the values in the record's position.coordinates field as the latitude and longitude parameters to retrieve a seven-day high temperature forecast in degrees Celsius for that location. The stage adds the forecast to the pipeline document in an airTemperatureForecast field.

  3. The $merge stage writes the output to an Atlas collection named stream in the sample_weatherstream database. If the database or collection does not exist, Atlas creates it.

{
  '$source': {
    connectionName: 'sample_weatherdata',
    topic: 'my_weatherdata',
    tsFieldName: 'ingestionTime'
  }
},
{
  '$https': {
    connectionName: 'https_weather',
    path: 'forecast',
    parameters: {
      latitude: { $arrayElemAt: ['$$ROOT.position.coordinates', 0 ] },
      longitude: { $arrayElemAt: ['$$ROOT.position.coordinates', 1 ] }
    },
    as: 'airTemperatureForecast'
  }
},
{
  '$merge': {
    into: {
      connectionName: 'weatherStreamOutput',
      db: 'sample_weatherstream',
      coll: 'stream'
    }
  }
}
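To run these stages, you can collect them in an array and register them as a stream processor. The following is a sketch that assumes a mongosh session connected to a stream processing instance, where the sp object is available. The processor name weatherForecast is hypothetical.

```javascript
const pipeline = [
  {
    $source: {
      connectionName: "sample_weatherdata",
      topic: "my_weatherdata",
      tsFieldName: "ingestionTime",
    },
  },
  {
    $https: {
      connectionName: "https_weather",
      path: "forecast",
      parameters: {
        latitude: { $arrayElemAt: ["$$ROOT.position.coordinates", 0] },
        longitude: { $arrayElemAt: ["$$ROOT.position.coordinates", 1] },
      },
      as: "airTemperatureForecast",
    },
  },
  {
    $merge: {
      into: {
        connectionName: "weatherStreamOutput",
        db: "sample_weatherstream",
        coll: "stream",
      },
    },
  },
];

// `sp` exists only in mongosh when connected to a stream processing
// instance, so the guard keeps this snippet loadable elsewhere.
if (typeof sp !== "undefined") {
  sp.createStreamProcessor("weatherForecast", pipeline); // hypothetical name
  sp.weatherForecast.start();
}
```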

To view the documents in the resulting sample_weatherstream.stream collection, connect to your Atlas cluster and run the following command:

db.getSiblingDB("sample_weatherstream").stream.find()
{
  _id: ObjectId('66ad2edfd4fcac13b1a28ce3'),
  _stream_meta: {
    source: {
      type: 'kafka',
      topic: 'my_weatherdata',
      partition: 0,
      offset: Long('165235')
    }
  },
  airTemperature: { quality: '1', value: 27.7 },
  airTemperatureForecast: [22.3, 22.4, 22.5, 22.3, 22.4, 22.5, 23.1],
  atmosphericPressureChange: {
    quantity24Hours: { quality: '9', value: 99.9 },
    quantity3Hours: { quality: '1' },
    tendency: { code: '1', quality: '1' }
  },
  atmosphericPressureObservation: {
    altimeterSetting: { quality: '1', value: 1015.9 },
    stationPressure: { quality: '1', value: 1021.9 }
  },
  callLetters: 'CGDS',
  dataSource: '4',
  dewPoint: { quality: '9', value: 25.7 },
  elevation: 9999,
  extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 },
  ingestionTime: ISODate('2024-08-02T19:09:18.071Z'),
  liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' },
  pastWeatherObservationManual: {
    atmosphericCondition: { quality: '1', value: '8' },
    period: { quality: '9', value: 3 }
  },
  position: { coordinates: [ 30.27, -97.74], type: 'Point' },
  precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 },
  presentWeatherObservationManual: { condition: '53', quality: '1' },
  pressure: { quality: '1', value: 1016.3 },
  qualityControlProcess: 'V020',
  seaSurfaceTemperature: { quality: '9', value: 27.6 },
  sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ],
  skyCondition: {
    cavok: 'N',
    ceilingHeight: { determination: 'C', quality: '1', value: 6900 }
  },
  skyConditionObservation: {
    highCloudGenus: { quality: '1', value: '05' },
    lowCloudGenus: { quality: '9', value: '03' },
    lowestCloudBaseHeight: { quality: '9', value: 150 },
    lowestCloudCoverage: { quality: '1', value: '05' },
    midCloudGenus: { quality: '9', value: '08' },
    totalCoverage: { opaque: '99', quality: '1', value: '06' }
  },
  skyCoverLayer: {
    baseHeight: { quality: '9', value: 99999 },
    cloudType: { quality: '9', value: '05' },
    coverage: { quality: '1', value: '04' }
  },
  st: 'x+35700-027900',
  type: 'SAO',
  visibility: {
    distance: { quality: '1', value: 4000 },
    variability: { quality: '1', value: 'N' }
  },
  waveMeasurement: {
    method: 'I',
    seaState: { code: '99', quality: '9' },
    waves: { height: 99.9, period: 14, quality: '9' }
  },
  wind: {
    direction: { angle: 280, quality: '9' },
    speed: { quality: '1', rate: 30.3 },
    type: '9'
  }
}

Note

The preceding is a representative example. Streaming data are not static, and each user sees distinct documents.
