对于AI助手:文档索引位于 https://www.mongodb.com/zh-cn/docs/llms.txt — 通过将 .md 附加到任何URL路径,可以获得所有页面的降价版本。
Make the MongoDB docs better! We value your opinion. Share your feedback for a chance to win $100.
MongoDB Branding Shape
Click here >
Docs 菜单

Atlas Stream Processing监控和警报

Atlas Stream Processing 提供监控和警报功能,以便用户可以利用性能和状态见解来优化其工作流程。

对于您的每个 Stream Processing 工作区,您可以在 Atlas 用户界面中监控您的流处理器:

1
  1. 如果尚未显示,请从导航栏上的 Organizations 菜单中选择包含项目的组织。

  2. 如果尚未显示,请从导航栏的 Projects 菜单中选择您的项目。

  3. 在侧边栏中,单击 Streaming Data 标题下的 Stream Processing

此时将显示 Stream Processing 页面。

2

在要监控的 Stream Processing 工作区窗格中,点击 Manage

3

Monitoring 选项卡显示有关所选流处理器的各种运行时统计信息,包括但不限于:

  • 摄取的消息数量

  • 成功处理的消息数量

  • 发送到您的死信队列的消息数量

如果您的源连接是 Apache Kafka,您可以监控主题分区的当前偏移量与代理上的最新偏移量之间的延迟,以及所有分区延迟的总和。

4

您可以通过流处理器名称、时间范围和粒度来过滤图表。

Atlas Stream Processing 提供以下方法来按需报告有关流处理器的信息:

sp.processor.sample() 方法允许您查看所选的当前正在运行的流处理器输出的一小部分文档示例。用户可以将示例结果与预期结果进行比较,以诊断聚合管道设计中的任何错误。

sp.processor.stats() 方法返回有关您选择的流处理器的运行时统计信息。

输出文档包含以下字段:

字段
类型
说明

ns

字符串

定义流处理器的命名空间。

stats

对象

描述流处理器操作状态的文档。

stats.name

字符串

流处理器的名称。

stats.status

字符串

流处理器的状态。 此字段可为以下值:

  • created

  • running

  • error

  • stopping

stats.scaleFactor

整型

大小字段的显示比例。如果设置为1 ,大小以字节为单位显示。 如果设置为1024 ,则大小以千字节为单位显示。

stats.inputMessageCount

整型

发布到流的文档数量。 文档在通过$source阶段(而不是通过整个管道时)才被视为已“发布”到流。

stats.inputMessageSize

整型

发布到流的字节数或千字节数。 字节在经过$source阶段(而不是经过整个管道时)被视为已“发布”到流。

stats.outputMessageCount

整型

该流处理的文档数量。 文档一旦通过整个管道,就被视为已被流“处理”。

stats.outputMessageSize

整型

流处理的字节数或千字节数。 字节一旦通过整个管道,就被视为已被流“处理”。

stats.dlqMessageCount

整型

stats.dlqMessageSize

整型

stats.changeStreamTimeDifferenceSecs

整型

最新变更流恢复令牌所代表的事件时间与 oplog 中最新事件之间的时间差(以秒为单位)。

stats.changeStreamState

token

最新的变更流恢复令牌。仅适用于具有变更流源的流处理器。

stats.latency

文档

流处理器整体的延迟统计信息。仅当您传入 verbose 选项时,Atlas Stream Processing 才会返回此字段。

stats.latency.p50

整型

过去 30 秒内处理的所有文档的第 50 个百分位数延迟的估计值。如果您的管道包含窗口阶段,则延迟测量包括窗口的时间间隔。

示例,如果您的 $tumblingWindow 阶段的间隔为 5 分钟,则延迟测量值包括这 5 分钟。

stats.latency.p99

整型

过去 30 秒内处理的所有文档的第 99 个百分位数延迟的估计值。如果您的管道包含窗口阶段,则延迟测量包括窗口的时间间隔。

示例,如果您的 $tumblingWindow 阶段的间隔为 5 分钟,则延迟测量值包括这 5 分钟。

stats.latency.start

datetime

最近 30 秒测量窗口开始的挂钟时间。

stats.latency.end

datetime

最近 30 秒测量窗口结束时的挂钟时间。

stats.latency.unit

字符串

延迟计算所用的时间单位。此值始终为 microseconds

stats.latency.count

整型

流处理器在最近 30 秒测量窗口内处理的文档数量。

stats.latency.sum

整型

在最近的 30 秒测量窗口中进行的所有单独延迟测量值的总和(以微秒为单位)。

stats.stateSize

整型

Windows 用于存储处理器状态的字节数。

stats.watermark

日期

当前水印的时间戳。

stats.operatorStats

阵列

处理器管道中每个操作符的统计信息。 仅当您传入verbose选项时,Atlas Stream Processing 才会返回此字段。

stats.operatorStats 提供许多核心stats字段的每个操作符版本:

  • stats.operatorStats.name

  • stats.operatorStats.inputMessageCount

  • stats.operatorStats.inputMessageSize

  • stats.operatorStats.outputMessageCount

  • stats.operatorStats.outputMessageSize

  • stats.operatorStats.dlqMessageCount

  • stats.operatorStats.dlqMessageSize

  • stats.operatorStats.latency

  • stats.operatorStats.stateSize

