对于AI助手:文档索引位于 https://www.mongodb.com/zh-cn/docs/llms.txt — 通过将 .md 附加到任何URL路径,可以获得所有页面的降价版本。
Make the MongoDB docs better! We value your opinion. Share your feedback for a chance to win $100.
MongoDB Branding Shape
Click here >
Docs 菜单

教程:持续将数据从MongoDB复制到 AWS S3

使用Atlas Data Federation和Atlas Scheduled Triggers,以Apache Parquet 格式将数据从Atlas 集群复制到 AWS S3 存储桶。 Parquet 是一种柱状格式,适用于期望数据为文件而不是文档的分析和机器学习负载。按重复安排运行副本,以卸载操作集群的分析查询负担。

本教程使用增量方法,这意味着每次触发器运行复制过去 60 秒的文档。另一种选择是完整快照,每次都会复制整个集合。正确的方法取决于您的数据量和下游消费者的要求。

本教程中的 maxFileSizemaxRowGroupSize 值针对测试而不是生产进行优化。对于生产工作负载,查看$out 阶段选项,并根据查询模式调整文件大小和分区。

在开始本教程之前,请完成以下任务:

1

联合数据库实例将多个数据源整合到单个可查询接口中。在本教程中,您将连接 S3 存储桶和Atlas 集群,作为同一联合数据库实例中的数据存储。连接两个数据存储后,副本触发器可从集群读取数据并写入S3

  1. 部署具有 S3 数据存储的联合数据库实例。要学习;了解如何操作,请参阅部署联合数据库实例数据存储。配置 S3 数据存储时,请授予 IAM角色Read and write 对该存储桶的访问权限,以便Atlas Data Federation可以写入Parquet 文件。

  2. 将Atlas 集群添加为联合数据库实例中的第二个数据存储。

完成这些步骤后,请记下联合数据库实例服务的名称。您在后续步骤中需要使用此名称。

2

创建定时触发器,每分钟向集群插入一个新文档。这会生成测试数据,以便您验证副本触发器是否有效。

  1. 在 Atlas 中,前往 Triggers 页面。

    1. 如果尚未显示,请从导航栏上的 Organizations 菜单中选择包含项目的组织。

    2. 如果尚未显示,请从导航栏的 Projects 菜单中选择您的项目。

    3. 在侧边栏中,单击 Streaming Data 标题下的 Triggers

    会显示触发器页面。

  2. 单击 Add Trigger(连接)。

  3. 选择 Scheduled 作为 Trigger Type

  4. Trigger Details 中,设立以下配置:

    设置
    Trigger Name

    Create_Event_Every_Min_Trigger

    Schedule Type
    Basic
    Interval

    1 分钟

    Event Type
    Function
  5. Function 部分中,选择 + New Function 并输入以下代码。将占位符值替换为您的Atlas服务名称、数据库和集合。

    exports = function () {
    const mongodb = context.services.get(
    "NAME_OF_YOUR_ATLAS_SERVICE"
    );
    const db = mongodb.db("NAME_OF_YOUR_DATABASE");
    const events = db.collection(
    "NAME_OF_YOUR_COLLECTION"
    );
    const event = events.insertOne({
    time: new Date(),
    aNumber: Math.random() * 100,
    type: "event"
    });
    return JSON.stringify(event);
    };
  6. 单击 Save(连接)。

    触发器运行后,确认每分钟都有新文档出现在集群集合中。

3

创建定时触发器,使用 $out 阶段运行聚合管道,每分钟将最新文档以 Parquet 格式从集群复制到 S 存储桶。3

  1. Triggers 页面上,单击 Add Trigger

  2. 选择 Scheduled 作为 Trigger Type

  3. Trigger Details 中,设立以下配置:

    设置
    Trigger Name

    Copy_Events_To_S3_Trigger

    Schedule Type
    Basic
    Interval

    1 分钟

    Event Type
    Function
  4. Function 部分中,选择+ New Function 并输入以下代码。将占位符值替换为联合数据库实例服务、虚拟数据库、虚拟集合、S3 存储桶和 AWS地区的名称。

    exports = function () {
    const service = context.services.get(
    "NAME_OF_YOUR_FEDERATED_DATA_SERVICE"
    );
    const db = service.db(
    "NAME_OF_YOUR_VIRTUAL_DATABASE"
    );
    const events = db.collection(
    "NAME_OF_YOUR_VIRTUAL_COLLECTION"
    );
    const pipeline = [
    {
    $match: {
    "time": {
    $gt: new Date(
    Date.now() - 60 * 1000
    ),
    $lt: new Date(Date.now())
    }
    }
    },
    {
    "$out": {
    "s3": {
    "bucket": "YOUR_S3_BUCKET_NAME",
    "region": "YOUR_AWS_REGION",
    "filename": "events",
    "format": {
    "name": "parquet",
    "maxFileSize": "10GB",
    "maxRowGroupSize": "100MB"
    }
    }
    }
    }
    ];
    return events.aggregate(pipeline);
    };
  5. 单击 Save(连接)。

    触发器运行后,确认名为 的 Parquet文件出现在您的events S3 存储桶中。