架构基础知识
Atlas Stream Processing的核心抽象是流处理器。流处理器是一个MongoDB 聚合管道,它对来自指定源的流媒体数据持续运行,并将输出写入接收器。要学习;了解更多信息,请参阅流处理器的结构。
流处理发生在 Stream Processing 工作区上进行。每个Stream Processing 工作区都是一个关联以下内容的 Atlas 命名空间:
一个或多个流处理器,每个流处理器运行在自己分配的 RAM 和 CPU 上。
一个默认层级,确定在不指定层级时可提供给每个流处理器的内存量和计算量。
最大层级,决定了您可以分配给该 Stream Processing 工作区内 pod 的最大内存和计算量。
云提供商和云区域。
连接注册表,用于存储流媒体数据的可用源和接收器列表。
定义用户授权的安全上下文。
与 Stream Processing 工作区本身相连的连接字符串。
层级
定义流处理器后,它仅可用于定义它的 stream processing 工作区。每个流处理器都在根据其层级分配的资源上运行。Atlas Stream Processing仅在 Stream Processing 运行时向用户收取费用。
如果您启动流处理器而未声明层级大小,它将运行Stream Processing 工作区的默认层级。您可以启动任何层级的流处理器,直至包括Stream Processing 工作区的最大层级。
例子
您在名为 myWorkspace 的 Stream Processing 工作区、默认层级为 SP10、最大层级为 SP50 上定义了一个流处理器。如果您启动处理器时未指定层级,Atlas Stream Processing 会将其分配给 SP10 pod。然而,您可以声明从 SP2 到 SP50 的任何层级,Atlas Stream Processing 将处理器分配到一个适当大小的 pod。
连接注册表
连接注册表存储一个或多个连接。每个连接都会为网络和安全细节的组合分配一个名称,从而允许流处理器与外部服务进行交互。连接表现出以下行为:
只有在特定 Stream Processing 工作区的连接注册表中定义的连接,才能为该 Stream Processing 工作区上托管的流处理器提供服务。
每个连接可以为任意数量的流处理器提供服务
只有一个连接可以作为给定流处理器的源。
只有单个连接可以用作给定流处理器的接收器。
连接并非天生就被定义为源或接收器。任何给定的连接都可以提供任一功能,具体取决于流处理器如何调用该连接。
Atlas Stream Processing在专用的客户容器中运行每个流处理器,并在多租户基础架构上提供VM级隔离性。有关MongoDB安全性和合合规的更多信息,请参阅MongoDB信任中心。
检查点
Atlas Stream Processing使用检查点捕获流处理器的状态。每个检查点都有一个唯一的ID ,并受流处理器逻辑流的约束。在流处理器的所有操作符将其状态添加到检查点后, Atlas Stream Processing提交检查点,生成两种类型的记录:
一条提交记录,用于验证检查点ID及其所属的流处理器
一组记录,描述 Atlas Stream Processing 提交检查点时相关流处理器中每个有状态操作的状态。
当您在中断后重新启动流处理器时,Atlas Stream Processing 会查询最后提交的检查点并从所述状态恢复操作。
检查点记录源偏移量而不是源文档的副本。示例, Atlas Stream Processing存储变更流源的oplog恢复令牌,以及Kafka源的分区偏移量。在Atlas Stream Processing提交检查点后删除源文档或使源文档过期不会阻止处理器恢复。
如果重新启动处理器时, Atlas Stream Processing存储在上一个检查点的oplog恢复令牌不再位于源集群的oplog中,Atlas Stream Processing无法从该检查点恢复。要恢复,请使用resumeFromCheckpoint=false 重新启动处理器。为了降低这种风险,请为源集群配置足够的oplog window。要学习;了解如何设立oplog window,请参阅设置最小 Oplog 窗口。
对于启用了 initialSync 的变更流$source 配置, Atlas Stream Processing在初始同步阶段(复制源集合)和正常变更事件处理阶段设置检查点。在初始同步期间, Atlas Stream Processing会记录起始oplog位置并将源集合划分为多个分区。处理器通过分区复制现有文档,然后在复制完成后从初始记录位置重放变更事件。 Atlas Stream Processing在初始同步期间创建检查点,如果发生中断,处理器可以恢复。
Kafka源和消费者组偏移
After Atlas Stream Processing commits a 检查点, it 更新s the consumer group offsets in Kafka asynchronously.当您将Apache Kafka用作 $source 时,消费者组会将这些检查点作为Kafka集群中每个分区的已提交偏移量追踪。
由于这些更新是定期异步发生的,因此消费者群组的提交偏移量可能会暂时滞后于最新的检查点。对于从Kafka读取已提交偏移量的工具(例如 kafka-consumer-group CLI工具),这可能会导致监控和延迟指标出现短期延迟。这些工具可以显示流处理器“背后”的消费者群组的真实内部位置。
内部检查点和已提交的Kafka偏移量之间的延迟量不是固定的,可能会根据工作负载和配置而变化。在典型的工作负载中,该时间约为秒,但没有严格的上限。
死信队列(DLQ)
Atlas Stream Processing 支持使用 Atlas 数据库集合作为死信队列(DLQ)。当 Atlas Stream Processing 无法处理数据流中的文档时,它会将文档的内容和处理失败的详细信息写入 DLQ。您可以在流处理器定义中将集合指定为 DLQ。
要了解更多信息,请参阅创建流处理器。
流处理器状态
流处理器在其整个生命周期中在不同状态之间转换。了解这些状态有助于监控 Stream Processing 操作并对其进行故障排除。
可能的省/市/自治区包括:
州 | 说明 |
|---|---|
| 流处理器正在积极处理数据并正常运行。这是处理器使用来自源的数据、应用转换并写入接收器的理想操作状态。 |
| 流处理器已手动停止,处理数据。要学习;了解如何启动流处理器,请参阅启动流处理器。 |
| 流处理器进程分配资源或扩展基础架构。 常见原因包括:
潜在影响包括:
预配完成后,进程会自动恢复。不会发生数据丢失,因为Atlas Stream Processing通过检查点保留状态。 |
| 流处理器遇到错误,无法运行。这是由于用户错误或配置问题造成的。有关故障原因和恢复的详细信息,请参阅错误处理和重试策略。 |
注意
当系统分配资源时,流处理器可能会在监控仪表盘中间歇性地显示为“预配”状态。这是扩展操作期间的正常行为。如果处理器长时间(> 10 分钟)保持在 预配 状态,请检查您的 Stream Processing 工作区 资源 限制和 层级 配置。
错误处理和重试策略
Atlas Stream Processing实现全面的错误处理和重试策略,以确保可靠的流处理。系统区分不同类型的错误,并根据错误分类应用适当的重试策略。
错误分类
Atlas Stream Processing将错误分为两类:
用户错误
由于Atlas Stream Processing 无法控制的用户配置、数据问题或外部服务问题而导致的错误。示例包括:
连接凭证无效
外部服务的网络连接问题
无法处理的格式错误的数据
访问外部资源时出现权限问题
内部错误
Atlas Stream Processing系统本身出现的错误,通常是由于临时基础架构问题或服务中断造成的。这些问题被视为Atlas Stream Processing服务负责解决。
重试行为
重试行为因错误分类而异:
用户错误重试策略
Atlas Stream Processing在 5 分钟内尝试重新启动流处理器的次数有限
如果在此时间范围内所有重试均失败,流处理器将转换为
FAILED状态某些用户错误被归类为不可重试,并立即导致处理器故障。示例包括:
StreamProcessorWorkerOutOfMemory (418):管道超出层级内存限制StreamProcessorInvalidOptions (420):管道语法或配置无效
您可以通过调用
start()方法来手动重启失败的流处理器
内部错误重试策略
Atlas Stream Processing不断重试内部错误,没有时间限制
内部错误触发向Atlas Stream Processing工程团队发出的警报
解决内部问题后,系统依靠自动重试机制来恢复处理器
恢复进程
当流处理器遇到错误并需要重新启动时,恢复进程将按以下步骤进行:
此恢复进程可确保临时故障不会导致数据丢失,也不会在大多数情况下需要手动干预。
Atlas Stream Processing时序
在流式数据处理中,文档受两个计时系统的约束:
事件时间
处理时间
Atlas Stream Processing提供各种参数来控制流处理器与这些计时系统的交互。
事件时间
事件时间是指流生成文档或消息传递系统(例如Apache Kafka)接收文档。这是通过文档的时间戳来确定的。
网络延迟、上游处理和其他因素不仅会导致给定文档的这些时间出现差异,还会导致文档到达流处理器的时间顺序与事件时间顺序不一致。无论哪种情况,windows 都可能错过您打算让其捕获的文档。Atlas Stream Processing 会认为此类文档延迟到达,并将它们发送到死信队列(如果您配置了死信队列)。
处理时间
处理时间是流处理器使用文档的时间。这由托管流处理器的系统时钟确定。
boundary处理时间是滚动Windows和跳跃Windows支持的 字段的可配置选项。它允许您创建带有一种窗口的管道,该窗口根据服务器的挂钟时间累积数据。与事件时间窗口相反,处理时间窗口为每个事件在事件到达流处理器时根据服务器的挂钟时间为每个事件分配一个时间戳。
文档时间戳和窗口边界时间戳采用 UTC 时间。配置 窗口时不能指定 idleTimeout 或 allowedLateness processingTime选项。
例子
您创建一个具有 5 分钟事件时间窗口的管道。将一个事件添加到 09:33 处的源 Kafka 集群。由于 Kafka 集群存在一些延迟,它在 09:37 到达流处理器。
如果管道有一个 5 分钟的事件时间窗口,则此事件将被分配到 09:30-09:35 窗口。如果管道有一个 5 分钟的处理时间窗口,则事件将被分配到 09:35-09:40 窗口。
水印
水印会取代处理时间,并且仅当处理器消耗的文档的事件时间晚于任何先前消耗的文档时进行更新。 Atlas Stream Processing仅在管道包含窗口阶段($tumblingWindow 、$hoppingWindow 或$sessionWindow )时应用水印。
例子
您可以使用 5 分钟的Windows配置流处理器。 您在 12:00 启动处理器,因此前两个Windows的持续时间分别为 12:00-12:05 和 12:05-12:10。 下表说明了在不同延迟下(带水印和不带水印),哪些Windows将捕获哪些事件。
事件时间 | 处理时间 | 窗口时间(无水印) | 窗口时间(水印) |
|---|---|---|---|
12 : 00 | 12 : 00 | 12 : 00 - 12 : 05 | 12 : 00 - 12 : 05 |
12 : 01 | 12 : 03 | 12 : 00 - 12 : 05 | 12 : 00 - 12 : 05 |
12 : 02 | 12 : 05 | 12 : 05 - 12 : 10 | 12 : 00 - 12 : 05 |
12 : 01 | 12 : 06 | 12 : 05 - 12 : 10 | 12 : 00 - 12 : 05 |
12 : 06 | 12 : 07 | 12 : 05 - 12 : 10 | 12 : 05 - 12 : 10 |
在没有水印的情况下,根据 Stream Processing 工作区的系统时钟,12:00-12:05 窗口会在12:05 关闭,而12:05-12:10 窗口会立即打开。因此,尽管源在 12:00-12:05 期间生成了 4 个文档,但相关窗口仅捕获了 2 个文档。
使用水印时,12:00-12:05窗口不会在 12:05 关闭,因为在它截至该点摄取的文档中,最新事件时间(因此水印值)为 12:03。12:00-12:05窗口直到系统时钟的 12:07 才会关闭,此时流处理器接收事件时间为 12:05 的文档,将水印前进到该时间,并打开 12:05-12:10窗口。每个窗口都会捕获所有相应的文档。
从 Apache Kafka 读取时,Atlas Stream Processing 会等待所有分区通过水印。如果一个分区处于空闲状态,并且未能生成时间戳晚于水印的事件,窗口将不会关闭或输出结果。要解决此问题,请设置 partitionIdleTimeout,以确保空闲分区不会阻止水印进程。要了解更多信息,请参阅$source Stage(Stream Processing)。
允许迟到
如果事件时间和处理时间之间的差异足够大,则在水印已提前到足以关闭预期窗口后,文档可能会到达流处理器。 为了缓解这个问题,Atlas Stream Processing 支持“允许延迟”,该设置可将窗口关闭延迟相对于水印的设定时间间隔。
水印是流处理器的属性,而“允许延迟”是窗口的属性,并且仅在该窗口关闭时才会产生影响。如果流处理器的水印前进到会触发打开新窗口的点,则“允许延迟”会使较早的窗口保持打开状态,而不会阻止此操作。
例子
您可以使用 5 分钟的滚动Windows来配置流处理器。 您在 12:00 启动处理器,因此前两个Windows的持续时间分别为 12:00-12:05 和 12:05-12:10。 您将允许迟到时间设置为2分钟。
下表反映了流处理器摄取所述文件的顺序。
事件时间 | 水印 | 允许迟到时间 | 窗口时间 |
|---|---|---|---|
12 : 00 | 12 : 00 | 11 : 58 | 12 : 00 - 12 : 05 |
12 : 02 | 12 : 03 | 12 : 01 | 12 : 00 - 12 : 05 |
12 : 01 | 12 : 04 | 12 : 02 | 12 : 00 - 12 : 05 |
12 : 05 | 12 : 05 | 12 : 03 | 12:00-12:05, 12:05-12:10 |
12 : 04 | 12 : 06 | 12 : 04 | 12:00-12:05, 12:05-12:10 |
12 : 07 | 12 : 07 | 12 : 05 | 12 : 05 - 12 : 10 |
当水印前进到 12:05 时,12:05-12:10 窗口将打开。但是,由于“允许延迟”间隔为 2 分钟,在 12:00-12:05 窗口中,它实际上仅为 12:03,因此它保持打开状态。仅当水印前进到 12:07 时,调整后的时间才会达到 12:05 。此时,12:00-12:05 窗口将关闭。
空闲超时
默认情况下将窗口行为与处理时间分离可以在大多数情况下提高流处理的正确性。但是,流数据源可能会有长时间的空闲状态。在这种情况下,窗口可能会在空闲期之前捕获事件,并且在等待水印前进到足以关闭时无法返回处理后的结果。
Atlas Stream Processing允许用户为Windows配置空闲超时,以缓解这些使用处理时间的情况。 空闲超时是指当处理时间超过打开窗口时间间隔的终点并且流处理器的源处于空闲状态时开始的时间间隔。如果源保持空闲的时间间隔等于空闲超时时间,则窗口将关闭,并且水印独立于任何文档摄取而前进。
例子
您可以配置具有 3 分钟间隔和 1 分钟空闲超时的滚动窗口。下表说明了窗口间隔期间和之后空闲超时的影响。
处理时间 | 事件时间或状态 | 水印 | 窗口时间 |
|---|---|---|---|
12 : 00 | 12 : 00 | 12 : 00 | 12 : 00 - 12 : 03 |
12 : 01 | 源空闲 | 12 : 00 | 12 : 00 - 12 : 03 |
12 : 02 | 源空闲 | 12 : 00 | 12 : 00 - 12 : 03 |
12 : 03 | 源空闲 | 12 : 00 | 12 : 00 - 12 : 03 |
12 : 04 | 12 : 02 | 12 : 02 | 12 : 00 - 12 : 03 |
12 : 05 | 12 : 05 | 12 : 05 | 12 : 03 - 12 : 06 |
12 : 06 | 源空闲 | 12 : 05 | 12 : 03 - 12 : 06 |
12 : 07 | 源空闲 | 12 : 00 | 12 : 06 - 12 : 09 |
12 : 08 | 源空闲 | 12 : 00 | 12 : 06 - 12 : 09 |
12 : 09 | 12 : 09 | 12 : 09 | 12 : 09 - 12 : 12 |
在 12:00-12:03 间隔期间,源空闲三分钟,但流处理器不会关闭窗口,因为处理时间未超过窗口间隔的结束时间,并且源在窗口间隔结束后不会保持空闲状态。当水印前进到 12:05 时,窗口将正常关闭, 12:03-12:06 窗口将打开。
当源在12:06进入空闲状态时,它会在12:07之前保持空闲状态,从而触发空闲超时并将水印提前到12:06 。