对于AI助手:文档索引位于 https://www.mongodb.com/zh-cn/docs/llms.txt — 通过将 .md 附加到任何URL路径,可以获得所有页面的降价版本。
Make the MongoDB docs better! We value your opinion. Share your feedback for a chance to win $100.
MongoDB Branding Shape
Click here >
Docs 菜单

开发流处理器

Atlas Stream Processing 流处理器将唯一命名的流聚合管道的逻辑应用于您的流数据。Atlas Stream Processing 将每个流处理器定义保存到持久存储中,以便重复使用。您只能在存储其定义的 Stream Processing 工作区中使用给定的流处理器。

要创建和管理流处理器,您必须具备:

许多流处理器命令要求您在方法调用中指定相关流处理器的名称。 以下部分中描述的语法假定严格为字母数字名称。 如果流处理器的名称包含非字母数字字符,例如连字符 ( - ) 或句点 ( . ),则必须将名称用方括号 ( [] ) 和双引号 ( "" ) 括在方法调用,如sp.["special-name-stream"].stats()中。

注意

Atlas Stream Processing会丢弃处于 状态且持续stopped 45天或更长时间的流处理器的内部状态。当您启动此类处理器时,它的操作和报告统计信息与其初始运行相同。

注意

Atlas Stream Processing会丢弃处于 状态且持续stopped 45天或更长时间的流处理器的内部状态。当您启动此类处理器时,它的操作和报告统计信息与其初始运行相同。

您可以修改现有流处理器的以下元素:

要修改流处理器,请执行以下步骤:

1

请参阅停止流处理器。

2

请参阅。

3

请参阅启动流处理器。

默认情况下,修改后的处理器会从最后一个检查点恢复。或者,您可以设置 resumeFromCheckpoint=false,在这种情况下,处理器仅保留摘要统计信息。当您修改具有打开窗口的处理器时,这些窗口将在更新的管道上被完全重新计算。

注意

如果您通过使用包含 iscontains 等匹配器表达式的 Operator 更改了已为其配置流处理器状态为失败警报的流处理器的名称,则当匹配器表达式与新名称不匹配时,Atlas 不会为重命名后的流处理器触发警报。要监控重命名后的流处理器,请重新配置该警报。

启用默认设置 resumeFromCheckpoint=true 后,将应用以下限制:

  • 您无法修改 $source 阶段。

  • 您无法修改窗口的间隔时间。

  • 您无法删除窗口。

  • 只有当窗口的内部管道中包含 $group$sort 阶段时,您才能修改带有窗口的管道。

  • 您无法更改现有窗口类型。例如,您无法将 $tumblingWindow 更改为 $hoppingWindow,反之亦然。

  • 附带窗口的处理器可能会在重新计算这些窗口时重新处理某些数据。

  • 修改操作后,不会保留每个操作符的统计信息。

要使用 mongosh 从现有流处理器返回采样结果大量给 STDOUT,请使用 sp.processor.sample() 方法。示例,以下命令从名为 proc01 的流处理器中进行采样。

sp.proc01.sample()

此命令将持续运行,直到您使用 CTRL-C 取消它,或者直到返回的示例累积大小达到 40 MB。流处理器在以下格式的 _dlqMessage 文档中报告示例中的无效文档:

{
_dlqMessage: {
errInfo: {
reason: "<reasonForError>"
},
doc: {
_id: ObjectId('<group-id>'),
...
},
processorName: '<procName>',
workspaceName: '<workspaceName>',
dlqTime: ISODate('2024-09-19T20:04:34.263+00:00')
}
}

您可以使用这些信息诊断数据卫生状况问题,而无需定义死信队列集合。

注意

Atlas Stream Processing会丢弃处于 状态且持续stopped 45天或更长时间的流处理器的内部状态。当您启动此类处理器时,它的操作和报告统计信息与其初始运行相同。