对于AI助手:文档索引位于 https://www.mongodb.com/zh-cn/docs/llms.txt — 通过将 .md 附加到任何URL路径,可以获得所有页面的降价版本。
Make the MongoDB docs better! We value your opinion. Share your feedback for a chance to win $100.
MongoDB Branding Shape
Click here >
Docs 菜单

将MongoDB与 Feast 集成

Feast 提供了一个高级别的 FeatureStore API ,允许您定义功能和功能组(功能视图)、在线和离线存储以及将数据动态从离线存储移动到在线存储(物化)的能力。MongoDB集成允许您将MongoDB用作Feast的在线离线存储,因此您只需定义一次特征,并在model培训和在线推理中一致地提供服务这些特征,而无需维护单独的存储系统。

MongoDB 灵活的 document model 和 MQL 使其能够处理离线存储所需的复杂查询模式。对于在线存储, MongoDB针对网络扩展的访问权限模式进行了优化 — 快速读/写入、水平扩展以及可最大限度减少连接和往返的灵活模式。

在此集成概述中,您可以找到:

  • 介绍将MongoDB作为 Feast 的在线和离线存储。

  • Feast 概念如何映射到MongoDB。

  • MongoDB离线和在线存储设计的详细解释。

  • 在 Feast 中设置MongoDB存储的配置示例。

  • 在线存储是由单个MongoDB集合支持的键值存储,针对在线推理期间每个实体最新功能的低延迟检索进行了优化。

  • 离线存储是一个计算和转换层,用于查询存储在MongoDB集合(通常名为 feature_history)中的历史功能数据行,以进行培训 datasets、评分和物化(将数据提升到在线存储)。

典型的端到端工作流程如下所示:

  1. 定义点MongoDB支持的集合的实体、功能视图和数据源。

  2. 通过 offline_write_batch 将功能数据导入到离线存储中,该存储接受 PyArrow 表作为 input,并遵循离线存储模式将数据插入到 feature_history MongoDB集合中。

  3. 使用 get_historical_features 生成培训数据,这会对MongoDB中存储的历史功能行运行高效的时间点联接。

  4. 使用 pull_latest_from_table_or_queryonline_write_batch 将离线存储中的最新功能值物化到在线存储中。

  5. 通过 Feast 的在线API在线提供服务功能,该 API 会从由序列化实体键作为键的单个MongoDB集合中读取数据。

MongoDB集成遵循 Feast 的标准概念model,但将这些抽象映射到MongoDB模式,该模式专为以实体为中心的在线document和仅附加历史事件而设计。

盛宴概念
在盛宴中的角色
MongoDB表示

实体

功能描述的领域对象(例如驱动程序、用户)。

编码为序列化实体键;在线存储中存储为 _id,离线存储中存储为 entity_id

连接键

用于标识数据框中实体行的列。

馈入 serialize_entity_key;生成的字节用作MongoDB中的实体标识符。

序列化 EntityKey

连接键名称和值的确定性 binary 编码。

在线:_id: serialized_entity_key(主键)。离线:feature_history document 中的 entity_id: Binary(...)字段。

功能

某一点的命名、类型化测量值。

features 子文档(离线)或 features.<feature_view>.<feature_name>(在线)内的字段。

FeatureView

将功能绑定到实体、数据源和TTL;组织单位。

离线:每个历史 document 上的 feature_view 鉴别器 string。在线:嵌套在 features.<feature_view> 下的群组和 event_timestamps 中的每个 FV 时间戳。

DataSource

指向历史功能所在位置的元数据指针。

MongoDBSource 指向MongoDB集合(database, collection, connection_string) 和时间戳。

OfflineStore

历史特征和 PIT 连接的读/写入接口。

MongoDBOfflineStore 在具有复合索引feature_history集合上运行MQL聚合的实现。

OnlineStore

每个实体最新功能值的低延迟存储。

_id = serialized_entity_key 为键控的实体 document 的单个MongoDB集合,其中包含嵌套的 featuresevent_timestamps 子 document。

TTL

FeatureView 级新鲜度窗口。

在计算历史功能时的离线查询和Python后过滤器中强制执行;也可与索引中的 created_timestampupdated_at 组合使用。

FeatureService

model功能参考的命名列表。