stats.operatorStats 包括以下独特字段:

  • stats.operatorStats.maxMemoryUsage

  • stats.operatorStats.executionTimeMillis

stats.operatorStats 当您传入 verbose 选项并且您的处理器包含窗口阶段时,还包含以下字段:

  • stats.minOpenWindowStartTime

  • stats.maxOpenWindowStartTime

stats.operatorStats 对于某些源操作符和目标操作符,还可以包含以下字段:

  • stats.operatorStats.targetStats

stats.operatorStats.maxMemoryUsage

整型

操作符的最大内存使用量(以字节或千字节为单位)。

stats.operatorStats.executionTimeMillis

整型

操作符的总执行时间(以毫秒为单位)。

stats.minOpenWindowStartTime

日期

最小打开窗口的开始时间。此值是可选的。

stats.maxOpenWindowStartTime

日期

最大开放窗口的开始时间。此值是可选的。

stats.operatorStats.targetStats

阵列

某些源和汇操作符的每个目标的统计信息。

该数组的每个元素都是一份文档,表示单个输入或输出目标,例如输入或输出集合或Apache Kafka主题。根据操作符,每个文档都包含以下字段的子集:

对于源操作符,如Apache Kafka$source 或变更流源:

对于 MongoDB 目标为 dbcoll,对于 Apache Kafka 目标为 topic

  • inputMessageCount

  • inputMessageSize

对于 sink 操作符,例如 MongoDB $merge$emit sinks:

对于 MongoDB 目标为 dbcoll,对于 Apache Kafka 目标为 topic

  • outputMessageCount

  • outputMessageSize

Atlas Stream Processing 仅针对某些流聚合阶段(例如 Apache Kafka $source 和 MongoDB $merge)收集每个目标的统计信息。

其记录最多 100 个不同目标的每个目标统计信息;以后,流处理器停止向 targetStats 添加新条目,但继续更新每个操作符的总体统计信息。

stats.kafkaPartitions

阵列

Apache Kafka代理分区的偏移信息。kafkaPartitions 仅适用于使用Apache Kafka源的连接。

stats.kafkaPartitions.partition

整型

Apache Kafka主题分区号。

stats.kafkaPartitions.currentOffset

整型

流处理器在指定分区上的偏移量。 该值等于流处理器处理的上一个偏移量加上1

stats.kafkaPartitions.checkpointOffset

整型

流处理器上次提交给Apache Kafka代理的偏移量以及指定分区的检查点。通过此偏移量的所有消息都记录在最后检查点中。

stats.kafkaPartitions.isIdle

布尔

用于指示该分区是否空闲的标志。此值默认为 false

Atlas Stream Processing提供两种类型的流处理工作区活动日志:

  • 操作日志,主要追踪
    单个流处理器。
  • Atlas 审核日志,主要追踪身份验证和安全
    stream processing 工作区级别的活动。

要下载Atlas Stream Processing操作或Atlas 审核日志,请执行以下操作:

1
  1. 如果尚未显示,请从导航栏上的 Organizations 菜单中选择包含项目的组织。

  2. 如果尚未显示,请从导航栏的 Projects 菜单中选择您的项目。

  3. 在侧边栏中,单击 Streaming Data 标题下的 Stream Processing

此时将显示 Stream Processing 页面。

2

导航到要下载日志的流处理工作区的窗格,然后单击省略号。

3
4

在模式窗口中,选择要下载的日志类型。

5

Stream processor字段中,提供要下载日志的流处理器的名称。将此字段留空可下载所有流处理器的日志。

6

Time Period 下拉列表中,选择要下载日志的时间间隔。

7

注意

日志在创建后最多需要五分钟才能查询。

Atlas Stream Processing支持第三方指标集成,使用户能够记录和分析流处理器性能,而无需开发自定义逻辑或视图。

Datadog 跟踪前缀为 的Atlas Stream Processing指标。要设立集成并查看可用指标,请参阅与mongodb.atlas.stream_processing Datadog 集成。

要将指标发送到 OpenTelemetry (OTel) 收集器,请参阅与 OpenTelemetry (OTel) 集成。

当处理器状态发生变化或满足各种输入或输出阈值时,Atlas Stream Processing 会触发警报。有关可用的 Atlas Stream Processing 警报列表,请参阅 Atlas Stream Processing 警报。要了解有关警报配置的更多信息,请参阅配置警报设置。

您可以通过以下方式处理 Atlas Stream Processing 警报:

  • 项目中的所有流处理器

  • 所有 Stream Processing 工作区中与配置的谓词匹配的流处理器

  • 所有名称符合配置谓词的流处理器

对于除所有流处理器之外的目标,您可以为同一警报配置多个目标。

Atlas Stream Processing默认配置Strean Processor State is failed警报。由于这是项目级警报,因此它适用于在为其配置警报的项目内任何 stream processing 工作区中运行的任何流处理器。