行业:媒体、电信
产品和工具: MongoDB Atlas、MongoDB Vector Search、MongoDB Atlas Stream Processing
合作伙伴: Voyage AI、Azure OpenAI
解决方案概述
当新用户访问您的新闻网站时,您只有几秒钟的时间来理解他们正在寻找什么,否则他们可能会失去兴趣并离开。这就引入了冷启动问题:在没有任何用户历史数据的情况下,您如何向用户推荐内容?
假设一位匿名用户登录您的站点并点击了三篇文章:
“NVIDIA 创造新的 AI 芯片”
“台积电扩大在亚利桑那州的生产”
“英特尔股价因延迟下跌”
简单的关键词搜索只会推荐其他关于英特尔或亚利桑那州的文章。然而,智能系统识别出这三次点击都与半导体供应链有关。这样即使这些文章与用户的浏览历史没有共同的关键词,它也能推荐类似《硅谷的未来》这样的相关文章。
本文中的解决方案构建了一个智能媒体个性化系统,该系统接收并处理点击流数据,以生成用户兴趣的自然语言摘要,并推荐用户更有可能感兴趣的相关文章。
该解决方案结合了:
Atlas Stream Processing 用于实时数据接收、丰富以及 LLM 集成,从点击流数据中总结用户意图。
Atlas Vector Search 带有自动化 Voyage AI 嵌入,用于语义检索和推荐
参考架构
此架构展示了如何构建一个 AI 驱动的媒体推荐引擎,该引擎使用 MongoDB Atlas 实时摄取、处理和响应用户行为。