没有直接的MongoDB表示; Feast 用于决定从在线存储读取哪些 features.<feature_view>.<feature_name> 路径。

注册表

实体、功能视图和服务的元数据存储。

未更改; MongoDB集成不会取代 Feast 注册表。

RetrievalJob

延迟执行包装器返回功能表。

对于MongoDB离线存储,封装MQL聚合并公开由游标到 Arrow 转换支持的 Arrow 导出。

物化

有计划地将最新的离线功能传播到在线存储中。

通过 pull_latest_from_table_or_queryfeature_history 上实施,然后 online_write_batch 到在线MongoDB集合中。

MongoDB离线存储使用单个共享集合(默认为 feature_history)来存储所有功能视图的仅追加历史功能行。

每个 document 表示一个 FeatureView 在特定事件时间戳对一个实体的一次观察:

{
"entity_id": "Binary(...)",
"feature_view": "driver_stats",
"event_timestamp": "ISODate(2024-01-15T12:00:00Z)",
"created_at": "ISODate(2024-01-15T12:01:00Z)",
"features": {
"conv_rate": 0.72,
"acc_rate": 0.91,
"avg_daily_trips": 14
}
}

主要属性:

  • Append-only:历史数据被视为不可变;更正将写入为具有较新的 created_at 时间戳的新行,而不是就地更新。

  • 时间序列友好:event_timestamp 表示观察到功能值的时间;当多个观测股票相同的事件时间戳时,created_at 用作决胜局。

  • 按 FeatureView 进行功能分组:feature_view 标识该行属于哪个 FeatureView,因此单个集合可以托管多个 FV。

单个复合索引支持所有主要查询模式:

(entity_id ASC, feature_view ASC, event_timestamp DESC, created_at DESC)

该索引支持对实体和功能视图进行高效范围扫描,同时确保在聚合期间首先看到每个 (entity_id, feature_view) 的最新观察。

复合索引如何为离线存储发出的每个查询提供服务。

查询模式
索引行为

$match { entity_id: {$in: [...]}, feature_view: {$in: [...]} }

(entity_id, feature_view) 前缀进行索引范围扫描。

$sort { entity_id, feature_view, event_timestamp DESC, created_at DESC }

排序是一项无需操作 —索引顺序与排序顺序匹配。

$group $first

游标首先按照 (entity_id, feature_view) 访问最新 document; $group $first 会立即选取它。

pull_latest_from_table_or_query

$match { feature_view } + $group $first by entity_id — 对 (entity_id, feature_view) 进行部分前缀扫描。

如果没有此索引,所有四种查询模式都会降级为 COLLSCAN。该索引在首次使用时通过 _ensure_indexes 延迟创建,并在进程级 _indexes_ensured设立按连接字符串缓存,因此每个进程生命周期仅创建一次。

MongoDB离线存储实现了标准的 Feast 离线存储接口:

  • offline_write_batch — 将功能数据的 pyarrow.Table 写入根本的MongoDB集合,使用配置的 MongoDBSource 元元数据来确定 connection_stringdatabasecollection

  • get_historical_features - 给定实体和事件时间戳的 entity_df 以及一设立FeatureView,返回一个扩展表,其中每一行都包含时间点正确的功能值:对于每个 (entity_id, event_timestamp) 对,最新的功能值,其 event_timestamp <= entity_event_timestamp并在TTL内进行选择。

  • pull_latest_from_table_or_query — 为每个实体返回一行,其中包含时间窗口内的最新功能值,Feast 的物化引擎使用该行为在线存储播种。

  • pull_all_from_table_or_query — 在指定日期范围内从数据源中检索所有行以进行导出或检查,并由相同的 feature_history模式和索引提供支持。

  • persist (通过 RetrievalJob.persist)- 通过 SavedDatasetStorage(与 feature_history 不同)将历史功能查询的结果写入单独的集合或外部接收器。

仅追加写入语义、批处理和冲突解决。

调用路径:

FeatureStore.write_to_offline_store(feature_view_name, df)
→ provider.ingest_df_to_offline_store(feature_view, arrow_table)
→ OfflineStore.offline_write_batch(config, feature_view, table, progress)

