Create a Data Pipeline for MongoDB Change Stream Using Pub/Sub BigQuery Subscription
Venkatesh Shanbhag, Stanimira Vlaeva • 5 min read • Published Oct 28, 2022 • Updated Apr 02, 2024
On October 1, 2022, MongoDB and Google announced a set of open-source Dataflow templates for moving data between MongoDB and BigQuery, so that you can run analyses in BigQuery with BQML and write the resulting inferences back to MongoDB. Three templates were introduced as part of this release, including the MongoDB to BigQuery CDC (change data capture) template.
This template requires you to run a change stream on MongoDB that monitors inserts and updates on a collection. These changes are captured and pushed to a Pub/Sub topic. The CDC template creates a Dataflow job that reads the data from the topic, applies the transformation, and writes the changes to BigQuery. The transformation varies based on the user input provided when running the Dataflow job.
Alternatively, you can use a native Pub/Sub capability to set up a data pipeline between your MongoDB cluster and BigQuery. The Pub/Sub BigQuery subscription writes messages to an existing BigQuery table as they are received. Without the BigQuery subscription type, you need a pull or push subscription and a subscriber (such as Dataflow) that reads messages and writes them to BigQuery.
This article explains how to set up the BigQuery subscription to process data read from a MongoDB change stream. As a prerequisite, you’ll need a MongoDB Atlas cluster.
To set up a free tier cluster, you can register for MongoDB Atlas either from the Google Cloud Marketplace or from the registration page. Follow the steps in the MongoDB documentation to configure the database user and network settings for your cluster.
On Google Cloud, we will create a Pub/Sub topic, a BigQuery dataset, and a table before creating the BigQuery subscription.
First, create a BigQuery dataset with a name of your choice. Then, add a new table in your dataset. Define it with a name of your choice and the following schema:
| Field name | Type |
| --- | --- |
| id | STRING |
| source_data | STRING |
| Timestamp | STRING |
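If you prefer the command line, here is a minimal sketch using the bq tool. The dataset and table names (mongodb_dataset, mongodb_changes) are placeholders for this example; substitute your own.

```sh
# Create the dataset (example name -- replace with your own).
bq mk --dataset <project-id>:mongodb_dataset

# Create the table with the three STRING columns from the schema above.
bq mk --table <project-id>:mongodb_dataset.mongodb_changes \
  id:STRING,source_data:STRING,Timestamp:STRING
```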
Next, we’ll configure a Pub/Sub schema and topic to ingest the messages from our MongoDB change stream. Then, we’ll create a subscription to write the received messages to the BigQuery table we just created.
For this section, we’ll use the Google Cloud Pub/Sub API. Before proceeding, make sure you have enabled the API for your project.
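If the API isn’t enabled yet, one way to turn it on is with the gcloud CLI (this assumes your gcloud project is already set):

```sh
# Enable the Pub/Sub API for the active project.
gcloud services enable pubsub.googleapis.com
```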
In the Pub/Sub section of the console, navigate to “Schemas” and click on Create schema. Provide an appropriate identifier, such as “mdb-to-bq-schema,” for your schema. Then, select “Avro” for the schema type. Finally, add the following definition to match the fields from your BigQuery table:
```json
{
  "type": "record",
  "name": "Avro",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "source_data",
      "type": "string"
    },
    {
      "name": "Timestamp",
      "type": "string"
    }
  ]
}
```
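Alternatively, here’s a rough equivalent with the gcloud CLI, assuming you’ve saved the definition above to a local file named document-message.avsc:

```sh
# Create the Avro schema from the local definition file.
gcloud pubsub schemas create mdb-to-bq-schema \
  --type=avro \
  --definition-file=document-message.avsc
```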
From the sidebar, navigate to “Topics” and click on Create a topic.
Give your topic an identifier, such as “MongoDBCDC.” Enable the Use a schema field and select the schema that you just created. Leave the rest of the parameters to default and click on Create Topic.
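The gcloud equivalent looks roughly like this. JSON message encoding is assumed because the Node.js script later in this tutorial publishes JSON-encoded records:

```sh
# Create the topic and attach the schema, using JSON message encoding.
gcloud pubsub topics create MongoDBCDC \
  --schema=mdb-to-bq-schema \
  --message-encoding=json
```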
From inside the topic, click on Create new subscription. Configure your subscription in the following way:
- Provide a subscription ID — for example, “mdb-cdc.”
- Set the Delivery type to Write to BigQuery.
- Select your BigQuery dataset from the dropdown.
- Provide the name of the table you created in the BigQuery dataset.
- Enable Use topic schema.
The Pub/Sub service account needs the `bigquery.dataEditor` role on the target table to create a Pub/Sub BigQuery subscription. To grant access using the `bq` command line tool, run the following command:

```sh
bq add-iam-policy-binding \
  --member="serviceAccount:service-<project-number>@gcp-sa-pubsub.iam.gserviceaccount.com" \
  --role=roles/bigquery.dataEditor \
  -t "<dataset>.<table>"
```
Keep the other fields as default and click on Create subscription.
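For reference, here is a command-line sketch of the same subscription, using the example names from above (the table reference takes the form project:dataset.table):

```sh
# Create a BigQuery subscription that writes messages using the topic schema.
gcloud pubsub subscriptions create mdb-cdc \
  --topic=MongoDBCDC \
  --bigquery-table=<project-id>:<dataset>.<table> \
  --use-topic-schema
```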
Finally, we’ll set up a change stream that listens for new documents inserted in our MongoDB cluster.
We’ll use Node.js, but you can adapt the code to a programming language of your choice. Check out the Google Cloud documentation for more Pub/Sub examples using a variety of languages. You can find the source code of this example in the dedicated GitHub repository.
First, set up a new Node.js project and install the following dependencies.
```sh
npm install mongodb @google-cloud/pubsub avro-js
```
Then, add an Avro schema, matching the one we created in Google Cloud Pub/Sub:
./document-message.avsc
```json
{
  "type": "record",
  "name": "DocumentMessage",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "source_data",
      "type": "string"
    },
    {
      "name": "Timestamp",
      "type": "string"
    }
  ]
}
```
Then, create a new JavaScript module called `index.mjs`. Start by importing the required libraries and setting up your MongoDB connection string and your Pub/Sub topic name. If you don’t already have a MongoDB cluster, you can create one for free in MongoDB Atlas.

./index.mjs
```js
import { MongoClient } from 'mongodb';
import { PubSub } from '@google-cloud/pubsub';
import avro from 'avro-js';
import fs from 'fs';

const MONGODB_URI = '<mongodb-connection-string>';
const PUB_SUB_TOPIC = 'projects/<project-name>/topics/<topic-name>';
```
After this, we can connect to our MongoDB instance and set up a change stream event listener. Using an aggregation pipeline, we’ll watch only for “insert” events on the specified collection. We’ll also define a 60-second timeout before closing the change stream.
./index.mjs
```js
let mongodbClient;
try {
  mongodbClient = new MongoClient(MONGODB_URI);
  await monitorCollectionForInserts(mongodbClient, 'my-database', 'my-collection');
} finally {
  mongodbClient.close();
}

async function monitorCollectionForInserts(client, databaseName, collectionName, timeInMs) {
  const collection = client.db(databaseName).collection(collectionName);
  // An aggregation pipeline that matches on new documents in the collection.
  const pipeline = [ { $match: { operationType: 'insert' } } ];
  const changeStream = collection.watch(pipeline);

  changeStream.on('change', event => {
    const document = event.fullDocument;
    publishDocumentAsMessage(document, PUB_SUB_TOPIC);
  });

  await closeChangeStream(timeInMs, changeStream);
}

function closeChangeStream(timeInMs = 60000, changeStream) {
  return new Promise((resolve) => {
    setTimeout(() => {
      console.log('Closing the change stream');
      changeStream.close();
      resolve();
    }, timeInMs);
  });
}
```
Finally, we’ll define the `publishDocumentAsMessage()` function that will:

- Transform every MongoDB document received through the change stream.
- Convert it to the data buffer following the Avro schema.
- Publish it to the Pub/Sub topic in Google Cloud.
```js
async function publishDocumentAsMessage(document, topicName) {
  const pubSubClient = new PubSub();
  const topic = pubSubClient.topic(topicName);

  // Parse the local Avro schema definition.
  const definition = fs.readFileSync('./document-message.avsc').toString();
  const type = avro.parse(definition);

  // Build a message matching the fields of the Avro schema.
  const message = {
    id: document?._id?.toString(),
    source_data: JSON.stringify(document),
    Timestamp: new Date().toISOString(),
  };

  // Serialize the record as JSON and publish it to the topic.
  const dataBuffer = Buffer.from(type.toString(message));
  try {
    const messageId = await topic.publishMessage({ data: dataBuffer });
    console.log(`Avro record ${messageId} published.`);
  } catch(error) {
    console.error(error);
  }
}
```
Run the file to start the change stream listener:
```sh
node ./index.mjs
```
Insert a new document in your MongoDB collection to watch it go through the data pipeline and appear in your BigQuery table!
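To test the pipeline end to end, you can insert a document with mongosh and then check the BigQuery table with bq. This is only a sketch; the database, collection, dataset, and table names are the placeholders used earlier in this tutorial:

```sh
# Insert a test document into the watched collection while index.mjs is running.
mongosh "<mongodb-connection-string>" --eval \
  'db.getSiblingDB("my-database").getCollection("my-collection").insertOne({ hello: "world" })'

# After a few seconds, confirm the row arrived in BigQuery.
bq query --use_legacy_sql=false \
  'SELECT id, Timestamp FROM `<dataset>.<table>` ORDER BY Timestamp DESC LIMIT 5'
```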
There are multiple ways to load change stream data from MongoDB into BigQuery, and we have shown how to use the BigQuery subscription on Pub/Sub. The change streams from MongoDB are monitored, captured, and written to a Pub/Sub topic using a small Node.js script. The data is then written to BigQuery using the BigQuery subscription, with the table's data types enforced by the Pub/Sub schema. Thus, change stream data can be captured and written to BigQuery using the BigQuery subscription capability of Pub/Sub.
- Set up your first MongoDB cluster using Google Cloud Marketplace.
- How to publish a message to a topic with schema.