图 1. 采用 Atlas Stream Processing 和 MongoDB Vector Search 的 AI 驱动型媒体个性化架构
该架构分三个阶段运行,以下章节将详细描述:
接收与丰富:使用 Atlas Stream Processing 从面向用户的应用程序捕获原始点击流事件,并将其与文章元数据实时关联。
会话化和汇总:将相关点击分组到会话中,并使用 LLM 生成用户兴趣的自然语言摘要。
搜索与服务:使用生成的摘要驱动语义向量搜索,并返回个性化推荐。
阶段 1:接收并丰富点击流数据
在第一阶段,该解决方案架构使用 Atlas Stream Processing 从您的媒体平台中接收原始点击流数据,并通过数据库中文章的元数据来丰富这些数据。
设置您的数据源。
此解决方案使用同一个 MongoDB 集群的 news 数据库中的以下两个集合:
articles 集合包含目录中文章的元数据。在此示例中,集合位于 ClickstreamCluster 集群的 news 数据库中。
集合中的每个文档都代表一篇文章并包含相关的元数据。例如:
{ "_id": { "$oid": "696493bfbc1084032ac0adfe" }, "title": "Ukraine updates, Day 6: ‘We are sacrificing our lives for freedom,’ Zelenskyy gets standing ovation after speech to European parliament", "link": "https://nationalpost.com/news/world/ukraine-updates-day-6-russia-kyiv", "keywords": null, "creator": [ "National Post Wire Services" ], "video_url": null, "description": "Russia escalated shelling overnight of key cities in Ukraine as its troops on the ground move slowly in a large convoy toward the capital, Kyiv", "content": "8:20 a.m. EST — Ukraine's Zelenskyy tells EU: 'Prove that you are with us\" Read More", "pubDate": "2022-03-01 13:45:04", "expire_at": "Wed, 07 Sep 2022 13:45:04 GMT", "image_url": null, "source_id": "nationalpost", "country": [ "canada" ], "category": [ "top" ], "language": "english" }
user_events 集合包含从面向用户的应用接收的原始点击流事件。在此示例中,集合位于 ClickstreamCluster 集群的 news 数据库中。
您可以使用首选的事件收集系统设置点击流数据源。要重现参考架构图中所示的方法,请在面向用户的应用中实现一个事件收集 API,将点击事件发送到 Apache Kafka 主题。然后,使用 MongoDB Kafka Sink Connector 从点击流主题中读取数据,并写入 MongoDB 集群的 user_events 集合。
每次点击文章都会生成一个事件文档,其中包含 session_id、article_id 和 timestamp 等字段。例如:
{ "_id": { "$oid": "696a1ecd66a51be18fffb8fa" }, "user_id": "user-2", "session_id": "sess-6a0d8837-a5f9-4ef5-8b00-78e9bf7a825c", "timestamp": { "$date": "2026-01-16T16:49:41.208Z" }, "event_type": "read", "article_id": { "$oid": "696493e6bc1084032ac116ed" }, "device": "desktop", "metadata": { "time_on_page": 54, "referral": "https://guzman.com/main/search/listmain.jsp" } }
创建 Stream Processing 工作区。
在Atlas中, Go项目的 Stream Processing 页面。
如果尚未显示,请从导航栏上的 Organizations 菜单中选择包含项目的组织。
如果尚未显示,请从导航栏的 Projects 菜单中选择您的项目。
在侧边栏中,单击 Streaming Data 标题下的 Stream Processing。
此时将显示 Stream Processing 页面。
单击 Create a workspace(连接)。
在 Create a stream processing workspace 页面上,按如下方式配置您的工作区:
Tier:
SP30Provider:
AWSRegion:
us-east-1Workspace Name:
article-personalization
添加点击流和文章数据的连接。
流处理器需要同时连接您的点击流数据和文章元数据,才能执行丰富操作。两个数据源应位于同一个 Atlas 集群中。
在 Stream Processing 工作区的窗格中,点击Manage。
在Connection Registry标签页中,单击右上角的+ Add Connection 。
在 Edit Connection 页面上,按如下方式配置您的连接:
连接类型:
Atlas Database连接名称:
usereventsAtlas Cluster:您的点击流和文章数据所在的集群名称(例如
ClickstreamCluster)执行角色:
Read and write to any database
单击 Save changes 以创建连接。
创建持久流处理器。
创建一个名为 userIntentSummarizer 的流处理器,其中包含从 user_events 集合读取原始点击流事件并使用文章元数据丰富这些事件的阶段。
在 Atlas 项目的 Stream Processing 页面上,单击流处理工作空间窗格中的 Manage。
在JSON editor 中,将以下 JSON定义复制并粘贴到JSON编辑器文本框中,以定义具有以下阶段的名为
userIntentSummarizer的流处理器:$source:从所连接的点击流集群 (userevents连接) 的news数据库中的user_events集合读取原始点击流事件。$lookup: 基于article_id字段将原始点击流事件与articles集合进行关联,从description、keywords和title字段引入相关的文章元数据。$addFields:将description、keywords和title字段从article_details字段投影到事件流的顶层,以便在下游阶段轻松访问。$project:投影相关字段以供下游处理。
{ "$source": { "connectionName": "userevents", "db": "news", "collection": "user_events" } }, { "$lookup": { "from": { "connectionName": "userevents", "db": "news", "coll": "articles" }, "localField": "article_id", "foreignField": "_id", "as": "article_details", "pipeline": [ { "$project": { "_id": 0, "description": 1, "keywords": 1, "title": 1 } } ] } }, { "$addFields": { "description": { "$arrayElemAt": [ "$article_details.description", 0 ] }, "keywords": { "$arrayElemAt": [ "$article_details.keywords", 0 ] }, "title": { "$arrayElemAt": [ "$article_details.title", 0 ] } } }, { "$project": { "userId": 1, "article_id": 1, "eventTime": 1, "event_type": 1, "device": 1, "session_id": 1, "description": 1, "keywords": 1, "title": 1 } }, 单击 Update stream processor 保存更改。
阶段 2:会话化并汇总用户行为
在此阶段中,我们扩展流处理器管道,将相关点击分组到会话中,并使用 LLM 为每个会话生成每个会话的自然语言摘要,描述用户的兴趣。
连接 LLM 提供商到流处理工作区。
添加与您的 LLM 提供商(例如 Azure OpenAI)之间的外部 HTTPS 连接,以便流处理器能够直接从管道调用 LLM 来丰富数据:
在 Stream Processing 工作区的窗格中,点击Manage。
在Connection Registry标签页中,单击右上角的+ Add Connection 。
请按如下方式配置连接:
连接类型:
HTTPS连接名称:
azureopenaiURL:您的 Azure OpenAI 实例的终结点 URL。
标头:将这些键值对添加到标头:
键:
Content-Type,值:application/json键:
api-key,值:您的 Azure OpenAI API 密钥
使用 $sessionWindow 阶段对点击流数据进行会话化。
向流处理器管道添加 $sessionWindow 阶段,根据指定的会话间隔将相关事件分组到会话中。此解决方案将会话定义为来自同一 session_id 的一系列事件,且不活动间隔不超过 60 秒。
在丰富阶段之后,将此阶段添加到您的 userIntentSummarizer 管道中:
{ "$sessionWindow": { "partitionBy": "$session_id", "gap": { "unit": "second", "size": 60 }, "pipeline": [{ "$group": { "_id": "$session_id", "titles": { "$push": "$title" } } }] } }
使用 $https 阶段总结用户会话。
添加一个 $https 阶段,直接从流处理管道调用您的 LLM 提供商。此解决方案调用 Azure OpenAI,为每个会话生成自然语言摘要,其中基于会话中的文章标题描述了用户的兴趣。
在 $sessionWindow 阶段之后,将此阶段添加到您的管道中:
{ "$https": { "connectionName": "azureopenai", "method": "POST", "as": "apiResults", "config": { "parseJsonStrings": true }, "payload": [ { "$project": { "_id": 0, "model": "gpt-4o-mini", "response_format": { "type": "json_object" }, "messages": [ { "role": "system", "content": "You are an analytical assistant that specializes in behavioral summarization. You analyze short-term reading activity and infer user interests without making personal or sensitive assumptions the create a special field called summary. Summary must be a special field in the response. Respond only in JSON format.Return a JSON object with the following keys in this order: \n reasoning: (Internal scratchpad, briefly explain your analysis) \n user_interests: (The list of inferred interests) \n summary: (A concise summary based on the interests above)" }, { "role": "user", "content": { "$toString": "$titles" } } ] } } ] } }
注意
流处理器本身是“智能的”。它在数据到达磁盘之前,将标题列表转换为语义摘要(例如,“用户正在研究半导体制造物流”)。这与传统的批处理管道有根本区别,后者通常将原始会话数据写入数据库,然后从应用服务器调用外部 API。
将会话摘要写入新集合。
将以下阶段添加到管道中,以从 LLM 输出中提取摘要并将其写入新的集合:
$match:过滤掉 LLM 调用失败并返回错误的任何会话,以避免将不完整的数据写入数据库。$addFields:从 LLM 输出中提取summary字段,并将其添加到文档的顶层。$project:从文档中删除原始 LLM 输出,以减少噪音和存储成本。$merge:将生成的文档写入到点击流集群news数据库(userevents连接)中名为user_intent的新集合中。该集合中的每个文档代表一个用户会话,并包含用户兴趣的摘要。
{ "$match": { "titles": { "$exists": true }, "apiResults": { "$exists": true } } }, { "$addFields": { "summary": { "$arrayElemAt": [ "$apiResults.choices.message.content.summary", 0 ] } } }, { "$project": { "apiResults": 0 } }, { "$merge": { "into": { "coll": "user_intent", "connectionName": "userevents", "db": "news" } } }
启动流处理器。
当您准备开始总结点击流数据时,单击流处理工作区流处理器列表中的 userIntentSummarizer 处理器的 Start 图标。
该管道应将包含会话摘要的文档写入user_intent集合,这些摘要捕获了用户的兴趣。例如:
{ "_id": "sess-6a0d8837-a5f9-4ef5-8b00-78e9bf7a825c", "summary": "The user seems interested in geopolitical developments, especially in the Middle East, US political strategies involving Trump, and legal aspects of government operations.", "titles": [ "Israel and Hamas agree to part of Trump's Gaza peace plan, will free hostages and prisoners", "Top officials from US and Qatar join talks aimed at brokering peace in Gaza", "How Trump secured a Gaza breakthrough", "Ontario's anti-tariff ad is clever, effective and legally sound, experts say", "Shutdown? Trump's been dismantling the government all year", "AP News Summary at 7:58 p.m. EDT" ] }
以下是 userIntentSummarizer流处理器的完整JSON定义,该处理器执行阶段 1 和 2 中描述的所有操作,包括摄取点击流数据、使用文章元元数据丰富数据、会话化用户行为、调用 LLM 来总结用户意图,并将摘要写入新的集合。
{ "name": "userIntentSummarizer", "pipeline": [ { "$source": { "connectionName": "userevents", "db": "news", "collection": "user_events" } }, { "$lookup": { "from": { "connectionName": "userevents", "db": "news", "coll": "articles" }, "localField": "article_id", "foreignField": "_id", "as": "article_details", "pipeline": [ { "$project": { "_id": 0, "description": 1, "keywords": 1, "title": 1 } } ] } }, { "$addFields": { "description": { "$arrayElemAt": [ "$article_details.description", 0 ] }, "keywords": { "$arrayElemAt": [ "$article_details.keywords", 0 ] }, "title": { "$arrayElemAt": [ "$article_details.title", 0 ] } } }, { "$project": { "userId": 1, "article_id": 1, "eventTime": 1, "event_type": 1, "device": 1, "session_id": 1, "description": 1, "keywords": 1, "title": 1 } }, { "$sessionWindow": { "partitionBy": "$session_id", "gap": { "unit": "second", "size": 60 }, "pipeline": [{ "$group": { "_id": "$session_id", "titles": { "$push": "$title" } } }] } }, { "$https": { "connectionName": "azureopenai", "method": "POST", "as": "apiResults", "config": { "parseJsonStrings": true }, "payload": [ { "$project": { "_id": 0, "model": "gpt-4o-mini", "response_format": { "type": "json_object" }, "messages": [ { "role": "system", "content": "You are an analytical assistant that specializes in behavioral summarization. You analyze short-term reading activity and infer user interests without making personal or sensitive assumptions then create a special field called summary. Summary must be a special field in the response. Respond only in JSON format.Return a JSON object with the following keys in this order: \n reasoning: (Internal scratchpad, briefly explain your analysis) \n user_interests: (The list of inferred interests) \n summary: (A concise summary based on the interests above)" }, { "role": "user", "content": { "$toString": "$titles" } } ] } } ] } }, { "$match": { "titles": { "$exists": true }, "apiResults": { "$exists": true } } }, { "$addFields": { "summary": { "$arrayElemAt": [ "$apiResults.choices.message.content.summary", 0 ] } } }, { "$project": { "apiResults": 0 } }, { "$merge": { "into": { "coll": "user_intent", "connectionName": "userevents", "db": "news" } } } ] }
阶段 3:语义搜索与服务个性化推荐
最后,我们使用 MongoDB Vector Search 对文章目录执行语义搜索,利用上一阶段生成的会话摘要来驱动个性化内容推荐。
准备用于语义检索的文章数据。
在执行语义搜索之前,您需要为文章数据生成向量嵌入。为此,创建一个名为 vector_index 的 MongoDB 向量搜索索引,将 articles 集合的 description 字段索引为 autoEmbed 类型。这指示 MongoDB Vector Search 使用自动嵌入功能,在集合中插入或更新文档时为 description 字段自动生成向量嵌入。
重要
自动嵌入作为预览功能仅适用于MongoDB Community Edition v8.2 及更高版本。在预览期间,功能和相应的文档可能随时更改。要学习;了解更多信息,请参阅预览功能。
Atlas 支持在所有版本的 MongoDB 中进行手动嵌入。
使用此JSON定义在这些字段上创建向量索引:
将
description字段设置为autoEmbed类型,指示 MongoDB Vector Search 在集合中插入或更新文档时,使用voyage-4-large嵌入模型自动为description字段生成向量嵌入。将
title字段设置为filter类型,以便使用字段中的字符串值对语义搜索数据进行预过滤。这允许您从搜索结果中排除用户已阅读的文章。
{ "fields": [ { "type": "autoEmbed", "modalitytype": "text", "path": "description", "model": "voyage-4-large" }, { "type": "filter", "path": "title" } ] }
运行语义搜索查询以提供个性化推荐。
当用户访问您的网站时,获取其当前会话的摘要,并将其用作针对文章目录进行向量搜索的查询。由于您在索引上启用了自动嵌入,MongoDB Vector Search 在查询时自动为摘要生成嵌入,并将其用作有效查询向量。
此示例展示了一个简化的向量搜索查询,它使用会话 summary 作为查询向量,并根据 titles 字段排除用户已阅读的文章:
[{ "$vectorSearch": { "index": "vector_index", // Vector index with autoEmbed on article descriptions "path": "description", "query": { "text": "<session-summary>" // Session summary from user_intent document }, "numCandidates": 100, "filter": { "title": { "$nin": [<read-titles>] } // Exclude articles the array of titles in the user_intent document } } }]
关键要点
该架构展示了构建现代数据产品方面的几个重要进展:
减少延迟:将 LLM 调用直接嵌入流处理器中,消除了多个网络跃点和中间持久层。该系统近乎实时地将原始点击转换为可操作的意图。
增强开发者体验:使用基于 JSON 的 MQL 定义管道,使已经熟悉 MongoDB 查询的团队能够构建高级流媒体和 AI 驱动的工作负载,而无需学习新的 DSL 或配置额外的基础设施。
语义个性化:超越关键词匹配和隔夜批处理作业,构建能够倾听、思考并即时响应用户行为的系统。
作者
Vinod Krishnan,解决方案架构师,MongoDB
了解详情
要了解Atlas Vector Search如何支持语义搜索并实现实时分析,请访问Atlas Vector Search页面。
要学习;了解MongoDB如何改变媒体运营,请阅读《 AI支持的媒体个性化: MongoDB和 Vector Search》一文。
要了解MongoDB如何支持现代媒体工作流程,请访问MongoDB媒体和娱乐页面。
要了解有关 Atlas Stream Processing 的更多信息,请参阅 Atlas Stream Processing 文档。