仅追加语义:在 10,000-document批处理中使用 insert_many(ordered=False) 插入文档。写入时无需更新或插入(upsert)或去重— 允许并保留同一 (entity_id, feature_view, event_timestamp) 元组的多个 document。

冲突解决延迟到读取时间:

  • pull_latest_from_table_or_query 选择获胜 event_timestamp群组中 created_at 最高的document。

  • get_historical_features (评分路径)使用 $sort … created_at DESC,因此 $group $first 也会在时间戳平局时选择最高的 created_at

因此,使用较晚的 created_at 写入的更正获胜,无需任何删除或更新操作。

用于最新功能检索的完整聚合阶段。

pull_latest_from_table_or_query[start_date, end_date]窗口中为每个包含最新功能值的实体返回一行。未提供 entity_df

管道阶段:

$match { feature_view, event_timestamp: {$gte, $lte} }
→ $sort { entity_id, event_timestamp DESC, created_at DESC }
→ $group $first by entity_id
→ $project { entity_id, event_timestamp, features.* }

复合索引有效地为 $match + $sort 提供服务; $group $first 为每个实体挑选一个 document,而不具体化其余 document。

推荐的离线实施是基于聚合的MongoDB离线存储,名为 MongoDBOfflineStore

主要特征:

  • 使用由所有 FeatureView 共享的单个 feature_history集合,通过 feature_view 进行区分。

  • 依赖于复合索引(entity_id, feature_view, event_timestamp, created_at) 进行所有查询,避免全集合扫描。

  • 使用服务器端 $group $first 来“评分”工作负载(每个实体一行),并使用 pd.merge_asof 使用重复的实体 ID 来“培训”工作负载,从而平衡正确性和性能。

  • 通过分块控制内存使用量,因此可以在不耗尽RAM 的情况下处理较大的 entity_df 值。

基准测试显示,与其他MongoDB离线方法相比,此实施提供了吞吐量和内存效率的最佳组合。

时间点连接、评分与培训路径以及正确性权衡。

get_historical_features 是核心 Feast API。它接受一个 entity_df(N 行实体键列 + event_timestamps)和 K 个 FeatureView 对象,并返回一个具有相同 N 行加 M功能列的 DataFrame,在每行的 event_timestamp(点实时正确性)。

符号:

  • N → 实体数量

  • M → 功能数量

  • P → 观察数量

  • F →功能视图数量

  • K → 单次 get_historical_features 调用中请求的功能视图数

评分路径

entity_df 没有重复的实体 ID 时,评分路径被激活,这是一种常见的推理场景,其中每行都在不同的时间点请求不同实体的功能。

检测:

scoring_path = (
entity_df[all_entity_id_cols].drop_duplicates().shape[0]
== len(entity_df)
)

评分时,会添加服务器端 $group $first 阶段:

$match → $sort → $group $first → $project

$group(entity_id, feature_view) 进行分组,并选择具有最高 (event_timestamp, created_at) 的 document— 即,按索引顺序位于前面的 $sort 之后的第一个 document。MongoDB从不为每个功能视图物化每个实体的其他 P-1 document;选取一个document后,游标只需前进到下一个群组键。每个实体的费用为 O(日志 P)(索引查找),而不是 O(P)。

$match 使用 event_timestamp: {$lte: max_ts},其中 max_ts 是当前数据数据块中的最大实体请求时间戳。这是一个保守的近似值(“超调”):服务器将来可能会稍微为某些实体返回document。下面的Python后过滤器通过消除无效行来纠正此问题:

# Merge on entity_id (left = entity_df rows, right = server results)
merged = result[["_fv_entity_id", event_timestamp_col]].merge(
fv_join, on="_fv_entity_id", how="left"
)
# Null out rows where the server doc is in the future or outside TTL
future_mask = merged["_fv_ts"] > merged[event_timestamp_col]
if fv.ttl:
ttl_mask = merged["_fv_ts"] < (
merged[event_timestamp_col] - fv.ttl
)
bad_mask = future_mask | ttl_mask
else:
bad_mask = future_mask
for feat in features:
vals = merged[feat].copy()
vals[bad_mask | merged["_fv_ts"].isna()] = None
result[col] = vals.values

这是一次 pd.merge 调用,然后是矢量化布尔索引— 在 Pandas C代码中的工作时间为 O(N),与 P 和 M 无关。

