使用案例: 单一视图(Single View)
行业: 金融服务
产品: MongoDB Atlas
解决方案概述
金融犯罪分子通过网络进行活动。洗钱、欺诈团伙和规避制裁依赖于Shell公司、代理董事和分层ACID 事务路径的网络。合规团队需要绘制和分析可疑实体周围的关系网络。
传统方法运行两个瓶颈:
查询成本高昂:关系数据库使用递归公用表表达式 (CTE)。超过两跳或三跳后,这些查询的成本就会变得很高。
持续的网络往返:客户端图表处理将数据提取到应用程序层。客户端为每个跃点运行一次新查询。
在此解决方案中,您:
构建一个使用MongoDB操作符的网络分析引擎来避免这些瓶颈。
直接在 MongoDB 的聚合框架内运行多跳图表遍历、最短路径检测、中心性评分、社区检测和风险传播。
该解决方案重点关注网络分析引擎和为其提供支持的MongoDB操作符。要使用MongoDB Atlas 、FastAPI 和 Next.js实现整体架构,另请遵循之前的解决方案库《金融犯罪缓解》。
为何使用MongoDB而不是专用的图数据库?
将实体和关系与搜索索引、向量嵌入和变更流一起存储在MongoDB中。
避免在两个系统之间同步数据、管理额外的基础架构以及产生跨系统延迟,因为您将图表工作负载保留在MongoDB中。
使用 MongoDB 的
$graphLookup操作符符运行广度优先搜索,并在每一跳使用标准索引查询,因此性能取决于您的索引,而不是单独的图表引擎。对于合规调查中常见的 1 到 5 跳遍历,这种方法会匹配专用图表数据库,同时查询应用程序已读取和写入的相同数据。
注意
在合规调查中,通常会从主题中追踪出几个步骤。直接交易对手和第一层中介机构位于 1–2 跳之外,而Shell结构和资金移动路径通常在 3–5 跳之内。除此之外,链接的意义就会降低,也更难以解释,因此大多数实际调查都停留在 1–5 跳范围内。
参考架构
网络分析引擎位于 FastAPI 服务层和MongoDB Atlas之间,通过聚合管道处理图表操作。
图 1。代理调查管道
双 $graphLookup 模式
洗钱网络涉及双向流动。实体可以在一种关系中获取资金,并在另一种关系中接收资金。
单个 $graphLookup 仅沿一个方向遍历边。该解决方案执行两个并行查找(正向和反向),并将结果合并到统一的网络图表中。
pipeline = [ {"$match": {"entityId": center_entity_id}}, # Forward traversal: follow source → target edges {"$graphLookup": { "from": "relationships", "startWith": "$entityId", "connectFromField": "target.entityId", "connectToField": "source.entityId", "as": "forward_relationships", "maxDepth": max_depth - 1, "restrictSearchWithMatch": { "active": True, "confidence": {"$gte": min_confidence} } }}, # Reverse traversal: follow target → source edges {"$graphLookup": { "from": "relationships", "startWith": "$entityId", "connectFromField": "source.entityId", "connectToField": "target.entityId", "as": "reverse_relationships", "maxDepth": max_depth - 1, "restrictSearchWithMatch": { "active": True, "confidence": {"$gte": min_confidence} } }}, # Merge both directions {"$project": { "entityId": 1, "all_relationships": { "$concatArrays": [ "$forward_relationships", "$reverse_relationships" ] } }}, {"$unwind": "$all_relationships"}, {"$replaceRoot": {"newRoot": "$all_relationships"}}, {"$limit": max_relationships} ]
图 2。双 $graphLookup — 双向网络发现
应用程序在单次往返中接收完整的网络图表。两次遍历都在一个聚合管道中运行,这解决了为初始查询中返回的每个文档发出单独的后续查询的 N+1查询问题。
How $graphLookup 遍历图表
$graphLookup 在离散波形中执行广度优先搜索(BFS):
种子:根据输入文档评估
startWith。数组值同时播种边界(多根 BFS)。查询:构造
{ connectToField: { $in: [frontier_values] }},与任何restrictSearchWithMatch过滤合并。作为针对 from集合的标准索引查询来执行。扩展:对于不在访问设立的每个匹配文档,将其添加到结果中,提取
connectFromField值,然后将其推送到下一个边界。重复:增加深度。返回步骤2 ,直到边界为空或达到
maxDepth。组装:将所有累积的文档放入 as字段下的大量中。
循环检测是自动进行的。内部访问设立可防止循环图中出现无限循环 (A → B → C → A),这种情况在洗钱结构中很常见。每个文档在结果中只出现一次。
使用 restrictSearchWithMatch 优势
restrictSearchWithMatch 将过滤条件推送到遍历本身中,而不是作为后筛选器。 MongoDB在遍历期间修剪死分支,而不是发现完整的图表并随后进行过滤。对于大型网络,这可以将工作集减少一个数量级。
图 3。遍历期间过滤与后过滤
数据模型方法
该解决方案使用两个集合:entities 和 relationships。它们遵循邻接列表模式。边缘元数据(置信度、证据、验证状态)作为关系文档中的一等字段存在。
关系模式
{ "relationshipId": "REL_8910", "source": { "entityId": "ENT_123", "entityType": "individual" }, "target": { "entityId": "ENT_456", "entityType": "organization" }, "type": "beneficial_owner_of", "direction": "directed", "strength": 0.85, "confidence": 0.95, "active": true, "verified": true, "evidence": [ { "evidence_type": "corporate_registry", "confidence": 0.95, "source": "Companies House UK" } ], "datasource": "KYC_onboarding" }
关键设计决策
嵌套源/目标引用:
source.entityIdtarget.entityId$graphLookupconnectFromField和connectToField结构直接映射到 的 和 参数。它还在边缘级别保留实体类型元数据,而无需连接。分离
strength和confidence字段:强度体现了关系的固有牢固程度,例如持有90 % 所有权的 UBO 与遥远的业务伙伴。置信度是指对由两个独立来源验证的数据点与从共享解决推断的数据点的信任度。风险传播以不同方式使用这两个值。active软删除的布尔值:AML 系统需要Atlas 审核跟踪。通过设置active: false来删除关系,而不是删除文档。此标志还用作restrictSearchWithMatch中的遍历过滤。
构建解决方案
克隆存储库并按照 GitHub 自述文件中的设置说明进行操作:
git clone https://github.com/mongodb-industry-solutions/fsi-aml-fraud-detection.git cd fsi-aml-fraud-detection/aml-backend poetry install poetry run uvicorn main:app --host 0.0.0.0 --port 8001 --reload
以下小节将逐步介绍在 NetworkRepository 中实施的六个核心图表操作。
创建所需索引
$graphLookup在每次 BFS 波次时发出{ connectToField: { $in: [frontier] } }查询。两个遍历方向上的索引connectToField:
db.relationships.createIndex({ "source.entityId": 1, "active": 1, "confidence": -1 }); db.relationships.createIndex({ "target.entityId": 1, "active": 1, "confidence": -1 });
索引优势在 1–3 跳处最强,并随着深度的增加而减小。 AML 调查中的 1 到 4 跳遍历正好处于最佳位置。
查找最短路径depthField ()
确定低风险客户是否连接到受制裁实体,并重建确切的链条。
使用
depthField,用其跃点距离注释每个发现的关系:
pipeline = [ {"$match": {"entityId": source_entity_id}}, {"$graphLookup": { "from": "relationships", "startWith": "$entityId", "connectFromField": "source.entityId", "connectToField": "target.entityId", "as": "forward_paths", "maxDepth": max_depth - 1, "depthField": "depth" }}, {"$graphLookup": { "from": "relationships", "startWith": "$entityId", "connectFromField": "target.entityId", "connectToField": "source.entityId", "as": "reverse_paths", "maxDepth": max_depth - 1, "depthField": "depth" }}, {"$project": { "all_paths": {"$concatArrays": ["$forward_paths", "$reverse_paths"]} }}, {"$unwind": "$all_paths"}, {"$match": {"$or": [ {"all_paths.source.entityId": target_entity_id}, {"all_paths.target.entityId": target_entity_id} ]}}, {"$sort": {"all_paths.depth": 1}}, {"$limit": 1} ]
结果确定了最短深度。该深度的第二个有界 $graphLookup 会重建完整的关系链:
计算网络统计信息$facet ()
使用
$facet对单个管道中的同一实体设立运行五项并行分析 — 风险分布、实体类型细分、中心检测、显着性评分和基本指标:
stats_pipeline = [ {"$match": {"entityId": {"$in": network_entity_ids}}}, {"$addFields": { "connection_count": {"$size": {"$ifNull": ["$connected_entities", []]}} }}, {"$facet": { "basic_stats": [{"$group": { "_id": None, "total_nodes": {"$sum": 1}, "avg_risk_score": {"$avg": "$riskAssessment.overall.score"}, "max_risk_score": {"$max": "$riskAssessment.overall.score"} }}], "risk_distribution": [ {"$group": {"_id": "$riskAssessment.overall.level", "count": {"$sum": 1}}}, {"$sort": {"_id": 1}} ], "hub_entities": [ {"$match": {"connection_count": {"$gte": 2}}}, {"$sort": {"connection_count": -1}}, {"$limit": 5}, {"$project": {"entityId": 1, "name": 1, "connection_count": 1}} ] }} ]
如果没有 $facet,则每次分析都需要一个单独的管道。 $facet 并行处理所有子管道并在单个响应中返回结果。此操作将在 2–5 毫秒内执行。
分数中心性($switch 表示特定领域的权重)
识别连接最可疑行为者的实体。
中心性管道使用
$facet分别聚合传出和传入连接,然后进行合并和评分。$switch操作符直接在聚合内为每个关系类型分配风险权重:
"outgoing_risk_weighted": { "$sum": {"$multiply": [ "$confidence", {"$switch": { "branches": [ {"case": {"$in": ["$type", [ "confirmed_same_entity", "business_associate_suspected" ]]}, "then": 0.9}, {"case": {"$in": ["$type", [ "director_of", "ubo_of", "parent_of_subsidiary" ]]}, "then": 0.7}, {"case": {"$in": ["$type", [ "household_member", "professional_colleague_public" ]]}, "then": 0.3} ], "default": 0.5 }} ]} }
confirmed_same_entity连接对风险加权中心性的贡献远大于household_member链接。最终的综合分数融合了归一化度中心性 (40%)、平均置信权重 (30%) 和风险加权中心性 (30%) — 所有这些都在服务器端计算。
检测社区并传播风险
社区检测通过聚合构建邻接图,并在
confidence >= 0.7上进行过滤以仅围绕高置信度关系绘制社区边界:
adjacency_pipeline = [ {"$match": { "$or": [ {"source.entityId": {"$in": entity_ids}}, {"target.entityId": {"$in": entity_ids}} ], "active": True, "confidence": {"$gte": 0.7} }}, {"$group": { "_id": "$source.entityId", "connections": {"$addToSet": "$target.entityId"} }} ]
$addToSet自动对连接进行重复数据删除 —共享shared_address和business_associate关系的两个实体显示为单个连接。
$graphLookup发现网络后,风险传播通过关系链应用指数衰减:
propagated_risk = ( parent_entity_risk * propagation_factor # decay per hop (default: 0.5) * relationship_confidence # trust level for this edge * type_risk_weight # domain-specific weight )
受制裁实体的Shell公司(高置信度、高风险关系类型)获得几乎满分的风险评分。他们的会计师的社交媒体连接几乎没有收到任何信息。遍历是广度优先,深度限制为 3 跳,并在传播的分数低于可配置阈值时停止。
关键要点
使用对偶
$graphLookup用于双向网络发现:运行两次并行查找(正向和反向),并将结果与$concatArrays合并,以在一次聚合中捕获完整网络。将筛选器推入遍历
restrictSearchWithMatch:在 BFS 期间修剪死分支,而不是事后过滤完整图表,以减少大型网络的工作集。创建复合索引
connectToField和您的遍历筛选器:$graphLookup$match/$in在每次 BFS 波次都发出 查询,因此将connectToFieldactive与 和confidence一起索引可以防止在每个跃点集合扫描。计算网络分析服务器端
$facet和$switch:在并行子管道中运行风险分布、中心检测和中心性评分,并使用$switch为聚合内的关系类型分配特定领域的风险权重。应用指数衰减进行风险传播:将基于
$graphLookup的网络发现与包含关系置信度和类型的每跳衰减公式相结合,以便模型自动区分高风险结构连接和低风险社交链接。这符合以一致的命令式方式学习关键知识的指南。
作者
Luis Pazmiño
梅哈尔·格雷瓦尔
Andrea Alaman Calderon