Spark Up Your MongoDB and BigQuery Using BigQuery Spark Stored Procedures

Venkatesh Shanbhag, Zi Wang, Maruti C • 5 min read • Published Aug 12, 2024 • Updated Aug 12, 2024
BigQuery has emerged as a powerful, scalable, cloud-based data warehouse offered by Google Cloud Platform (GCP) to empower enterprises that strive to transform their data into insights. It allows efficient data management and manipulation at scale, making it a strong foundation for businesses seeking advanced data insights. Notably, one of BigQuery's standout features is its seamless integration with Spark-based data processing. Leveraging BigQuery APIs, users can now create and execute Spark stored procedures: reusable code modules that encapsulate complex business logic and data transformations. This feature lets data engineers, data scientists, and data analysts combine BigQuery's analytical capabilities with Spark's robust data processing.
MongoDB, a developer data platform, is a popular choice for storing and managing operational data for its scalability, performance, flexible schema, and real-time capabilities (change streams and aggregation). By combining the capabilities of BigQuery with the versatility of Apache Spark and the flexibility of MongoDB, you can unlock a powerful data processing pipeline.
Apache Spark is a powerful open-source distributed computing framework that excels at processing large amounts of data quickly and efficiently. It supports a wide range of data formats, including structured, semi-structured, and unstructured data, making it an ideal choice for integrating data from various sources, such as MongoDB.
BigQuery Spark stored procedures are routines that are executed within the BigQuery environment. These procedures can perform various tasks, such as data manipulation, complex calculations, and even external data integration. They provide a way to modularize and reuse code, making it easier to maintain and optimize data processing workflows. Spark stored procedures run on a serverless, autoscaling Spark engine, so you don't need to enable Dataproc APIs or be charged for Dataproc to use this capability.
Let's explore how to extend BigQuery’s data processing to Apache Spark, and integrate MongoDB with BigQuery to effectively facilitate data movement between the two platforms.
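As a quick illustration of the idea, here is a minimal sketch of invoking an existing Spark stored procedure from Python with the google-cloud-bigquery client. The procedure name spark_run.my_spark_proc is a hypothetical placeholder; the tutorial below creates and runs its procedure from the BigQuery editor instead.

# Minimal sketch: invoke an existing PySpark stored procedure from Python.
# Assumes the google-cloud-bigquery client library and application default
# credentials; `spark_run.my_spark_proc` is a hypothetical procedure name.
from google.cloud import bigquery

client = bigquery.Client()

# CALL submits the procedure's embedded PySpark code to the serverless Spark
# engine; result() blocks until the Spark job finishes (and raises on failure).
job = client.query("CALL `spark_run.my_spark_proc`();")
job.result()
print("Spark stored procedure finished, job id:", job.job_id)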

Connecting them together

Data movement between MongoDB and BigQuery using a Spark stored procedure running on serverless Spark
This tutorial guides you through creating a PySpark procedure using the BigQuery editor.
Before starting the Spark stored procedure setup, you need to upload the MongoDB Spark connector JAR file to Google Cloud Storage so the procedure can connect to and read from MongoDB Atlas. Copy and save the gsutil URI of the JAR file; it will be used in upcoming steps.
Copy the path for gsutil URI
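If you prefer to script this step, the sketch below uses the google-cloud-storage client to upload the connector JAR and print the gsutil URI to save. The bucket name and JAR file name are placeholders for whatever bucket and connector version you use.

# Hedged sketch: upload the MongoDB Spark connector JAR to Cloud Storage and
# print the gsutil URI to reuse later. Bucket and file names are placeholders.
from google.cloud import storage

bucket_name = "my-spark-dependencies"    # assumed: an existing bucket you own
local_jar = "mongo-spark-connector.jar"  # assumed: the connector JAR you downloaded

client = storage.Client()
blob = client.bucket(bucket_name).blob(local_jar)
blob.upload_from_filename(local_jar)

# Paste this URI into the JAR files field of the PySpark options later on.
print(f"gs://{bucket_name}/{blob.name}")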
  1. As a prerequisite for completing the tutorial, you need to set up a MongoDB Atlas cluster with sample data loaded into it.
  2. Navigate to the BigQuery page on the Google Cloud console.
  3. Create a BigQuery dataset with the name spark_run.
  4. You will type the PySpark code directly into the query editor. To create a PySpark stored procedure, select Create PySpark Procedure in the query editor.
Creating a PySpark procedure from the BigQuery explorer
  5. To set options, click More > PySpark Options, and then do the following:
    1. Specify the location where you want to run the PySpark code.
    2. In the Connection field, click CREATE NEW CONNECTION and enter the following values:
      1. Connection type > Apache Spark
      2. Connection ID > name it mongodb-to-bigquery for the sake of this tutorial.
      3. Leave the other options empty and click Create.
    3. In the Stored procedure invocation section, set the dataset for stored procedure invocation to spark_run.
    4. Click Advanced options, paste the gsutil URI of the connector JAR that you copied at the beginning of the setup into the JAR files field, and leave the other options empty. Press Enter and click Save. (If you prefer to define the procedure with SQL DDL instead of the console, see the sketch after step 6.)
  6. Open a new tab and go to BigQuery. Navigate to External connections, find the mongodb-to-bigquery connection, and copy the service account ID. From IAM, grant the BigQuery Storage Admin, Secret Manager Secret Accessor, and Storage Object Admin roles to this service account.
External connections created using Spark stored procedures
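As an alternative to the console flow in step 5, a Spark stored procedure can also be defined with SQL DDL that references the same connection and connector JAR, submitted here through the Python client. Treat this as an assumption-laden sketch: the procedure name is a placeholder, and option names such as jar_uris should be verified against the current BigQuery documentation for Spark stored procedures.

# Hedged sketch: create a PySpark stored procedure with DDL instead of the console.
# `load_mongodb_comments` is a placeholder name; replace the gs:// path with the
# gsutil URI of the connector JAR you uploaded earlier.
from google.cloud import bigquery

client = bigquery.Client()

ddl = """
CREATE OR REPLACE PROCEDURE spark_run.load_mongodb_comments()
WITH CONNECTION `us.mongodb-to-bigquery`
OPTIONS (engine='SPARK',
         jar_uris=['gs://<your-bucket>/mongo-spark-connector.jar'])
LANGUAGE PYTHON AS r'''
# The PySpark script from the step below goes here.
'''
"""
client.query(ddl).result()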
  7. (Optional) Add your username and password to Google Cloud Secret Manager, or hardcode them in the MongoDB URI string itself. (A short Secret Manager sketch follows the script and screenshot below.)
  8. Copy the Python script below into the PySpark procedure editor and click RUN. The snippet takes around two to three minutes to complete and creates a new table named sample_mflix_comments under the spark_run dataset.
from pyspark.sql import SparkSession
from google.cloud import secretmanager

def access_secret_version(secret_id, project_id):
    # Read version 1 of a secret stored in Google Cloud Secret Manager.
    client = secretmanager.SecretManagerServiceClient()
    name = f"projects/{project_id}/secrets/{secret_id}/versions/1"
    response = client.access_secret_version(request={"name": name})
    payload = response.payload.data.decode("UTF-8")
    return payload

# Update project_id, username_secret_id, and password_secret_id below.
# Comment these lines out if you did not create the secrets earlier.
project_id = "<Your project number, 12 digit number>"
username_secret_id = "<Your username secret id>"
password_secret_id = "<Your password secret id>"

username = access_secret_version(username_secret_id, project_id)
password = access_secret_version(password_secret_id, project_id)

# If you did not create secrets in step 7, set mongodb_uri directly with your
# username and password, and replace <hostname> with your Atlas hostname.
mongodb_uri = "mongodb+srv://" + username + ":" + password + "@<hostname>/sample_mflix.comments"

my_spark = SparkSession \
    .builder \
    .appName("myApp") \
    .config("spark.mongodb.read.connection.uri", mongodb_uri) \
    .config("spark.mongodb.write.connection.uri", mongodb_uri) \
    .getOrCreate()

# Read the comments collection from MongoDB Atlas into a Spark DataFrame
dataFrame = my_spark.read.format("mongodb").option("database", "sample_mflix").option("collection", "comments").load()

dataFrame.show()

# Saving the data to BigQuery
dataFrame.write.format("bigquery") \
    .option("writeMethod", "direct") \
    .save("spark_run.sample_mflix_comments")
Stored procedure on Google BigQuery Explorer
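For the optional Secret Manager route in step 7, the secrets only need to be created once before the procedure runs. Below is a minimal, hedged sketch using the google-cloud-secret-manager client; the secret IDs and values are placeholders, and whichever IDs you choose are the ones the script above reads.

# Hedged sketch: store the MongoDB Atlas username and password as secrets so the
# PySpark procedure can read them instead of hardcoding credentials in the URI.
# Secret IDs and values below are placeholders.
from google.cloud import secretmanager

project_id = "<Your project id>"
client = secretmanager.SecretManagerServiceClient()

for secret_id, value in [("mongo-username", "<atlas username>"),
                         ("mongo-password", "<atlas password>")]:
    # Create the secret container, then add the value as version 1.
    secret = client.create_secret(
        request={
            "parent": f"projects/{project_id}",
            "secret_id": secret_id,
            "secret": {"replication": {"automatic": {}}},
        }
    )
    client.add_secret_version(
        request={"parent": secret.name, "payload": {"data": value.encode("UTF-8")}}
    )
    print("Created", secret.name)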
  9. Navigate to the spark_run dataset to validate that the data has been loaded from MongoDB Atlas to BigQuery into the sample_mflix_comments table.
  10. Now that the data is in BigQuery, leverage BigQuery ML (BQML) to run some generative AI on the new MongoDB data. Start by creating a connection named gentext-conn, using either the console or the bq command line, with the connection type set to CLOUD_RESOURCE.
!bq mk \
  --connection \
  --location=US \
  --project_id=<GCP Project id> \
  --connection_type=CLOUD_RESOURCE gentext-conn
  11. To grant IAM permissions to access Vertex AI from BigQuery, navigate to External connections, find the gentext-conn connection, and copy the service account ID. From IAM, grant the Vertex AI User role to this service account.
  12. Create a model using the CREATE MODEL command.
CREATE OR REPLACE MODEL `gcp-pov.spark_run.llm_model`
REMOTE WITH CONNECTION `us.gentext-conn`
OPTIONS (ENDPOINT = 'gemini-pro');
  13. Run the following SQL command against the BigQuery table. This query extracts the host name from each email by leveraging the Gemini Pro model. The resulting output includes the model response and safety attributes. (A Python sketch for running the same query programmatically follows the sample output below.)
SELECT prompt, ml_generate_text_result
FROM
  ML.GENERATE_TEXT( MODEL `gcp-pov.spark_run.llm_model`,
    (
      SELECT CONCAT('Extract the host name from the email: ', email) AS prompt,
      * FROM `gcp-pov.spark_run.sample_mflix_comments`
      LIMIT 5),
    STRUCT(
      0.9 AS temperature,
      100 AS max_output_tokens
    )
  );
14. Here is the sample output showing the prompt as well as the response. The prompt parameter provides the text for the model to analyze. Prompt design can strongly affect the responses returned by the LLM.
Output after running the Spark stored procedure
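If you would rather run the same ML.GENERATE_TEXT query programmatically than in the console, here is a minimal, hedged sketch using the google-cloud-bigquery Python client; the project and dataset names mirror the example above and should be replaced with your own.

# Sketch: run the ML.GENERATE_TEXT query from Python and print each prompt with
# the raw JSON result returned by the model. Project/dataset names are examples.
from google.cloud import bigquery

client = bigquery.Client()

sql = """
SELECT prompt, ml_generate_text_result
FROM ML.GENERATE_TEXT(
  MODEL `gcp-pov.spark_run.llm_model`,
  (SELECT CONCAT('Extract the host name from the email: ', email) AS prompt, *
   FROM `gcp-pov.spark_run.sample_mflix_comments`
   LIMIT 5),
  STRUCT(0.9 AS temperature, 100 AS max_output_tokens)
);
"""

for row in client.query(sql).result():
    print(row["prompt"])
    print(row["ml_generate_text_result"])  # JSON containing the generated text

Each ml_generate_text_result value is a JSON document, so you can parse it with Python's json module if you only want the generated text.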

Benefits of the integration

  1. This integration enables you to use MongoDB for OLTP and BigQuery for OLAP, providing a complete data management solution.
  2. Once the data is transformed and copied to BigQuery, BigQuery ML lets you create and run machine learning (ML) models by using GoogleSQL queries.
  3. BigQuery ML also lets you access LLMs and Cloud AI APIs to perform artificial intelligence (AI) tasks like text generation and machine translation.

Conclusion

By combining the power of BigQuery, Spark stored procedures, and MongoDB, you can create a robust and scalable data processing pipeline that leverages the strengths of each technology. BigQuery provides a reliable and scalable data warehouse for storing and analyzing structured data, while Spark allows you to process and transform data from various sources, including semi-structured and unstructured data from MongoDB. Spark stored procedures enable you to encapsulate and reuse this logic, making it easier to maintain and optimize your data processing workflows.

Further reading

  1. Get started with MongoDB Atlas on Google Cloud.
  2. Work with stored procedures for Apache Spark.
  3. Create machine learning models in BigQuery ML.