培训路径

entity_df 具有重复的实体 ID 时(每个实体有许多时间戳快照的培训 datasets),则省略 $group 阶段。该聚合返回每个实体的时间戳窗口中的所有document,并且Python使用 pd.merge_asof 在每行的 event_timestamp 处或之前查找最新document:

$match → (no $group)
result = pd.merge_asof(
result.sort_values(event_timestamp_col),
fv_df_subset.sort_values("_fv_ts"),
left_on=event_timestamp_col,
right_on="_fv_ts",
by="_fv_entity_id",
direction="backward",
)

用于控制大型datasets内存使用量的两级数据块。

两个级别的数据块控制内存使用情况:

等级
恒定
用途

外部 CHUNK_SIZE

50,000 行

限制传递给 _run_singleentity_df 切片;在Python中对峰值结果 DataFrame 进行大写处理。

内部 MONGO_BATCH_SIZE

10,000 实体 ID

限制每次聚合调用的 {$in: [...]}数组大小;避免过大的BSON消息。

对于大于 CHUNK_SIZEentity_df,外循环会运行多个 _run_single 调用并将结果连接起来:

if len(working_df) <= CHUNK_SIZE:
result_df = _run_single(working_df, coll)
else:
chunks = [
_run_single(chunk, coll)
for chunk in _chunk_dataframe(working_df, CHUNK_SIZE)
]
result_df = pd.concat(chunks, ignore_index=True)

因此,无论总 N 是多少,Python 端内存峰值都是 O(CHUNK_SIZE x M x K)。

将MongoDB文档中的功能提取到 DataFrame 列中。

MongoDB features 子文档使用 pd.apply 而不是 pd.json_normalize 扩展到各个列。这会保留 json_normalize 会展平或丢失的复杂类型(Map 和 Struct 的字典,数组的列表)。还应用了反向字段映射,以便投影的列名称与 FeatureView 定义匹配:

if "features" in fv_df.columns:
for feat in features:
src_col = reverse_fm.get(feat, feat)
fv_df[feat] = fv_df["features"].apply(
lambda d, _s=src_col: (
d.get(_s) if isinstance(d, dict) else None
)
)
fv_df = fv_df.drop(columns=["features"])
功能
支持?
注意

get_historical_features (PIT 联接)

使用索引聚合和 Pandas merge-asof 通过 MongoDBOfflineStore 实施。

pull_latest_from_table_or_query

使用 $match + $sort + $group $first 而不是 (entity_id, feature_view, event_timestamp, created_at)

pull_all_from_table_or_query

完整的历史扫描,时间过滤器超过 feature_history

offline_write_batch

通过配置的 MongoDBSource 将 Arrow 表写入MongoDB 。

persist

使用 SavedDatasetStorage 将历史查询结果导出到单独的集合。

直接导出到 data lake 或数据仓库等其他便利性取决于特定的 RetrievalJob实现,并且预计将遵循 Feast 的线下存储标准模式。

MongoDB在线存储对所有 FeatureView 使用单个集合,并以序列化实体键为键。

  • _idserialized_entity_key(entity_key),由 Feast 的稳定编码函数生成,该函数对实体名称和值进行排序并将其编码为字节。

  • features:嵌套子文档,其中每个 FeatureView 都维护自己的功能命名空间。

  • event_timestamps:每个 FeatureView 的时间戳,表示写入该 FeatureView 的最新值的时间。

  • created_timestampupdated_at:可用于TTL索引和诊断的簿记字段。

示例(简化):

{
"_id": "b\"<serialized_entity_key>\"",
"features": {
"driver_stats": {
"rating": 4.91,
"trips_last_7d": 132
},
"pricing": {
"surge_multiplier": 1.2
}
},
"event_timestamps": {
"driver_stats": "ISODate(2026-01-01T12:00:00Z)",
"pricing": "ISODate(2026-01-21T12:00:00Z)"
},
"created_timestamp": "ISODate(2026-01-21T12:00:00Z)"
}

设计理由:

  • 单个集合将每个实体的 document 保存在一个 document 中,这符合 Feast 对基于键的查找的期望,并避免了每个 FeatureView 集合之间的状态碎片化。

  • 使用序列化实体键作为 _id 可以重复使用 Feast 的确定性编码,避免跨集合使用重复的主键,并保持检索每个实体的单个键查找。

