$https
Definition
The $https stage specifies a connection in the Connection Registry to send HTTPS requests to. Each time a prior stage passes a document to $https, the stage sends a new request.
Syntax
To send an HTTPS request to a given connection:
{
  "$https": {
    "connectionName" : "<registered-connection>",
    "path" : "<subpath>" | <expression>,
    "parameters" : {
      "<key1>" : "<val1>",
      . . .
      "<keyn>" : "<valn>"
    },
    "method" : "<GET | POST | PUT | PATCH | DELETE>",
    "headers" : {
      "<key1>" : "<val1>",
      . . .
      "<keyn>" : "<valn>"
    },
    "as" : "response",
    "onError" : "<DLQ | Ignore | Fail>",
    "payload" : [{ <inner-pipeline> }],
    "config" : {
      "connectionTimeoutSec" : <integer>,
      "requestTimeoutSec" : <integer>
    }
  }
}
The $https stage takes a document with the following fields:
| Field | Type | Necessity | Description |
|---|---|---|---|
| connectionName | string | Required | Label that identifies the connection in the Connection Registry to send an HTTPS request to. |
| path | string or expression | Optional | Path to append to the URL your For example, if you specify a If you define The API endpoint that you call should be idempotent. |
| parameters | document | Optional | Document containing key-value pairs to pass as parameters to your API endpoint call. Each key must be a string, and each value must evaluate to a numeric, string, or boolean value. This field supports expressions as values. |
| method | string | Optional | HTTPS request method for your connection. Must be one of the following values: GET, POST, PUT, PATCH, DELETE. Defaults to |
| headers | document | Optional | Document containing key-value pairs to pass as headers to the API endpoint. Each key must be a string, and each value must evaluate to a string. This field supports expressions as values. If the API endpoint requires authentication, such as an API key or Bearer Access Token, add the authentication details as headers when you define the connection so that you don't provide them as plaintext in this operator. Invalid HTTP header names and values are ignored and are not sent to the API endpoint. To learn more about invalid HTTP headers, see RFC 9110. If an expression in a value fails or evaluates to a type other than string, the message is sent to the dead letter queue and the operator doesn't send the request to the API endpoint. |
| as | string | Required | Name of the field for the REST API response. If the endpoint returns 0 bytes, the operator doesn't set the The operator supports responses with a If the API endpoint returns a response without a defined |
| onError | string | Optional | Behavior when the operator encounters an The operator considers all The operator considers any other HTTP status code as a Defaults to |
| payload | array | Optional | Custom inner pipeline that allows you to customize the request body sent to the API endpoint. By default, the entire message is sent to the API endpoint. Atlas Stream Processing sends a relaxed mode JSON payload to the API endpoint. Invalid HTTP request bodies are not sent to the API endpoint. Instead, they are sent to the dead letter queue. To learn more about invalid HTTP request bodies, see RFC 9110. |
| config | document | Optional | Document containing fields that override various default values. |
| config.connectionTimeoutSec | integer | Optional | The time, in seconds, after which a successful Defaults to |
| config.requestTimeoutSec | integer | Optional | The time, in seconds, after which an Defaults to |
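
As a rough illustration of how the optional fields fit together, the following sketch defines a $https stage that sends a POST request with a customized body. The connection name https_orders, the path, the header, the projected fields, and the timeout values are all hypothetical. The use of $project inside payload and the lowercase spelling of the onError value are assumptions; check them against your own deployment before relying on them.

```javascript
// Hypothetical $https stage. Every name and value here is illustrative only.
const httpsStage = {
  $https: {
    connectionName: "https_orders", // assumed connection in the Connection Registry
    path: "orders/enrich",          // appended to the connection's URL
    method: "POST",
    headers: { "Content-Type": "application/json" }, // header values must be strings
    as: "enrichment",               // field that receives the API response
    onError: "dlq",                 // assumed spelling of the DLQ option
    payload: [
      // Assumption: $project is allowed in the inner pipeline.
      // It trims the request body to two fields instead of the whole message.
      { $project: { _id: 0, orderId: 1, customerId: 1 } }
    ],
    config: { connectionTimeoutSec: 10, requestTimeoutSec: 30 }
  }
};

// Print the stage so it can be pasted into a pipeline definition.
console.log(JSON.stringify(httpsStage, null, 2));
```

Only connectionName and as are required, so a minimal stage can omit every other field shown here.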
Behavior
$https must come after the $source stage, and must come before the $emit or $merge stage. You can use $https in $hoppingWindow or $tumblingWindow internal pipelines.
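
The ordering rule can be checked before a pipeline is submitted. The sketch below is a hypothetical helper, not part of Atlas Stream Processing: checkHttpsPlacement is an invented name, it inspects only top-level stages (it does not descend into window pipelines), and it treats a pipeline with no $emit or $merge stage as acceptable, which is an assumption.

```javascript
// Returns the operator name of a single-key stage document, for example "$https".
function stageName(stage) {
  return Object.keys(stage)[0];
}

// Hypothetical helper: verifies that every top-level $https stage comes after
// $source and before the first $emit or $merge stage.
function checkHttpsPlacement(pipeline) {
  const names = pipeline.map(stageName);
  const sourceAt = names.indexOf("$source");
  const sinkAt = names.findIndex((n) => n === "$emit" || n === "$merge");

  return names.every((name, i) => {
    if (name !== "$https") return true;            // only $https is checked
    const afterSource = sourceAt !== -1 && sourceAt < i;
    const beforeSink = sinkAt === -1 || i < sinkAt; // assumption: a sink is optional
    return afterSource && beforeSink;
  });
}

const validPipeline = [
  { $source: { connectionName: "sample_weatherdata", topic: "my_weatherdata" } },
  { $https: { connectionName: "https_weather", path: "forecast", as: "forecast" } },
  { $merge: { into: { connectionName: "weatherStreamOutput", db: "sample_weatherstream", coll: "stream" } } }
];

// Same stages with $https moved in front of $source.
const invalidPipeline = [validPipeline[1], validPipeline[0], validPipeline[2]];

console.log(checkHttpsPlacement(validPipeline));   // true
console.log(checkHttpsPlacement(invalidPipeline)); // false
```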
Examples
A streaming data source generates detailed weather reports from various locations, conformant to the schema of the Sample Weather Dataset. The following aggregation has three stages:
- The $source stage establishes a connection with the Apache Kafka broker collecting these reports in a topic named my_weatherdata, exposing each record as it is ingested to the subsequent aggregation stages. This stage also overrides the name of the timestamp field it projects, setting it to ingestionTime.
- For each record from the Apache Kafka broker, the $https stage sends a request to an HTTPS weather source defined in the https_weather connection. The request uses the position.coordinates from the record to gather a seven-day high temperature forecast in degrees Celsius for that location, which it adds to the pipeline document in an airTemperatureForecast field.
- The $merge stage writes the output to an Atlas collection named stream in the sample_weatherstream database. If no such database or collection exists, Atlas creates it.
{
  '$source': {
    connectionName: 'sample_weatherdata',
    topic: 'my_weatherdata',
    tsFieldName: 'ingestionTime'
  }
},
{
  '$https': {
    connectionName: 'https_weather',
    path: 'forecast',
    parameters: {
      latitude: { $arrayElemAt: ['$$ROOT.position.coordinates', 0] },
      longitude: { $arrayElemAt: ['$$ROOT.position.coordinates', 1] }
    },
    as: 'airTemperatureForecast'
  }
},
{
  '$merge': {
    into: {
      connectionName: 'weatherStreamOutput',
      db: 'sample_weatherstream',
      coll: 'stream'
    }
  }
}
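
To make the request in the $https stage more concrete, the following sketch shows one way the path and parameters could combine with a connection's URL. It is an approximation under stated assumptions, not a description of what Atlas Stream Processing does internally: the base URL https://weather.example.com/v1 is hypothetical, buildRequestUrl is an invented helper, and sending parameters as URL query parameters is assumed.

```javascript
// Sketch of how a path and parameters could combine with a connection's URL.
// Assumption: parameters are sent as URL query parameters.
function buildRequestUrl(baseUrl, path, parameters) {
  // Normalize so that exactly one slash separates the base URL and the path.
  const base = baseUrl.replace(/\/*$/, "/");
  const url = new URL(path.replace(/^\/+/, ""), base);
  for (const [key, value] of Object.entries(parameters)) {
    url.searchParams.set(key, String(value));
  }
  return url.toString();
}

// Coordinates as they appear in the sample record; the base URL is hypothetical.
const coordinates = [30.27, -97.74];
const requestUrl = buildRequestUrl("https://weather.example.com/v1", "forecast", {
  latitude: coordinates[0],  // same index as the $arrayElemAt expression for latitude
  longitude: coordinates[1]  // same index as the $arrayElemAt expression for longitude
});

console.log(requestUrl);
```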
To view the documents in the resulting sample_weatherstream.stream collection, connect to your Atlas cluster and run the following command:
db.getSiblingDB("sample_weatherstream").stream.find()
{
  _id: ObjectId('66ad2edfd4fcac13b1a28ce3'),
  _stream_meta: { source: { type: 'kafka', topic: 'my_weatherdata', partition: 0, offset: Long('165235') } },
  airTemperature: { quality: '1', value: 27.7 },
  airTemperatureForecast: [22.3, 22.4, 22.5, 22.3, 22.4, 22.5, 23.1],
  atmosphericPressureChange: {
    quantity24Hours: { quality: '9', value: 99.9 },
    quantity3Hours: { quality: '1' },
    tendency: { code: '1', quality: '1' }
  },
  atmosphericPressureObservation: {
    altimeterSetting: { quality: '1', value: 1015.9 },
    stationPressure: { quality: '1', value: 1021.9 }
  },
  callLetters: 'CGDS',
  dataSource: '4',
  dewPoint: { quality: '9', value: 25.7 },
  elevation: 9999,
  extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 },
  ingestionTime: ISODate('2024-08-02T19:09:18.071Z'),
  liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' },
  pastWeatherObservationManual: {
    atmosphericCondition: { quality: '1', value: '8' },
    period: { quality: '9', value: 3 }
  },
  position: { coordinates: [30.27, -97.74], type: 'Point' },
  precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 },
  presentWeatherObservationManual: { condition: '53', quality: '1' },
  pressure: { quality: '1', value: 1016.3 },
  qualityControlProcess: 'V020',
  seaSurfaceTemperature: { quality: '9', value: 27.6 },
  sections: ['AA2', 'SA1', 'MW1', 'AG1', 'GF1'],
  skyCondition: { cavok: 'N', ceilingHeight: { determination: 'C', quality: '1', value: 6900 } },
  skyConditionObservation: {
    highCloudGenus: { quality: '1', value: '05' },
    lowCloudGenus: { quality: '9', value: '03' },
    lowestCloudBaseHeight: { quality: '9', value: 150 },
    lowestCloudCoverage: { quality: '1', value: '05' },
    midCloudGenus: { quality: '9', value: '08' },
    totalCoverage: { opaque: '99', quality: '1', value: '06' }
  },
  skyCoverLayer: {
    baseHeight: { quality: '9', value: 99999 },
    cloudType: { quality: '9', value: '05' },
    coverage: { quality: '1', value: '04' }
  },
  st: 'x+35700-027900',
  type: 'SAO',
  visibility: {
    distance: { quality: '1', value: 4000 },
    variability: { quality: '1', value: 'N' }
  },
  waveMeasurement: {
    method: 'I',
    seaState: { code: '99', quality: '9' },
    waves: { height: 99.9, period: 14, quality: '9' }
  },
  wind: {
    direction: { angle: 280, quality: '9' },
    speed: { quality: '1', rate: 30.3 },
    type: '9'
  }
}
Note
The preceding is a representative example. Streaming data are not static, and each user sees distinct documents.