Create a Data Pipeline for MongoDB Change Stream Using Pub/Sub BigQuery Subscription

Venkatesh Shanbhag, Stanimira Vlaeva • 5 min read • Published Oct 28, 2022 • Updated Apr 02, 2024
Google Cloud • Node.js • AI • MongoDB • Change Streams • JavaScript
On October 1, 2022, MongoDB and Google announced a set of open-source Dataflow templates for moving data between MongoDB and BigQuery, so you can run analyses in BigQuery using BQML and bring the inferences back to MongoDB. Three templates were introduced as part of this release, including the MongoDB to BigQuery CDC (change data capture) template.
MongoDB change stream
This template requires you to run a change stream on MongoDB that monitors inserts and updates on a collection. These changes are captured and pushed to a Pub/Sub topic. The CDC template creates a Dataflow job that reads the data from the topic, applies the transformation, and writes the changes to BigQuery. The transformation varies based on the input you provide when running the Dataflow job.
Alternatively, you can use a native Pub/Sub capability to set up a data pipeline between your MongoDB cluster and BigQuery. The Pub/Sub BigQuery subscription writes messages to an existing BigQuery table as they are received. Without the BigQuery subscription type, you need a pull or push subscription and a subscriber (such as Dataflow) that reads messages and writes them to BigQuery.
This article explains how to set up the BigQuery subscription to process data read from a MongoDB change stream. As a prerequisite, you’ll need a MongoDB Atlas cluster.
To set up a free-tier cluster, you can register for MongoDB either through Google Cloud Marketplace or from the registration page. Follow the steps in the MongoDB documentation to configure the database user and network settings for your cluster.
On Google Cloud, we will create a Pub/Sub topic, a BigQuery dataset, and a table before creating the BigQuery subscription.

Create a BigQuery dataset

We’ll start by creating a new dataset for BigQuery in the Google Cloud console.
Then, add a new table in your dataset. Define it with a name of your choice and the following schema:
Field name   | Type
id           | STRING
source_data  | STRING
Timestamp    | STRING
Create a new dataset and table in BigQuery
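If you’d rather script this step than click through the console, here is a minimal sketch using the @google-cloud/bigquery Node.js client (this client isn’t used elsewhere in the tutorial, and the dataset and table names below are placeholders, so substitute your own):

import { BigQuery } from '@google-cloud/bigquery';

const bigquery = new BigQuery();

// Placeholder identifiers -- replace with your own dataset and table names.
const datasetId = 'mongodb_cdc';
const tableId = 'change_stream_events';

// Create the dataset, then a table with the three STRING columns described above.
const [dataset] = await bigquery.createDataset(datasetId);
const [table] = await dataset.createTable(tableId, {
  schema: [
    { name: 'id', type: 'STRING' },
    { name: 'source_data', type: 'STRING' },
    { name: 'Timestamp', type: 'STRING' },
  ],
});

console.log(`Created table ${table.id} in dataset ${dataset.id}`);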

Configure Google Cloud Pub/Sub

Next, we’ll configure a Pub/Sub schema and topic to ingest the messages from our MongoDB change stream. Then, we’ll create a subscription to write the received messages to the BigQuery table we just created.
For this section, we’ll use the Google Cloud Pub/Sub API. Before proceeding, make sure you have enabled the API for your project.

Define a Pub/Sub schema

From the Cloud Pub/Sub UI, navigate to Create Schema.
Give your schema an appropriate identifier, such as “mdb-to-bq-schema.” Then, select “Avro” for the type. Finally, add the following definition to match the fields from your BigQuery table:
{
  "type": "record",
  "name": "Avro",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "source_data",
      "type": "string"
    },
    {
      "name": "Timestamp",
      "type": "string"
    }
  ]
}
Create a Cloud Pub/Sub schema
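The schema can also be registered programmatically. As a rough sketch with the @google-cloud/pubsub Node.js client, assuming the Avro definition above is saved locally (for example, as document-message.avsc, the same file used later in this tutorial):

import { PubSub, SchemaTypes } from '@google-cloud/pubsub';
import fs from 'fs';

const pubSubClient = new PubSub();

// Read the Avro definition shown above from a local file.
const definition = fs.readFileSync('./document-message.avsc').toString();

// Register it as a Pub/Sub schema, using the same identifier as the console steps.
await pubSubClient.createSchema('mdb-to-bq-schema', SchemaTypes.Avro, definition);
console.log('Schema mdb-to-bq-schema created.');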

Create a Pub/Sub topic

From the sidebar, navigate to “Topics” and click on Create a topic.
Give your topic an identifier, such as “MongoDBCDC.” Enable the Use a schema field and select the schema that you just created. Leave the rest of the parameters at their defaults and click on Create Topic.
Create a Cloud Pub/Sub topic
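If you prefer to create the topic from code instead, the snippet below is a sketch using the same Node.js client. The schema reference must be the fully qualified name, and JSON encoding is chosen here to match the JSON-serialized Avro records produced by the publishing code later in this tutorial:

import { PubSub } from '@google-cloud/pubsub';

const pubSubClient = new PubSub();

// Fully qualified schema name -- replace <project-id> with your own project.
const schemaName = 'projects/<project-id>/schemas/mdb-to-bq-schema';

// Create the topic and attach the schema with JSON message encoding.
const [topic] = await pubSubClient.createTopic({
  name: 'MongoDBCDC',
  schemaSettings: {
    schema: schemaName,
    encoding: 'JSON',
  },
});

console.log(`Topic ${topic.name} created.`);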

Subscribe to the topic and write to BigQuery

From inside the topic, click on Create new subscription. Configure your subscription in the following way:
  • Provide a subscription ID — for example, “mdb-cdc.”
  • Set the Delivery type to Write to BigQuery.
  • Select your BigQuery dataset from the dropdown.
  • Provide the name of the table you created in the BigQuery dataset.
  • Enable Use topic schema.
The Pub/Sub service account needs the bigquery.dataEditor role on the target table in order to write to it. To grant access using the bq command-line tool, run the following command:
bq add-iam-policy-binding \
  --member="serviceAccount:service-<project-number>@gcp-sa-pubsub.iam.gserviceaccount.com" \
  --role=roles/bigquery.dataEditor \
  -t "<dataset>.<table>"
Keep the other fields as default and click on Create subscription.
Create a Cloud Pub/Sub subscription
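The subscription can also be created from code. Here is a sketch using the Node.js client, assuming a recent @google-cloud/pubsub release that supports the bigqueryConfig option; the table reference uses the projectId.datasetId.tableId form, and all names are placeholders:

import { PubSub } from '@google-cloud/pubsub';

const pubSubClient = new PubSub();

// Placeholder table reference in the form <project-id>.<dataset>.<table>.
const bigqueryTable = '<project-id>.<dataset>.<table>';

// Create a BigQuery delivery subscription on the topic, using the topic schema
// so the Avro fields map onto the table columns.
await pubSubClient.topic('MongoDBCDC').createSubscription('mdb-cdc', {
  bigqueryConfig: {
    table: bigqueryTable,
    useTopicSchema: true,
  },
});

console.log('BigQuery subscription mdb-cdc created.');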

Set up a change stream on a MongoDB cluster

Finally, we’ll set up a change stream that listens for new documents inserted in our MongoDB cluster.
We’ll use Node.js, but you can adapt the code to a programming language of your choice. Check out the Google Cloud documentation for more Pub/Sub examples in a variety of languages. You can find the source code of this example in the dedicated GitHub repository.
First, set up a new Node.js project and install the following dependencies.
npm install mongodb @google-cloud/pubsub avro-js
Then, add an Avro schema, matching the one we created in Google Cloud Pub/Sub:
./document-message.avsc
{
  "type": "record",
  "name": "DocumentMessage",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "source_data",
      "type": "string"
    },
    {
      "name": "Timestamp",
      "type": "string"
    }
  ]
}
Then create a new JavaScript module — index.mjs. Start by importing the required libraries and setting up your MongoDB connection string and your Pub/Sub topic name. If you don’t already have a MongoDB cluster, you can create one for free in MongoDB Atlas.
./index.mjs
import { MongoClient } from 'mongodb';
import { PubSub } from '@google-cloud/pubsub';
import avro from 'avro-js';
import fs from 'fs';

const MONGODB_URI = '<mongodb-connection-string>';
const PUB_SUB_TOPIC = 'projects/<project-name>/topics/<topic-name>';
After this, we can connect to our MongoDB instance and set up a change stream event listener. Using an aggregation pipeline, we’ll watch only for “insert” events on the specified collection. We’ll also define a 60-second timeout before closing the change stream.
./index.mjs
let mongodbClient;
try {
  mongodbClient = new MongoClient(MONGODB_URI);
  await monitorCollectionForInserts(mongodbClient, 'my-database', 'my-collection');
} finally {
  mongodbClient.close();
}

async function monitorCollectionForInserts(client, databaseName, collectionName, timeInMs) {
  const collection = client.db(databaseName).collection(collectionName);
  // An aggregation pipeline that matches on new documents in the collection.
  const pipeline = [ { $match: { operationType: 'insert' } } ];
  const changeStream = collection.watch(pipeline);

  changeStream.on('change', event => {
    const document = event.fullDocument;
    publishDocumentAsMessage(document, PUB_SUB_TOPIC);
  });

  await closeChangeStream(timeInMs, changeStream);
}

function closeChangeStream(timeInMs = 60000, changeStream) {
  return new Promise((resolve) => {
    setTimeout(() => {
      console.log('Closing the change stream');
      changeStream.close();
      resolve();
    }, timeInMs);
  });
}
Finally, we’ll define the publishDocumentAsMessage() function that will:
  1. Transform every MongoDB document received through the change stream.
  2. Convert it to a data buffer following the Avro schema.
  3. Publish it to the Pub/Sub topic in Google Cloud.
async function publishDocumentAsMessage(document, topicName) {
  const pubSubClient = new PubSub();
  const topic = pubSubClient.topic(topicName);

  const definition = fs.readFileSync('./document-message.avsc').toString();
  const type = avro.parse(definition);

  const message = {
    id: document?._id?.toString(),
    source_data: JSON.stringify(document),
    Timestamp: new Date().toISOString(),
  };

  const dataBuffer = Buffer.from(type.toString(message));
  try {
    const messageId = await topic.publishMessage({ data: dataBuffer });
    console.log(`Avro record ${messageId} published.`);
  } catch (error) {
    console.error(error);
  }
}
Run the file to start the change stream listener:
node ./index.mjs
Insert a new document in your MongoDB collection to watch it go through the data pipeline and appear in your BigQuery table!
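For example, while index.mjs is still running, you could insert a test document from a second terminal with a short script like the one below (the database and collection names match the placeholders used earlier; the document fields are arbitrary):

import { MongoClient } from 'mongodb';

const client = new MongoClient('<mongodb-connection-string>');

try {
  await client.connect();
  // Insert into the same database and collection the change stream is watching.
  const collection = client.db('my-database').collection('my-collection');
  await collection.insertOne({ name: 'test-event', createdAt: new Date() });
  console.log('Test document inserted.');
} finally {
  await client.close();
}

Within a few seconds, you should see a corresponding row in your BigQuery table, with the full document serialized into the source_data column.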

Summary

There are multiple ways to load change stream data from MongoDB into BigQuery, and we have shown how to use the BigQuery subscription on Pub/Sub. Changes from the MongoDB change stream are monitored, captured, and written to a Pub/Sub topic using the Node.js libraries.
The data is then written to BigQuery by the BigQuery subscription, with the table columns mapped from the Pub/Sub schema. In this way, change stream data can be captured and written to BigQuery using the BigQuery subscription capability of Pub/Sub.

Further reading

  1. A data pipeline for MongoDB Atlas and BigQuery using Dataflow.
  2. Set up your first MongoDB cluster using Google Cloud Marketplace.
  3. Run analytics in BigQuery using BigQuery ML.
  4. How to publish a message to a topic with schema.