在线和离线存储模式的详细设计原理。

与离线存储(使用带有 feature_view 鉴别器字段的单个 feature_history集合)一样,在线存储也对所有 FeatureView 使用单个集合。

在线商店本质上是面向实体键的,而不是面向功能视图的。尽管高级 FeatureStore API会使用单个 FeatureView 调用 online_readonline_write_batch,但 Feast 中的根本的存储 model 是围绕每个实体键的单个逻辑行设计的。随着时间的推移,该行可能会累积来自多个 FeatureView 的特征。

使用一个集合可以让我们为每个实体维护一个统一的document,并仅原子性地更新相关子document(例如 features.<feature_view_name>),而无需跨集合复制实体键。

单一集合设计从一开始就是 Feast 的标准(最初是为 Redis 设计的),并且发挥了 MongoDB 的优势。优点包括:

  • 减少写入放大

  • 简化索引管理(只有一个主节点 (primary node in the replica set)_id索引)

  • 当多个 FeatureView股票相同实体时,没有跨集合协调

  • 与 Feast 基于键的获取 model 实现一致的检索语义

按FeatureView集合设计会分散实体状态,如果组合了功能,则需要额外的协调或多集合查询,并且会增加操作开销,而 Feast 的访问权限模式没有性能优势。

将实体键序列化为 _id:Feast 提供 serialize_entity_key,这是一种稳定的编码函数,可在连接之前对实体名称和值进行显式排序,以确保可预测的字节序列(使用生成字节的 struct.pack 进行类型化)。这意味着我们可以直接将其用作 _id

注意

虽然 serialize_entity_key 提供了稳定的 _id,但其输出分布式不均匀,因此不是分片的理想选择。如果您的部署需要对在线存储集合分片,请考虑使用哈希分片键或附加字段。

MongoDB在线存储实现了 Feast 的标准在线存储API:

  • online_write_batch — 在物化过程中,Feast 将每个实体的最新功能值写入MongoDB document。每次批处理更新或插入(upsert)仅更新相关的嵌套 features.<feature_view> 子document及其在 event_timestamps 中的相应条目,从而保持实体 document 的原子性和一致。

  • online_readget_online_features - 在线服务使用与离线相同的序列化逻辑将实体键解析为 _id 值,然后执行键查找。每次查找利用嵌套的 features 结构,在单次往返中返回实体的所有请求功能。

  • TTL和新鲜度 — 功能TTL在 FeatureView 上配置,主要用于离线 PIT 连接;在线TTL可以通过 updated_at 上的索引或类似时间戳来实现,这与 Feast 的概念一致,即离线存储仅用于追加,而在线存储则保留最新状态。

离线存储是使用 MongoDBOfflineStoreConfig 配置的:

class MongoDBOfflineStoreConfig(FeastConfigBaseModel):
type: str = "...MongoDBOfflineStore"
connection_string: str = "mongodb://localhost:27017"
database: str = "feast"
collection: str = "feature_history"

示例 feature_store.yaml

offline_store:
type: feast.infra.offline_stores.contrib.mongodb_offline_store.mongodb.MongoDBOfflineStore
connection_string: "mongodb+srv://user:pass@cluster.mongodb.net"
database: feast
collection: feature_history

MongoDBSource 是相应的 DataSource。其 name字段成为存储在每个 document 中的 feature_view 鉴别器。有关完整的配置选项,请参阅 Feast Docs 中的MongoDB 数据源参考。

source = MongoDBSource(
name="driver_stats",
timestamp_field="event_timestamp",
created_timestamp_column="created_at",
)
  • 按照Feast 快速入门设立本地功能存储,然后使用本页上的配置示例将MongoDB作为在线和离线存储进行交换。

  • 查看 Feast Docs 中的MongoDB Online Store 参考资料,了解配置选项、异步支持和完整功能矩阵。

  • 查看MongoDB离线存储参考,了解离线存储配置和支持的功能。

  • 查看MongoDB数据源参考,了解 MongoDBSource 选项和模式详细信息。

  • Feast 概念 指南中了解核心 Feast 概念,例如实体、功能视图和物化。