Atlas Stream Processing 提供监控和警报功能,以便用户可以利用性能和状态见解来优化其工作流程。
在 Atlas 用户界面中监控流处理器工作区
对于您的每个 Stream Processing 工作区,您可以在 Atlas 用户界面中监控您的流处理器:
在Atlas中,转到项目的 Stream Processing 页面。
如果尚未显示,请从导航栏上的 Organizations 菜单中选择包含项目的组织。
如果尚未显示,请从导航栏的 Projects 菜单中选择您的项目。
在侧边栏中,单击 Streaming Data 标题下的 Stream Processing。
此时将显示 Stream Processing 页面。
单击Monitoring 标签页。
Monitoring 选项卡显示有关所选流处理器的各种运行时统计信息,包括但不限于:
摄取的消息数量
成功处理的消息数量
发送到您的死信队列的消息数量
如果您的源连接是 Apache Kafka,您可以监控主题分区的当前偏移量与代理上的最新偏移量之间的延迟,以及所有分区延迟的总和。
流处理器监控方法
Atlas Stream Processing 提供以下方法来按需报告有关流处理器的信息:
sp.processor.sample() 方法允许您查看所选的当前正在运行的流处理器输出的一小部分文档示例。用户可以将示例结果与预期结果进行比较,以诊断聚合管道设计中的任何错误。
sp.processor.stats() 方法返回有关您选择的流处理器的运行时统计信息。
输出文档包含以下字段:
字段 | 类型 | 说明 |
|---|---|---|
| 字符串 | 定义流处理器的命名空间。 |
| 对象 | 描述流处理器操作状态的文档。 |
| 字符串 | 流处理器的名称。 |
| 字符串 | 流处理器的状态。 此字段可为以下值:
|
| 整型 | 大小字段的显示比例。如果设置为 |
| 整型 | 发布到流的文档数量。 文档在通过 |
| 整型 | 发布到流的字节数或千字节数。 字节在经过 |
| 整型 | 该流处理的文档数量。 文档一旦通过整个管道,就被视为已被流“处理”。 |
| 整型 | 流处理的字节数或千字节数。 字节一旦通过整个管道,就被视为已被流“处理”。 |
| 整型 | |
| 整型 | |
| 整型 | |
| token | 最新的变更流恢复令牌。仅适用于具有变更流源的流处理器。 |
| 文档 | 流处理器整体的延迟统计信息。仅当您传入 |
| 整型 | 过去 30 秒内处理的所有文档的第 50 个百分位数延迟的估计值。如果您的管道包含窗口阶段,则延迟测量包括窗口的时间间隔。 示例,如果您的 |
| 整型 | 过去 30 秒内处理的所有文档的第 99 个百分位数延迟的估计值。如果您的管道包含窗口阶段,则延迟测量包括窗口的时间间隔。 示例,如果您的 |
| datetime | 最近 30 秒测量窗口开始的挂钟时间。 |
| datetime | 最近 30 秒测量窗口结束时的挂钟时间。 |
| 字符串 | 延迟计算所用的时间单位。此值始终为 |
| 整型 | 流处理器在最近 30 秒测量窗口内处理的文档数量。 |
| 整型 | 在最近的 30 秒测量窗口中进行的所有单独延迟测量值的总和(以微秒为单位)。 |
| 整型 | Windows 用于存储处理器状态的字节数。 |
| 日期 | 当前水印的时间戳。 |
| 阵列 | 处理器管道中每个操作符的统计信息。 仅当您传入
|
| 整型 | 操作符的最大内存使用量(以字节或千字节为单位)。 |
| 整型 | 操作符的总执行时间(以毫秒为单位)。 |
| 日期 | 最小打开窗口的开始时间。此值是可选的。 |
| 日期 | 最大开放窗口的开始时间。此值是可选的。 |
| 阵列 | 某些源和汇操作符的每个目标的统计信息。 该数组的每个元素都是一份文档,表示单个输入或输出目标,例如输入或输出集合或Apache Kafka主题。根据操作符,每个文档都包含以下字段的子集: 对于源操作符,如Apache Kafka 对于 MongoDB 目标为
对于 sink 操作符,例如 MongoDB 对于 MongoDB 目标为
Atlas Stream Processing 仅针对某些流聚合阶段(例如 Apache Kafka 其记录最多 100 个不同目标的每个目标统计信息;以后,流处理器停止向 |
| 阵列 | Apache Kafka代理分区的偏移信息。 |
| 整型 | Apache Kafka主题分区号。 |
| 整型 | 流处理器在指定分区上的偏移量。 该值等于流处理器处理的上一个偏移量加上 |
| 整型 | 流处理器上次提交给Apache Kafka代理的偏移量以及指定分区的检查点。通过此偏移量的所有消息都记录在最后检查点中。 |
| 布尔 | 用于指示该分区是否空闲的标志。此值默认为 |
Stream Processing 日志
Atlas Stream Processing提供两种类型的流处理工作区活动日志:
- 操作日志,主要追踪
- 单个流处理器。
- Atlas 审核日志,主要追踪身份验证和安全
- stream processing 工作区级别的活动。
要下载Atlas Stream Processing操作或Atlas 审核日志,请执行以下操作:
在Atlas中,转到项目的 Stream Processing 页面。
如果尚未显示,请从导航栏上的 Organizations 菜单中选择包含项目的组织。
如果尚未显示,请从导航栏的 Projects 菜单中选择您的项目。
在侧边栏中,单击 Streaming Data 标题下的 Stream Processing。
此时将显示 Stream Processing 页面。
导航到要下载日志的流处理工作区的窗格,然后单击省略号。
在模式窗口中,选择要下载的日志类型。
在 Stream processor字段中,提供要下载日志的流处理器的名称。将此字段留空可下载所有流处理器的日志。
从 Time Period 下拉列表中,选择要下载日志的时间间隔。
注意
日志在创建后最多需要五分钟才能查询。
第三方指标集成
Atlas Stream Processing支持第三方指标集成,使用户能够记录和分析流处理器性能,而无需开发自定义逻辑或视图。
Datadog 跟踪前缀为 的Atlas Stream Processing指标。要设立集成并查看可用指标,请参阅与mongodb.atlas.stream_processing Datadog 集成。
要将指标发送到 OpenTelemetry (OTel) 收集器,请参阅与 OpenTelemetry (OTel) 集成。
流处理器警报
当处理器状态发生变化或满足各种输入或输出阈值时,Atlas Stream Processing 会触发警报。有关可用的 Atlas Stream Processing 警报列表,请参阅 Atlas Stream Processing 警报。要了解有关警报配置的更多信息,请参阅配置警报设置。
您可以通过以下方式处理 Atlas Stream Processing 警报:
项目中的所有流处理器
所有 Stream Processing 工作区中与配置的谓词匹配的流处理器
所有名称符合配置谓词的流处理器
对于除所有流处理器之外的目标,您可以为同一警报配置多个目标。
Atlas Stream Processing默认配置Strean Processor State is failed警报。由于这是项目级警报,因此它适用于在为其配置警报的项目内任何 stream processing 工作区中运行的任何流处理器。