Spark Up Your MongoDB and BigQuery Using BigQuery Spark Stored Procedures
Venkatesh Shanbhag, Zi Wang, Maruti C • 5 min read • Published Aug 12, 2024 • Updated Aug 12, 2024
To empower enterprises that strive to transform their data into insights, BigQuery has emerged as a powerful, scalable, cloud-based data warehouse solution offered by Google Cloud Platform (GCP). Its cloud-based approach allows efficient data management and manipulation, making BigQuery a game-changer for businesses seeking advanced data insights. Notably, one of BigQuery’s standout features is its seamless integration with Spark-based data processing that enables users to further enhance their queries. Now, leveraging BigQuery APIs, users can create and execute Spark stored procedures, which are reusable code modules that can encapsulate complex business logic and data transformations. This feature helps data engineers, data scientists, and data analysts take advantage of BigQuery’s advanced capabilities and Spark’s robust data processing capabilities.
MongoDB, a developer data platform, is a popular choice for storing and managing operational data for its scalability, performance, flexible schema, and real-time capabilities (change streams and aggregation). By combining the capabilities of BigQuery with the versatility of Apache Spark and the flexibility of MongoDB, you can unlock a powerful data processing pipeline.
Apache Spark is a powerful open-source distributed computing framework that excels at processing large amounts of data quickly and efficiently. It supports a wide range of data formats, including structured, semi-structured, and unstructured data, making it an ideal choice for integrating data from various sources, such as MongoDB.
BigQuery Spark stored procedures are routines that are executed within the BigQuery environment. These procedures can perform various tasks, such as data manipulation, complex calculations, and even external data integration. They provide a way to modularize and reuse code, making it easier to maintain and optimize data processing workflows. Spark stored procedures run on a serverless, autoscaling Spark engine, and you don’t need to enable Dataproc APIs or pay Dataproc charges to use this capability.
Let's explore how to extend BigQuery’s data processing to Apache Spark, and integrate MongoDB with BigQuery to effectively facilitate data movement between the two platforms.
This tutorial guides you through creating a PySpark procedure using the BigQuery editor.
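For orientation, a procedure like the one built in the editor can also be expressed as a GoogleSQL CREATE PROCEDURE statement. The following is a minimal sketch rather than the method this tutorial follows: it only renders such a statement as a string, assuming the `WITH CONNECTION` clause and the `engine` and `jar_uris` options described in the BigQuery documentation. The helper name, procedure name, connection path, and bucket path are hypothetical placeholders.

```python
def render_spark_procedure_ddl(procedure, connection, jar_uris, pyspark_code):
    """Render a CREATE PROCEDURE statement for a PySpark stored procedure.

    All arguments are illustrative placeholders; nothing is sent to BigQuery.
    """
    # Quote each JAR URI so the list reads as a GoogleSQL array literal.
    jar_list = ", ".join(f'"{uri}"' for uri in jar_uris)
    return (
        f"CREATE OR REPLACE PROCEDURE `{procedure}`()\n"
        f"WITH CONNECTION `{connection}`\n"
        f'OPTIONS (engine="SPARK", jar_uris=[{jar_list}])\n'
        f'LANGUAGE PYTHON AS R"""\n{pyspark_code}\n"""'
    )


# Hypothetical names, shown only to illustrate the shape of the statement.
print(
    render_spark_procedure_ddl(
        procedure="my-project.spark_run.mongodb_to_bigquery",
        connection="my-project.us.mongodb-to-bigquery",
        jar_uris=["gs://my-bucket/mongo-spark-connector.jar"],
        pyspark_code="print('hello from Spark')",
    )
)
```

The pieces map onto the editor options used in this tutorial: the connection, the JAR file URI, and the PySpark code itself.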
Before we start with the Spark stored procedure setup, you need to upload the MongoDB Spark connector JAR file to Google Cloud Storage to connect and read from MongoDB Atlas. Copy and save the gsutil URI for the JAR file that will be used in upcoming steps.
- As a prerequisite for completing the tutorial, you need to set up a MongoDB Atlas cluster with sample data loaded to it.
- You will type the PySpark code directly into the query editor. To create a PySpark stored procedure, select Create PySpark Procedure.
- To set options, click More > PySpark Options, and then do the following:
  - Specify the location where you want to run the PySpark code.
  - In the Connection field, click on CREATE NEW CONNECTION and enter the following values:
    - Connection Type > Apache Spark
    - Connection id > Name it mongodb-to-bigquery for the sake of this tutorial.
    - Leave the other options empty and click on Create.
  - In the Stored procedure invocation section, set the dataset for stored procedure invocation to spark_run.
  - Click on the advanced options. In the JAR files field, paste the gsutil URI you copied at the beginning of the setup. Leave the other options empty. Press Enter and click on Save.
- Open a new tab and go to BigQuery. Navigate to External connections, find the mongodb-to-bigquery connection, and copy its Service account id. In IAM, grant this service account the BigQuery Storage Admin, Secret Manager Secret Accessor, and Storage Object Admin roles.
- (Optional) Add your username and password into Google Cloud Secret Manager, or you can hardcode it in the MongoDB URI string itself.
- Copy the Python script below into the PySpark procedure editor and click on RUN. The script takes around two to three minutes to complete and creates a new table named sample_mflix_comments in the spark_run dataset.
```python
from pyspark.sql import SparkSession
from google.cloud import secretmanager


def access_secret_version(secret_id, project_id):
    """Read version 1 of a secret from Google Cloud Secret Manager."""
    client = secretmanager.SecretManagerServiceClient()
    name = f"projects/{project_id}/secrets/{secret_id}/versions/1"
    response = client.access_secret_version(request={"name": name})
    payload = response.payload.data.decode("UTF-8")
    return payload


# Update project_id, username_secret_id, and password_secret_id.
# Comment out this section if you did not create the secrets earlier.
project_id = "<Your project number, 12 digit number>"
username_secret_id = "<Your username secret id>"
password_secret_id = "<Your password secret id>"

username = access_secret_version(username_secret_id, project_id)
password = access_secret_version(password_secret_id, project_id)

# If you did not create the secrets, put your username and password directly
# in mongodb_uri. In either case, replace <hostname> with your cluster hostname.
mongodb_uri = "mongodb+srv://" + username + ":" + password + "@<hostname>/sample_mflix.comments"

my_spark = SparkSession \
    .builder \
    .appName("myApp") \
    .config("spark.mongodb.read.connection.uri", mongodb_uri) \
    .config("spark.mongodb.write.connection.uri", mongodb_uri) \
    .getOrCreate()

# Read the comments collection from MongoDB Atlas into a DataFrame
dataFrame = my_spark.read.format("mongodb").option("database", "sample_mflix").option("collection", "comments").load()

dataFrame.show()

# Saving the data to BigQuery
dataFrame.write.format("bigquery") \
    .option("writeMethod", "direct") \
    .save("spark_run.sample_mflix_comments")
```
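One caveat when concatenating credentials into the connection string as the script does: a username or password that contains characters such as `@`, `:`, or `/` has to be percent-encoded, otherwise the URI will not parse as intended. The following is a minimal sketch using only the Python standard library. The helper name `build_mongodb_uri` and the sample hostname and credentials are hypothetical; the URI path mirrors the one used in the script.

```python
from urllib.parse import quote_plus


def build_mongodb_uri(username, password, hostname, path="sample_mflix.comments"):
    """Build a mongodb+srv URI with percent-encoded credentials."""
    # quote_plus escapes reserved characters such as '@', ':' and '/'.
    user = quote_plus(username)
    pwd = quote_plus(password)
    return f"mongodb+srv://{user}:{pwd}@{hostname}/{path}"


# Hypothetical credentials and hostname, for illustration only.
print(build_mongodb_uri("app_user", "p@ss/w:rd", "cluster0.example.mongodb.net"))
```

In the script, the result of such a helper could replace the manual string concatenation that produces `mongodb_uri`.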
- Navigate to the spark_run dataset to validate the data is loaded from MongoDB Atlas to BigQuery under the table with the name sample_mflix_comments.
- Now that the data is in BigQuery, leverage BQML to run some Generative AI on the new MongoDB data in BigQuery.
- Create a connection with the name gentext-conn, using either the console or bq command line with connection type as CLOUD_RESOURCE.
```shell
# In a notebook cell, prefix the command with "!".
bq mk \
  --connection \
  --location=US \
  --project_id=<GCP Project id> \
  --connection_type=CLOUD_RESOURCE gentext-conn
```
11. To grant IAM permissions to access Vertex AI from BigQuery, navigate to External connections, find the gentext-conn connection, and copy its Service account id. In IAM, grant this service account the Vertex AI User role.
12. Create a model using the CREATE MODEL command.
```sql
CREATE OR REPLACE MODEL `gcp-pov.spark_run.llm_model`
REMOTE WITH CONNECTION `us.gentext-conn`
OPTIONS (ENDPOINT = 'gemini-pro');
```
13. Run the SQL command against the BigQuery table. This query allows the user to extract the host name from the email leveraging the Gemini Pro model. The resulting output includes the response and safety attributes.
```sql
SELECT prompt, ml_generate_text_result
FROM
  ML.GENERATE_TEXT(
    MODEL `gcp-pov.spark_run.llm_model`,
    (
      SELECT
        CONCAT('Extract the host name from the email: ', email) AS prompt,
        *
      FROM `gcp-pov.spark_run.sample_mflix_comments`
      LIMIT 5
    ),
    STRUCT(
      0.9 AS temperature,
      100 AS max_output_tokens
    )
  );
```
14. Here is the sample output showing the prompt as well as the response. The prompt parameter provides the text for the model to analyze. Prompt design can strongly affect the responses returned by the LLM.
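Because model responses can vary with prompt wording and temperature, it may help to compare them against a deterministic baseline. The following is a minimal sketch, not part of the original tutorial: it rebuilds the same prompt string that the query's CONCAT expression produces and extracts the host name with plain string handling. The function names and the sample email address are hypothetical.

```python
PROMPT_PREFIX = "Extract the host name from the email: "


def build_prompt(email):
    """Mirror the CONCAT(...) expression used in the SQL query."""
    return PROMPT_PREFIX + email


def host_from_email(email):
    """Return the part after the last '@', or None if there is none."""
    _local, sep, host = email.rpartition("@")
    return host if sep and host else None


# Hypothetical address, for illustration only.
sample = "jane.doe@example.com"
print(build_prompt(sample))
print(host_from_email(sample))
```

Comparing `host_from_email` against the text returned in `ml_generate_text_result` for a handful of rows is one simple way to spot responses that drift from the expected answer.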
- This integration enables you to use MongoDB for OLTP and BigQuery for OLAP, providing a complete data management solution.
- Once the data is transformed and copied to BigQuery, BigQuery ML lets you create and run machine learning (ML) models by using GoogleSQL queries.
- BigQuery ML also lets you access LLMs and Cloud AI APIs to perform artificial intelligence (AI) tasks like text generation and machine translation.
By combining the power of BigQuery, Spark stored procedures, and MongoDB, you can create a robust and scalable data processing pipeline that leverages the strengths of each technology. BigQuery provides a reliable and scalable data warehouse for storing and analyzing structured data, while Spark allows you to process and transform data from various sources, including semi-structured and unstructured data from MongoDB. Spark stored procedures enable you to encapsulate and reuse this logic, making it easier to maintain and optimize your data processing workflows.
- Get started with MongoDB Atlas on Google Cloud.
- Create machine learning models in BigQuery ML.