Atlas Stream Processing provides monitoring and alerts so that users can use performance and status insights to refine their workflows.
Monitor Stream Processor Workspaces in the Atlas UI
For each of your stream processing workspaces, you can monitor your stream processors in the Atlas UI:
In Atlas, go to the Stream Processing page for your project.
If it's not already displayed, select the organization that contains your project from the Organizations menu in the navigation bar.
If it's not already displayed, select your project from the Projects menu in the navigation bar.
In the sidebar, click Stream Processing under the Streaming Data heading.
The Stream Processing page displays.
Click the Monitoring tab.
The Monitoring tab displays a variety of runtime statistics about a stream processor of your choosing, including, but not limited to:
Number of messages ingested
Number of messages successfully processed
Number of messages sent to your Dead Letter Queue
If your source connection is Apache Kafka, you can also monitor the lag for each partition of a topic, which is the difference between the stream processor's current offset and the latest offset at the broker, as well as the sum of the lags across all partitions.
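The lag arithmetic above can be sketched in Python. This is a minimal, hypothetical helper, not part of any Atlas API: it assumes you already have each partition's current offset (from the stream processor) and latest offset (from the broker) as dictionaries keyed by partition number, and it treats lag as the latest offset minus the current offset.

```python
from typing import Dict


def partition_lags(current: Dict[int, int], latest: Dict[int, int]) -> Dict[int, int]:
    """Return the lag for each partition: latest broker offset minus current offset.

    `current` and `latest` are hypothetical inputs mapping partition number -> offset.
    Every partition in `latest` must also appear in `current`.
    """
    # Clamp at zero so that a latest-offset reading taken slightly before the
    # current offset never reports a negative lag.
    return {p: max(latest[p] - current[p], 0) for p in latest}


def total_lag(current: Dict[int, int], latest: Dict[int, int]) -> int:
    """Return the sum of all partition lags."""
    return sum(partition_lags(current, latest).values())


if __name__ == "__main__":
    current = {0: 1_000, 1: 2_500}
    latest = {0: 1_200, 1: 2_500}
    print(partition_lags(current, latest))  # {0: 200, 1: 0}
    print(total_lag(current, latest))       # 200
```

A partition with zero lag, like partition 1 here, has been read up to the latest offset the broker reports.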
Stream Processor Monitoring Methods
Atlas Stream Processing provides the following methods for on-demand reporting about your stream processors:
The sp.processor.sample() method lets you view a small sample of the documents that a running stream processor of your choosing outputs. You can compare the sampled results against your expected results to diagnose errors in your aggregation pipeline design.
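The comparison step described above can be sketched in Python. This is a hypothetical helper, not an Atlas or mongosh API: it assumes you have copied the sampled documents and your expected documents into lists of dictionaries, and that some field (here called `key`, chosen by you) uniquely identifies each document.

```python
from typing import Any, Dict, List

Doc = Dict[str, Any]


def check_samples(sampled: List[Doc], expected: List[Doc], key: str) -> Dict[str, List[Any]]:
    """Compare sampled output documents with expected documents.

    `key` names a field that uniquely identifies each document (hypothetical;
    choose one that fits your pipeline). Only fields present in the expected
    document are compared, so extra fields in the output are ignored.
    """
    want = {doc[key]: doc for doc in expected}
    unexpected: List[Any] = []
    mismatched: List[Any] = []
    for doc in sampled:
        k = doc[key]
        if k not in want:
            # The processor emitted a document you did not anticipate.
            unexpected.append(k)
        elif any(doc.get(field) != value for field, value in want[k].items()):
            # The document exists, but at least one expected field differs.
            mismatched.append(k)
    # Expected documents absent from the sample are not reported: a sample is
    # only a small subset of the output, so absence is not evidence of an error.
    return {"unexpected": unexpected, "mismatched": mismatched}


if __name__ == "__main__":
    sampled = [{"id": 1, "total": 30}, {"id": 2, "total": 12}, {"id": 9, "total": 5}]
    expected = [{"id": 1, "total": 30}, {"id": 2, "total": 15}, {"id": 3, "total": 7}]
    print(check_samples(sampled, expected, key="id"))
    # {'unexpected': [9], 'mismatched': [2]}
```

Document 2 points at a calculation error in the pipeline, while document 9 suggests a filter that lets through more than intended.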
The sp.processor.stats() method returns runtime statistics about a stream processor of your choosing.
The output document has the following fields:
| Field | Type | Description |
|---|---|---|
|  | string | The namespace the stream processor is defined in. |
|  | object | A document describing the operational state of the stream processor. |
|  | string | The name of the stream processor. |
|  | string | The status of the stream processor. This field can have the following values: |
|  | integer | The scale in which the size field displays. If set to |
|  | integer | The number of documents published to the stream. A document is considered 'published' to the stream once it passes through the |
|  | integer | The number of bytes or kilobytes published to the stream. Bytes are considered 'published' to the stream once they pass through the |
|  | integer | The number of documents processed by the stream. A document is considered 'processed' by the stream once it passes through the entire pipeline. |
|  | integer | The number of bytes or kilobytes processed by the stream. Bytes are considered 'processed' by the stream once they pass through the entire pipeline. |
|  | integer | The number of documents sent to the Dead Letter Queue. |
|  | integer | The number of bytes or kilobytes sent to the Dead Letter Queue. |
|  | integer | The difference, in seconds, between the event time represented by the most recent change stream resume token and the latest event in the oplog. |
|  | token | The most recent change stream resume token. Only applies to stream processors with a change stream source. |
|  | document | Latency statistics for the stream processor as a whole. Atlas Stream Processing returns this field only if you pass in the |
|  | integer | The estimated 50th percentile latency of all documents processed in the past 30 seconds. If your pipeline includes a window stage, latency measurements include the interval of the window. For example, if your |
|  | integer | The estimated 99th percentile latency of all documents processed in the past 30 seconds. If your pipeline includes a window stage, latency measurements include the interval of the window. For example, if your |
|  | datetime | Wall time at which the most recent 30-second measurement window began. |
|  | datetime | Wall time at which the most recent 30-second measurement window ended. |
|  | string | Unit of time in which latency is counted. This value is always |
|  | integer | Number of documents the stream processor has processed in the most recent 30-second measurement window. |
|  | integer | Sum of all individual latency measurements, in microseconds, taken in the most recent 30-second measurement window. |
|  | integer | The number of bytes used by windows to store processor state. |
|  | date | The timestamp of the current watermark. |
|  | array | The statistics for each operator in the processor pipeline. Atlas Stream Processing returns this field only if you pass in the |
|  | integer | The maximum memory usage of the operator in bytes or kilobytes. |
|  | integer | The total execution time of the operator in milliseconds. |
|  | date | The start time of the minimum open window. This value is optional. |
|  | date | The start time of the maximum open window. This value is optional. |
|  | array | The per-target statistics for certain source and sink operators. Each element of this array is a document that represents a single input or output target, such as an input or output collection or an Apache Kafka topic. Depending on the operator, each document contains a subset of the following fields: For source operators, such as Apache Kafka Either For sink operators, such as MongoDB Either Atlas Stream Processing collects per-target statistics only for some stream aggregation stages, such as Apache Kafka It records per-target statistics for at most 100 distinct targets; after that, the stream processor stops adding new entries to |
|  | array | Offset information for an Apache Kafka broker's partitions. |
|  | integer | The Apache Kafka topic partition number. |
|  | integer | The offset that the stream processor is on for the specified partition. This value equals the previous offset that the stream processor processed plus |
|  | integer | The offset that the stream processor last committed to the Apache Kafka broker and the checkpoint for the specified partition. All messages through this offset are recorded in the last checkpoint. |
|  | boolean | The flag that indicates whether the partition is idle. This value defaults to |
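Two figures that are often useful can be derived from the fields above. The following Python sketch assumes you have already read the relevant values from the stats() output into a hypothetical StatsSnapshot object; its attribute names are placeholders, not the actual output field names. It computes the mean latency over the most recent 30-second window as the latency sum divided by the document count (the sum is reported in microseconds), and the fraction of published documents that were sent to the Dead Letter Queue.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class StatsSnapshot:
    """Hypothetical container for values copied from sp.processor.stats() output.

    Attribute names are placeholders; map each one to the matching field
    in the actual output document.
    """
    published_count: int   # documents published to the stream
    dlq_count: int         # documents sent to the Dead Letter Queue
    window_count: int      # documents processed in the latest 30-second window
    window_sum_us: int     # sum of latency measurements in that window, in microseconds


def mean_latency_ms(s: StatsSnapshot) -> Optional[float]:
    """Mean latency in the latest 30-second window, in milliseconds."""
    if s.window_count == 0:
        return None  # nothing was measured, so there is no mean to report
    return s.window_sum_us / s.window_count / 1000


def dlq_fraction(s: StatsSnapshot) -> float:
    """Share of published documents that ended up in the Dead Letter Queue."""
    if s.published_count == 0:
        return 0.0
    return s.dlq_count / s.published_count


if __name__ == "__main__":
    snap = StatsSnapshot(published_count=10_000, dlq_count=50,
                         window_count=400, window_sum_us=2_000_000)
    print(mean_latency_ms(snap))  # 5.0
    print(dlq_fraction(snap))     # 0.005
```

The mean is not a substitute for the estimated 50th and 99th percentile latencies that stats() reports: a few slow documents can raise the mean without moving the median.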
Stream Processing Logs
Atlas Stream Processing provides two types of stream processing workspace activity logs:
- Operational logs, which primarily track the behavior of individual stream processors.
- Audit logs, which primarily track authentication and security activity at the stream processing workspace level.
To download Atlas Stream Processing operational or audit logs:
In Atlas, go to the Stream Processing page for your project.
If it's not already displayed, select the organization that contains your project from the Organizations menu in the navigation bar.
If it's not already displayed, select your project from the Projects menu in the navigation bar.
In the sidebar, click Stream Processing under the Streaming Data heading.
The Stream Processing page displays.
Navigate to the pane for the stream processing workspace whose logs you want to download and click the ellipsis.
In the modal window, select the type of log you want to download.
In the Stream processor field, provide the name of the stream processor for which you want to download logs. Leave this field blank to download logs for all stream processors.
From the Time Period dropdown, select the interval for which you want to download logs.
Note
Logs take up to five minutes after creation to become queryable.
Third-Party Metrics Integrations
Atlas Stream Processing supports third-party metrics integrations, enabling users to record and analyze stream processor performance without having to develop custom logic or views.
Datadog tracks Atlas Stream Processing metrics with the prefix mongodb.atlas.stream_processing. To set up the integration and view available metrics, see Integrate with Datadog.
To send metrics to an OpenTelemetry (OTel) collector, see Integrate with OpenTelemetry (OTel).
Stream Processor Alerts
Atlas Stream Processing triggers alerts when a stream processor changes state or meets various ingestion or output thresholds. For a list of available Atlas Stream Processing alerts, see Atlas Stream Processing Alerts. To learn more about alert configuration, see Configure Alert Settings.
You can target Atlas Stream Processing alerts in the following ways:
All stream processors within a project
All stream processors within any stream processing workspace that matches the configured predicate
All stream processors whose names match the configured predicate
For targets other than all stream processors, you can configure multiple targets for the same alert.
Atlas Stream Processing configures a Stream Processor State is failed alert by default. Because this is a project-level alert, it applies to any stream processor running in any stream processing workspace within the project for which it is configured.
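The targeting rules can be modeled in a few lines of Python. This is a conceptual sketch, not the Atlas alert configuration API: the predicates are plain Python functions standing in for whatever matching the alert settings support, the workspace predicate is assumed to match on workspace name, and an alert with several targets is assumed to cover a processor when any one of its targets matches. The project-wide target mirrors the scope of the default Stream Processor State is failed alert.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass(frozen=True)
class Processor:
    workspace: str  # name of the stream processing workspace
    name: str       # name of the stream processor


# A target decides whether an alert covers a given processor.
Target = Callable[[Processor], bool]


def all_in_project() -> Target:
    # Every stream processor in the project, in any workspace.
    return lambda p: True


def workspace_matches(pred: Callable[[str], bool]) -> Target:
    # Every processor in a workspace whose name satisfies `pred` (assumption).
    return lambda p: pred(p.workspace)


def name_matches(pred: Callable[[str], bool]) -> Target:
    # Every processor whose own name satisfies `pred`.
    return lambda p: pred(p.name)


def covered(processors: List[Processor], targets: List[Target]) -> List[str]:
    # Assumption: with several targets, an alert covers a processor if any target matches.
    return [f"{p.workspace}/{p.name}" for p in processors
            if any(t(p) for t in targets)]


if __name__ == "__main__":
    procs = [Processor("prod", "orders"), Processor("prod", "clicks"),
             Processor("dev", "orders_test")]
    print(covered(procs, [all_in_project()]))
    # ['prod/orders', 'prod/clicks', 'dev/orders_test']
    print(covered(procs, [workspace_matches(lambda w: w == "prod")]))
    # ['prod/orders', 'prod/clicks']
    print(covered(procs, [name_matches(lambda n: n.startswith("orders")),
                          workspace_matches(lambda w: w == "dev")]))
    # ['prod/orders', 'dev/orders_test']
```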