Atlas Stream Processing 流处理器将唯一命名的流聚合管道的逻辑应用于您的流数据。Atlas Stream Processing 将每个流处理器定义保存到持久存储中,以便重复使用。您只能在存储其定义的 Stream Processing 工作区中使用给定的流处理器。
先决条件
要创建和管理流处理器,您必须具备:
具有
atlasAdmin角色的数据库用户,用于创建和运行流处理器Atlas 集群
Considerations
许多流处理器命令要求您在方法调用中指定相关流处理器的名称。 以下部分中描述的语法假定严格为字母数字名称。 如果流处理器的名称包含非字母数字字符,例如连字符 ( - ) 或句点 ( . ),则必须将名称用方括号 ( [] ) 和双引号 ( "" ) 括在方法调用,如sp.["special-name-stream"].stats()中。
以交互方式创建流处理器
创建流处理器
启动流处理器
停止流处理器
修改流处理器
您可以修改现有流处理器的以下元素:
名称
要修改流处理器,请执行以下步骤:
默认情况下,修改后的处理器会从最后一个检查点恢复。或者,您可以设置 resumeFromCheckpoint=false,在这种情况下,处理器仅保留摘要统计信息。当您修改具有打开窗口的处理器时,这些窗口将在更新的管道上被完全重新计算。
注意
限制
启用默认设置 resumeFromCheckpoint=true 后,将应用以下限制:
您无法修改
$source阶段。您无法修改窗口的间隔时间。
您无法删除窗口。
只有当窗口的内部管道中包含
$group或$sort阶段时,您才能修改带有窗口的管道。您无法更改现有窗口类型。例如,您无法将
$tumblingWindow更改为$hoppingWindow,反之亦然。附带窗口的处理器可能会在重新计算这些窗口时重新处理某些数据。
修改操作后,不会保留每个操作符的统计信息。
删除流处理器
列出可用的流处理器
列出工作区默认值
来自流处理器的样本
要使用 mongosh 从现有流处理器返回采样结果大量给 STDOUT,请使用 sp.processor.sample() 方法。示例,以下命令从名为 proc01 的流处理器中进行采样。
sp.proc01.sample()
此命令将持续运行,直到您使用 CTRL-C 取消它,或者直到返回的示例累积大小达到 40 MB。流处理器在以下格式的 _dlqMessage 文档中报告示例中的无效文档:
{ _dlqMessage: { errInfo: { reason: "<reasonForError>" }, doc: { _id: ObjectId('<group-id>'), ... }, processorName: '<procName>', workspaceName: '<workspaceName>', dlqTime: ISODate('2024-09-19T20:04:34.263+00:00') } }
您可以使用这些信息诊断数据卫生状况问题,而无需定义死信队列集合。