
Atlas Stream Processing Monitoring and Alerting

Atlas Stream Processing provides monitoring and alerting so that you can use performance and status insights to refine your workflows.

For each of your stream processing workspaces, you can monitor your stream processors in the Atlas UI:

1
  1. If it's not already displayed, select the organization that contains your project from the Organizations menu in the navigation bar.

  2. If it's not already displayed, select your project from the Projects menu in the navigation bar.

  3. In the sidebar, click Stream Processing under the Streaming Data heading.

The Stream Processing page displays.

2

In the pane of the stream processing workspace you want to monitor, click Manage.

3

The Monitoring tab displays runtime statistics for the stream processor you select, including:

  • Number of messages ingested

  • Number of messages successfully processed

  • Number of messages sent to your Dead Letter Queue

If your source connection is Apache Kafka, you can also monitor the lag between the current offset and the latest offset at the broker for each topic partition, as well as the total lag summed across all partitions.

4

You can filter the charts by stream processor name, time range, and granularity.
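The Kafka lag in the Monitoring charts is, for each partition, the latest offset at the broker minus the processor's current offset; the total lag is the sum over all partitions. The following minimal Python sketch reproduces that arithmetic. The `kafka_lag` function and its dict inputs are hypothetical, and the `kafkaPartitions` fields documented for `stats()` do not include the broker's latest offset, so you would have to obtain it separately.

```python
def kafka_lag(current_offsets, latest_offsets):
    """Compute per-partition and total Kafka lag.

    current_offsets: hypothetical dict of partition -> the processor's current offset.
    latest_offsets:  hypothetical dict of partition -> the latest offset at the broker.
    Only partitions present in both dicts are included.
    """
    per_partition = {}
    for partition, current in current_offsets.items():
        if partition not in latest_offsets:
            continue
        # Clamp at zero in case the two snapshots were taken at slightly different times.
        per_partition[partition] = max(latest_offsets[partition] - current, 0)
    return per_partition, sum(per_partition.values())


lags, total = kafka_lag({0: 120, 1: 95}, {0: 150, 1: 95})
print(lags, total)  # {0: 30, 1: 0} 30
```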

Atlas Stream Processing provides the following methods for on-demand reporting about your stream processors:

The sp.processor.sample() method returns a small sample of the documents that a running stream processor outputs. Compare the sampled results against your expected results to diagnose errors in your aggregation pipeline design.
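One simple way to make that comparison is to check each sampled document for the fields your pipeline should produce. This Python sketch assumes you have copied the sampled documents into a list of dicts; the `check_sample` helper and the field names `deviceId` and `avgTemp` are hypothetical.

```python
def check_sample(sampled_docs, required_fields):
    """Report which sampled documents are missing expected fields.

    sampled_docs: list of dicts, e.g. documents copied from sp.processor.sample() output.
    required_fields: field names every output document should contain (hypothetical).
    Returns (index, missing_fields) pairs for documents that don't match.
    """
    problems = []
    for i, doc in enumerate(sampled_docs):
        missing = sorted(f for f in required_fields if f not in doc)
        if missing:
            problems.append((i, missing))
    return problems


sample = [
    {"deviceId": "a1", "avgTemp": 21.5},
    {"deviceId": "a2"},  # avgTemp missing: a $group or $project stage may be wrong
]
print(check_sample(sample, ["deviceId", "avgTemp"]))  # [(1, ['avgTemp'])]
```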

The sp.processor.stats() method returns runtime statistics about a stream processor of your choosing.

The output document has the following fields:

Field
Type
Description

ns

string

The namespace the stream processor is defined in.

stats

object

A document describing the operational state of the stream processor.

stats.name

string

The name of the stream processor.

stats.status

string

The status of the stream processor. This field can have the following values:

  • created

  • running

  • error

  • stopping

stats.scaleFactor

integer

The scale in which size fields display. If set to 1, sizes display in bytes. If set to 1024, sizes display in kilobytes.

stats.inputMessageCount

integer

The number of documents published to the stream. A document is considered 'published' to the stream once it passes through the $source stage, not when it passes through the entire pipeline.

stats.inputMessageSize

integer

The number of bytes or kilobytes published to the stream. Bytes are considered 'published' to the stream once they pass through the $source stage, not when they pass through the entire pipeline.

stats.outputMessageCount

integer

The number of documents processed by the stream. A document is considered 'processed' by the stream once it passes through the entire pipeline.

stats.outputMessageSize

integer

The number of bytes or kilobytes processed by the stream. Bytes are considered 'processed' by the stream once they pass through the entire pipeline.

stats.dlqMessageCount

integer

The number of documents sent to the Dead Letter Queue.

stats.dlqMessageSize

integer

The number of bytes or kilobytes sent to the Dead Letter Queue.

stats.changeStreamTimeDifferenceSecs

integer

The difference, in seconds, between the event time represented by the most recent change stream resume token and the latest event in the oplog.

stats.changeStreamState

token

The most recent change stream resume token. Only applies to stream processors with a change stream source.

stats.latency

document

Latency statistics for the stream processor as a whole. Atlas Stream Processing returns this field only if you pass in the verbose option.

stats.latency.p50

integer

The estimated 50th percentile latency of all documents processed in the past 30 seconds. If your pipeline includes a window stage, latency measurements include the interval of the window.

For example, if your $tumblingWindow stage has an interval of 5 minutes, latency measurements include those 5 minutes.

stats.latency.p99

integer

The estimated 99th percentile latency of all documents processed in the past 30 seconds. If your pipeline includes a window stage, latency measurements include the interval of the window.

For example, if your $tumblingWindow stage has an interval of 5 minutes, latency measurements include those 5 minutes.

stats.latency.start

datetime

Wall time at which the most recent 30-second measurement window began.

stats.latency.end

datetime

Wall time at which the most recent 30-second measurement window ended.

stats.latency.unit

string

Unit of time in which latency is counted. This value is always microseconds.

stats.latency.count

integer

Number of documents the stream processor has processed in the most recent 30-second measurement window.

stats.latency.sum

integer

Sum of all individual latency measurements, in microseconds, taken in the most recent 30-second measurement window.

stats.stateSize

integer

The number of bytes used by windows to store processor state.

stats.watermark

date

The timestamp of the current watermark.

stats.operatorStats

array

The statistics for each operator in the processor pipeline. Atlas Stream Processing returns this field only if you pass in the verbose option.

stats.operatorStats provides per-operator versions of many core stats fields:

  • stats.operatorStats.name

  • stats.operatorStats.inputMessageCount

  • stats.operatorStats.inputMessageSize

  • stats.operatorStats.outputMessageCount

  • stats.operatorStats.outputMessageSize

  • stats.operatorStats.dlqMessageCount

  • stats.operatorStats.dlqMessageSize

  • stats.operatorStats.latency

  • stats.operatorStats.stateSize

stats.operatorStats includes the following unique fields:

  • stats.operatorStats.maxMemoryUsage

  • stats.operatorStats.executionTimeMillis

stats.operatorStats also includes the following fields when you pass in the verbose option and your processor includes a window stage:

  • stats.minOpenWindowStartTime

  • stats.maxOpenWindowStartTime

stats.operatorStats can also include the following field for certain source and sink operators:

  • stats.operatorStats.targetStats

stats.operatorStats.maxMemoryUsage

integer

The maximum memory usage of the operator in bytes or kilobytes.

stats.operatorStats.executionTimeMillis

integer

The total execution time of the operator in milliseconds.

stats.minOpenWindowStartTime

date

The earliest start time among the currently open windows. This value is optional.

stats.maxOpenWindowStartTime

date

The latest start time among the currently open windows. This value is optional.

stats.operatorStats.targetStats

array

The per-target statistics for certain source and sink operators.

Each element of this array is a document that represents a single input or output target, such as an input or output collection or an Apache Kafka topic. Depending on the operator, each document contains a subset of the following fields:

For source operators, such as Apache Kafka $source or change stream sources:

  • Either db and coll for MongoDB targets, or topic for Apache Kafka targets.

  • inputMessageCount

  • inputMessageSize

For sink operators, such as MongoDB $merge or $emit sinks:

  • Either db and coll for MongoDB targets, or topic for Apache Kafka targets.

  • outputMessageCount

  • outputMessageSize

Atlas Stream Processing collects per-target statistics only for some stream aggregation stages, such as Apache Kafka $source and MongoDB $merge.

It records per-target statistics for at most 100 distinct targets; after that, the stream processor stops adding new entries to targetStats but continues updating aggregate per-operator statistics.

stats.kafkaPartitions

array

Offset information for the Apache Kafka partitions that the stream processor reads from. kafkaPartitions applies only to stream processors that use an Apache Kafka source.

stats.kafkaPartitions.partition

integer

The Apache Kafka topic partition number.

stats.kafkaPartitions.currentOffset

integer

The offset the stream processor is currently on for the specified partition. This value equals the last offset that the stream processor processed, plus 1.

stats.kafkaPartitions.checkpointOffset

integer

The offset that the stream processor last committed to the Apache Kafka broker and the checkpoint for the specified partition. All messages through this offset are recorded in the last checkpoint.

stats.kafkaPartitions.isIdle

boolean

The flag that indicates whether the partition is idle. This value defaults to false.
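Several of the fields above combine naturally into derived metrics. The following Python sketch assumes you have the `stats` sub-document as a Python dict with the field names documented above; the `summarize_stats` function, the derived metric names, and the operator names in the sample data are hypothetical and are not reported by Atlas Stream Processing.

```python
from datetime import datetime


def summarize_stats(stats):
    """Derive convenience metrics from the `stats` sub-document of stats() output.

    Assumes sizes are reported in units of stats.scaleFactor and that latency
    start/end are datetime values.
    """
    scale = stats.get("scaleFactor", 1)
    ingested = stats.get("inputMessageCount", 0)
    summary = {
        # Multiply by scaleFactor to get approximate bytes.
        "input_bytes": stats.get("inputMessageSize", 0) * scale,
        # Share of ingested documents that went to the Dead Letter Queue.
        "dlq_ratio": stats.get("dlqMessageCount", 0) / ingested if ingested else 0.0,
    }

    latency = stats.get("latency")  # returned only with the verbose option
    if latency and latency.get("count"):
        # sum and count cover the same 30-second window, so sum / count is the mean (µs).
        summary["mean_latency_ms"] = latency["sum"] / latency["count"] / 1000
        window = (latency["end"] - latency["start"]).total_seconds()
        summary["docs_per_sec"] = latency["count"] / window if window > 0 else 0.0

    operators = stats.get("operatorStats", [])  # also verbose-only
    if operators:
        slowest = max(operators, key=lambda op: op.get("executionTimeMillis", 0))
        summary["slowest_operator"] = slowest.get("name")

    # Distance between currentOffset and checkpointOffset per Kafka partition:
    # a rough indicator of progress since the last checkpoint.
    summary["kafka_offset_gap"] = {
        p["partition"]: p["currentOffset"] - p["checkpointOffset"]
        for p in stats.get("kafkaPartitions", [])
    }
    return summary


example = {
    "scaleFactor": 1024,
    "inputMessageCount": 2000,
    "inputMessageSize": 512,
    "dlqMessageCount": 50,
    "latency": {
        "unit": "microseconds",
        "count": 600,
        "sum": 1_200_000,
        "start": datetime(2024, 1, 1, 12, 0, 0),
        "end": datetime(2024, 1, 1, 12, 0, 30),
    },
    "operatorStats": [
        {"name": "SourceOperator", "executionTimeMillis": 120},
        {"name": "MergeOperator", "executionTimeMillis": 900},
    ],
    "kafkaPartitions": [
        {"partition": 0, "currentOffset": 1500, "checkpointOffset": 1400, "isIdle": False},
    ],
}
print(summarize_stats(example))
```

The sketch does not derive a "dropped documents" count: inputMessageCount generally differs from outputMessageCount plus dlqMessageCount because stages such as $match filter documents and window stages combine them. Also, if the pipeline includes a window stage, the mean latency includes the window interval, so a 5-minute $tumblingWindow can account for as much as 300,000,000 microseconds.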

Atlas Stream Processing provides two types of stream processing workspace activity logs:

  • Operational logs, which primarily track the behavior of
    individual stream processors.
  • Audit logs, which primarily track authentication and security
    activity at the stream processing workspace level.

To download Atlas Stream Processing operational or audit logs:

1
  1. If it's not already displayed, select the organization that contains your project from the Organizations menu in the navigation bar.

  2. If it's not already displayed, select your project from the Projects menu in the navigation bar.

  3. In the sidebar, click Stream Processing under the Streaming Data heading.

The Stream Processing page displays.

2

Navigate to the pane of the stream processing workspace from which to download logs and click the ellipsis.

3
4

In the modal window, select the type of log you want to download.

5

In the Stream processor field, provide the name of the stream processor for which you want to download logs. Leave this field blank to download logs for all stream processors.

6

From the Time Period dropdown, select the interval for which you want to download logs.

7

Note

Logs take up to five minutes after creation to become queryable.

Atlas Stream Processing supports third-party metrics integrations, enabling users to record and analyze stream processor performance without having to develop custom logic or views.

Datadog tracks Atlas Stream Processing metrics with the prefix mongodb.atlas.stream_processing. To set up the integration and view available metrics, see Integrate with Datadog.

To send metrics to an OpenTelemetry (OTel) collector, see Integrate with OpenTelemetry (OTel).

Atlas Stream Processing triggers alerts when a processor changes state or meets certain ingestion or output thresholds. For a list of available Atlas Stream Processing alerts, see Atlas Stream Processing Alerts. To learn more about alert configuration, see Configure Alert Settings.

You can target Atlas Stream Processing alerts in the following ways:

  • All stream processors within a project

  • All stream processors within a stream processing workspace matching the configured predicate

  • All stream processors whose names match the configured predicate

For targets other than all stream processors, you can configure multiple targets for the same alert.
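Atlas evaluates these predicates itself; the following Python sketch only illustrates the idea of name-based targeting. The operator names (EQUALS, STARTS_WITH, CONTAINS, REGEX), the `matches` and `targeted` helpers, and the assumption that multiple targets on one alert combine with OR are all hypothetical, not Atlas's alert configuration schema.

```python
import re


def matches(name, operator, value):
    """Evaluate one hypothetical name predicate against a stream processor name."""
    if operator == "EQUALS":
        return name == value
    if operator == "STARTS_WITH":
        return name.startswith(value)
    if operator == "CONTAINS":
        return value in name
    if operator == "REGEX":
        return re.search(value, name) is not None
    raise ValueError(f"unsupported operator: {operator}")


def targeted(processor_names, predicates):
    """Return processors matched by any target (assumed OR across multiple targets)."""
    return [
        n for n in processor_names
        if any(matches(n, op, val) for op, val in predicates)
    ]


names = ["orders-enrich", "orders-dedupe", "payments-audit"]
print(targeted(names, [("STARTS_WITH", "orders-")]))  # ['orders-enrich', 'orders-dedupe']
```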

Atlas Stream Processing configures a Stream Processor State is failed alert by default. Because this is a project-level alert, it applies to any stream processor running in any stream processing workspace within the project for which it is configured.