Building a Dynamic Pricing Microservice with Vertex AI and MongoDB Atlas

Francesco Baldissera, Sebastian Rojas Arbulu • 18 min read • Published Oct 09, 2024 • Updated Oct 09, 2024
In the hyper-competitive world of e-commerce, crafting a winning pricing strategy is essential for growth. Thankfully, big data and machine learning have revolutionized pricing. Businesses can now leverage real-time customer behavior and competitor data to dynamically adjust prices.
This tutorial dives into building a responsive dynamic pricing microservice that enables prices to be adjusted in real time for maximum effectiveness. We'll explore using MongoDB Atlas for its efficient data storage and management while leveraging Google Cloud Platform's (GCP) power for complex calculations and hosting. By the end, you'll be equipped to implement this approach and unlock the potential of data-driven pricing.
The following animation illustrates what we aim to achieve:
[Animation: e-commerce fashion store showing a dynamically predicted price next to each product's actual price]
As seen, this e-commerce store displays a dynamically predicted price alongside the actual product price. The predicted price is calculated in real time using machine learning algorithms that analyze market trends, demand, competitor prices, customer behavior, and sales data to optimize sales and profit.

Data model overview

Before we begin, let's establish context with an overview of our data model. Our microservice leverages MongoDB Atlas, a developer data platform, to power real-time AI in our e-commerce app. Atlas stores our ML features in two key collections, acting as a feature store. This streamlines data management, automates decision-making, and isolates workloads. With change streams and triggers, updates flow seamlessly to our AI models, minimizing operational overhead for both business and MLOps.
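For instance, a change stream on the events collection can surface new customer events to downstream consumers as they arrive. Below is a minimal sketch using PyMongo; the database and collection names follow the data model described later in this tutorial, and the connection string is a placeholder:
```python
from pymongo import MongoClient

# Placeholder connection string; use your Atlas URI
client = MongoClient("your_mongodb_connection_string_here")
events = client["dotLocalStore"]["events"]

# Watch for newly inserted customer events and hand them to downstream logic
with events.watch([{"$match": {"operationType": "insert"}}]) as stream:
    for change in stream:
        event = change["fullDocument"]
        print(f"New event: {event['action']} on product {event.get('product_id')}")
```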

Products collection

The Product collection in MongoDB Atlas is organized using the polymorphic pattern. The polymorphic pattern is useful when we want to access (query) information from a single collection. Grouping documents together based on the queries we want to run, instead of separating the objects across tables or collections, helps improve performance. By centralizing various product types, this pattern streamlines data management and improves query efficiency. The following table outlines the key fields within the Products collection (not public), which you will use to create a collection in MongoDB Atlas according to this schema. The table includes the data types and brief descriptions of each field:
| Field Name | Data Type | Description |
| --- | --- | --- |
| _id | Object ID | Unique identifier for the document; used by MongoDB for internal purposes |
| name | String | Name of the product |
| code | String | Unique code identifying the product |
| autoreplenishment | Boolean | Indicates if the product is set up for auto-replenishment |
| id | Integer | Numeric identifier for the product, used for external reference |
| gender | String | Gender category the product is intended for |
| masterCategory | String | Broad category for the product |
| subCategory | String | More specific category under the master category |
| articleType | String | Type of article, e.g., Vest |
| baseColour | String | Primary color of the product |
| season | String | Season the product is intended for |
| year | Integer | Year of the product release |
| usage | String | Intended use of the product, e.g., Casual |
| image | Object | Contains the URL of the product image |
| price | Object | Nested structure containing the amount and currency of the product price |
| description | String | Detailed description of the product |
| brand | String | Brand of the product |
| items | Array of Objects | Variants of the product, including size, stock information, and delivery time |
| total_stock_sum | Array of Objects | Aggregated stock information across different locations |
| pred_price | Double | Predicted price of the product based on machine learning models; uses a double data type for precision in pricing predictions |
The JSON objects within the collection should appear as follows:
[Image: example JSON documents in the Products collection]
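For reference, here is a hypothetical document following this schema, expressed as the Python dict you might insert with PyMongo. All values, and the inner structure of the nested fields, are illustrative assumptions rather than data from the tutorial's dataset:
```python
sample_product = {
    "name": "Casual Cotton Vest",          # illustrative values only
    "code": "VEST-00123",
    "autoreplenishment": False,
    "id": 98803,
    "gender": "Men",
    "masterCategory": "Apparel",
    "subCategory": "Topwear",
    "articleType": "Vest",
    "baseColour": "Blue",
    "season": "Summer",
    "year": 2024,
    "usage": "Casual",
    "image": {"url": "https://example.com/images/vest-00123.png"},
    "price": {"amount": 18.99, "currency": "USD"},
    "description": "Lightweight cotton vest for everyday wear.",
    "brand": "Acme",
    "items": [{"size": "M", "stock": {"amount": 12}, "delivery_time": 2}],
    "total_stock_sum": [{"location": "US-East", "amount": 12}],
    # pred_price is written later by the Cloud Function
}
# db.products.insert_one(sample_product)  # _id is generated automatically
```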

Events collection (feature store)

Utilizing MongoDB's Events collection as an ML feature store — a centralized repository designed to streamline the management and delivery of features used in machine learning models — offers numerous advantages. Atlas's high availability means your feature store is always accessible, reducing downtime and improving the efficiency of machine learning operations. Cross-region deployments further enhance performance by bringing features closer to the models that use them, which reduces latency and allows for faster model training and serving.
This collection (not public) stored in MongoDB Atlas serves as a repository for user behavior events crucial for training our pricing prediction model. The key fields you will use to create the collection, according to this schema, are as follows:
| Field | Data Type | Description | Example Values |
| --- | --- | --- | --- |
| product_name | String | The name of the product | "MongoDB Notebook" |
| product_id | Integer | Unique identifier for the product | 98803 |
| action | String | Type of action performed on the product (user interaction) | "view", "add_to_cart", "purchase" |
| price | Float | Price of the product | 18.99 |
| timestamp | String | ISO format timestamp of when the event occurred | "2024-03-25T12:36:25.428461" |
| encoded_name | Integer | An encoded version of the product name for machine learning models | 23363195 |
| tensor | Array | A numerical representation of the event produced by preprocessing; the size of the tensor can vary depending on the specific model requirements | [0.0005624396083488047, -0.9579731008383453] |
An Events object, with its associated tensor, should look like the following: [Image: example Events document]
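For example, an event document (before the Cloud Function attaches its tensor) might look like this hypothetical Python dict, using the example values from the table above:
```python
sample_event = {
    "product_name": "MongoDB Notebook",
    "product_id": 98803,
    "action": "add_to_cart",
    "price": 18.99,
    "timestamp": "2024-03-25T12:36:25.428461",
    "encoded_name": 23363195,
    # Added later by the Cloud Function after preprocessing:
    "tensor": [0.0005624396083488047, -0.9579731008383453],
}
```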

Demo components

Our solution incorporates several components to facilitate dynamic pricing:
  • Data ingestion: Pub/Sub acts as a high-speed pipeline, efficiently bringing in large amounts of customer behavior data formatted as JSON.
  • Data processing: Vertex AI Workbench provides a clean environment for data cleaning and training TensorFlow models. These models analyze customer events, product names, and existing prices to predict the optimal price for each item.
  • Feature storage: MongoDB Atlas serves as a central hub for all the features used by the model. This ensures consistency between the data used for training and the data used for real-time predictions, as well as the operational data for your applications, thereby reducing the overhead of “in-app analytics.” It also simplifies the overall process by keeping everything in one place.
  • Model orchestration: Cloud Functions act like a conductor, directing the flow of customer event data. They take the data from Pub/Sub, transform it into a format usable by the model (tensors), and store it in MongoDB Atlas. This allows the model to easily access the information it needs.

Architecture overview

The architecture is designed to enhance pricing strategies through deep learning and continuous model optimization.

Blue data flow: Real-time pricing adjustment

  • Event ingestion: Customer event data is ingested into a Google Cloud Pub/Sub topic, serving as the entry point for real-time data.
  • Data processing: A Cloud function is triggered via a push subscription from the Pub/Sub topic. This function transforms raw event data into a structured tensor format.
  • Model invocation and price update: The same Cloud function calls a deployed model endpoint (e.g., in Vertex AI) with the tensor data to predict pricing. It then updates the predicted price in the MongoDB product catalog collection.
Figure 1. Dynamic pricing architecture integrating different Google Cloud components and MongoDB Atlas as a feature store

Green data flow: Feature store building

  • Feature store update: Concurrently, the Cloud function pushes the tensor data into the MongoDB Events collection, which acts as a feature store. Every event has its own tensor.
  • Versioning and accessibility: The data within the feature store is not versioned at the moment. The versioning pattern is useful for a feature store because it addresses the problem of keeping older revisions of data in MongoDB, avoiding the need for a separate management system.
Important: Make sure to check out the pattern versioning guide. Versioning in a feature store enhances reproducibility, traceability, collaboration, and compliance in MLOps workflows, making it an essential component for managing machine learning pipelines effectively.
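As a sketch of what that could look like, one common approach from the versioning pattern is to keep the current feature document in the events collection and copy prior revisions, stamped with a revision number, into a history collection. The collection and field names below are illustrative assumptions, not part of this tutorial's code:
```python
from pymongo import MongoClient

client = MongoClient("your_mongodb_connection_string_here")  # placeholder URI
db = client["dotLocalStore"]
events = db["events"]
events_history = db["events_history"]  # hypothetical revisions collection

def update_event_with_versioning(event_id, new_tensor):
    """Archive the current revision, then update the feature document in place."""
    current = events.find_one({"_id": event_id})
    if current is not None:
        current["revision"] = current.get("revision", 1)
        current["event_id"] = current.pop("_id")  # keep a reference to the live document
        events_history.insert_one(current)

    events.update_one(
        {"_id": event_id},
        {"$set": {"tensor": new_tensor}, "$inc": {"revision": 1}},
    )
```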

Prerequisites

Let's get started building your AI pricing microservice! To seamlessly integrate AI pricing into your application, you'll need to set up the following components:
  1. MongoDB Atlas account: Set up a cluster, configure security settings, and connect your application.
  2. Google Cloud Platform account: Create a project, enable necessary APIs (e.g., Cloud Storage, Cloud Function, Pub/Sub, Vertex AI), and configure the CLI.
  3. Install Node.js and Express: Clone the repository containing the microservice code for this tutorial, configure environment variables, and develop the pricing logic.

Initial configuration

Step 1: Setting up MongoDB Atlas

Tip: Make sure to follow the how-to guide.
  • Create a cluster: Sign in to your MongoDB Atlas account and create a new cluster. Choose a region that is closest to your user base for optimal performance.
  • Configure security: Set up your cluster's security settings. Create database users with specific roles and enable IP whitelisting to secure your database connection.
  • Connect to your cluster: Use the connection string provided by Atlas to connect your application to your MongoDB database. You'll need this in the microservice configuration.

Step 2: Setting up GCP

Tip: View our guide to configure your GCP project.
  • Create a GCP project: Log into your Google Cloud console and create a new project for your microservice.
  • Enable APIs: Ensure that the necessary APIs are enabled for your project. In this microservice, we are using the services below.
Tip: Verify in “IAM & Admin” that you possess all necessary permissions. Ensure you have either owner privileges or granular access for specific users (with less permissive rules). Also, follow this guide to enable any API and service in GCP.
| Service | Explanation |
| --- | --- |
| Cloud Storage | Saving data scalers as .joblib files |
| Cloud Functions | Orchestrating the data flow |
| Pub/Sub | Ingesting live streaming of e-commerce events to loosely couple subscription-based microservices |
| Vertex AI | Training notebook and model endpoint |
Set up each service:
Cloud Storage:
  1. Search for “Cloud Storage” in the Google Cloud search bar.
  2. You should see a screen indicating that the service is activated. If you're prompted to activate the service, please do so.
Cloud Run Functions:
  1. Search for “Cloud Run functions” in the Google Cloud search bar.
  2. You should see a screen indicating that the service is activated. If you're prompted to activate the service, please do so.
Pub/Sub:
  1. Search for “Pub/Sub” in the Google Cloud search bar.
  2. You should see a screen indicating that the service is activated. If you're prompted to activate the service, please do so.
Vertex AI:
  1. Search for “Vertex AI” in the Google Cloud search bar.
  2. You should see a screen indicating that the service is activated. If you're prompted to activate the service, please do so.
  • Configure GCP CLI: Install and initialize the Google Cloud CLI. Authenticate with your GCP account and set your project as the default.

Step 3: Develop the microservice and model

  • Clone the repository: Start by cloning the repository with the microservice code.
Open your terminal and run the following commands:
```bash
git clone https://github.com/mongodb-industry-solutions/retail-store.git
```
Then, navigate to the directory containing the dynamic pricing microservice:
```bash
cd retail-store/microservices/dynamicPricing
```
  • Configure Python packages: Once you're in the correct directory, type the following command and press Enter:
```bash
pip install -r requirements.txt
```
  • Configure environment variables: Set up the necessary environment variables, including your MongoDB Atlas connection string and any other service-specific configurations. Environment variables are essential for managing configuration settings, especially those that contain sensitive information.
Here's a template illustrating how to set up environment variables for a microservice that connects to MongoDB Atlas:
Create a .env file with the following content:
```
MONGODB_URI=mongodb+srv://username:password@clusterName.mongodb.net/
GOOGLE_APPLICATION_CREDENTIALS=my-google-credentials
GOOGLE_CLOUD_PROJECT=my-google-cloud
PUBSUB_TOPIC_ID=my-topic-id
```
Tip: Replace username and password with your MongoDB username and password, clusterName with the name of your MongoDB cluster, my-google-credentials with the path to your Google application credentials file, my-google-cloud with your Google Cloud project ID, and my-topic-id with the ID of your Google Cloud Pub/Sub topic.
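These variables can then be read in Python with python-dotenv, just as the generator script later in this tutorial does:
```python
import os
from dotenv import load_dotenv

# Load variables from the .env file into the process environment
load_dotenv()

MONGODB_URI = os.getenv("MONGODB_URI")
GOOGLE_CLOUD_PROJECT = os.getenv("GOOGLE_CLOUD_PROJECT")
PUBSUB_TOPIC_ID = os.getenv("PUBSUB_TOPIC_ID")
```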
Configure the Pub/Sub topic:
  1. Navigate to Pub/Sub in your Google Cloud Platform project.
  2. Provide a unique name for your topic in the "Topic ID" field.
  3. Adjust the topic settings as needed.
  4. Select your preferred encryption method.
  5. Finalize the process by clicking the Create button.
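If you prefer to create the topic programmatically instead of through the console, a minimal sketch with the Pub/Sub client library looks like this; the project and topic IDs are the placeholders from your .env file:
```python
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-google-cloud", "my-topic-id")  # placeholders from .env

# Create the topic; this raises AlreadyExists if it was created in the console
topic = publisher.create_topic(request={"name": topic_path})
print(f"Created topic: {topic.name}")
```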
Develop the pricing logic: Modify the dynamicPricing service to implement your pricing algorithm. This could involve analyzing historical data, considering competitor pricing, and integrating real-time supply-demand signals.
  1. In Vertex AI, navigate to Colab Enterprise.
  2. Click +Create to make a new notebook.
Connect to your MongoDB cluster for live data analysis and algorithm training: Once you've created a new notebook, you can connect to your MongoDB cluster directly using the following snippet:
```python
from pymongo import MongoClient

conn_string = "your_mongodb_connection_string_here"
client = MongoClient(conn_string)

db = client["your_database_name"]
collection = db["your_collection_name"]
```
Tip: Replace "your_mongodb_connection_string_here" with your actual MongoDB connection string, which typically follows this format: mongodb+srv://<username>:<password>@<cluster-address>/<database-name>?retryWrites=true&w=majority. Be sure to substitute <username> and <password> with your MongoDB credentials, <cluster-address> with the address of your MongoDB cluster, and <database-name> (optional) with the specific database you want to connect to within your cluster. Additionally, replace "your_database_name" and "your_collection_name" with the actual names used in your MongoDB setup.
This will allow you to pull data from your clusters live and train a pricing algorithm. In this case, we used TensorFlow to capture how prices change based on user behavior.
Train a TensorFlow neural network model: Now that we're connected to MongoDB, we'll show you a Jupyter Notebook designed for an e-commerce store, similar to the one in the introduction. Feel free to modify it for your specific needs. This notebook demonstrates how to train a TensorFlow neural network model to predict optimal prices based on e-commerce events stored in a MongoDB Atlas feature store. Let's get started.
We’ve decided that the e-commerce store has the following data model for capturing user behavior events:
| Field | Data Type | Description | Example Values |
| --- | --- | --- | --- |
| product_name | String | The name of the product | "MongoDB Notebook" |
| product_id | Integer | Unique identifier for the product | 98803 |
| action | String | Type of action performed on the product (user interaction) | "view", "add_to_cart", "purchase" |
| price | Float | Price of the product | 18.99 |
| timestamp | String | ISO format timestamp of when the event occurred | "2024-03-25T12:36:25.428461" |
| encoded_name | Integer | An encoded version of the product name for machine learning models | 23363195 |
This table assumes the product["price"] field is a float representing the price of the product in a single currency (e.g., USD). The encoded_name field is considered an integer, which could represent a hash or an encoding used to transform the product name into a numerical format suitable for machine learning models. The timestamp field is a string formatted as an ISO timestamp, which provides the exact date and time when the action was recorded. The example values are placeholders and should be replaced with actual data from your application.
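For illustration, this is how the event generator later in this tutorial derives encoded_name, hashing the product name into a bounded integer. It is a minimal sketch; any deterministic encoding works, as long as training and inference use the same one:
```python
import hashlib

def encode_product_name(name: str) -> int:
    """Hash a product name into a bounded integer feature (same scheme as generator.py)."""
    return int(hashlib.sha256(name.encode('utf-8')).hexdigest(), 16) % 10**8

print(encode_product_name("MongoDB Notebook"))
```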
Setting up a MongoDB connection with Python: First, we need to install the necessary Python packages and establish a connection to our MongoDB database.
```python
!pip install pymongo
!pip install 'pymongo[srv]'
!pip install pandas

from pymongo import MongoClient
import pandas as pd
import keras

# Replace the below connection string with your MongoDB connection URI
conn_string = "your_mongodb_connection_string_here"
client = MongoClient(conn_string)

# Specify the database and collection
db = client["your_database_name"]
collection = db["your_collection_name"]
```
Data cleaning: Once connected, we'll fetch the data and perform some basic cleaning operations to prepare it for model training.
```python
# Get all the documents
documents = collection.find()

# Convert the documents into a list and then into a DataFrame
df = pd.DataFrame(list(documents))

# Drop unnecessary columns ('_id' included, so only the model features and the price remain)
df = df.drop(columns=['_id', 'product_name', 'product_id', 'timestamp', 'tensor'])

# Extract the 'amount' from the 'price' column and convert it to float
df['price'] = df['price'].apply(lambda x: float(x['amount']) if isinstance(x, dict) and 'amount' in x else None)

# Drop rows with missing values
df = df.dropna()
```
Building the dynamic pricing model: Next, we import the necessary TensorFlow and scikit-learn libraries, encode categorical variables, and normalize our data.
```python
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, MinMaxScaler, StandardScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Dropout

# Encode categorical variables
label_encoders = {}
for column in ['action', 'encoded_name']:
    le = LabelEncoder()
    df[column] = le.fit_transform(df[column])
    label_encoders[column] = le

df.head()

# Standardizing
scaler = StandardScaler()
df[['action', 'encoded_name']] = scaler.fit_transform(df[['action', 'encoded_name']])

df.head()
```
Saving the encoder for event data pre-processing: We'll save the encoder objects to Google Cloud Storage for later use in preprocessing new data for predictions. This code will generate joblib files to save the encoding and standardizing criteria from the above preprocessing and upcoming training.
```python
!pip install google-cloud-storage
from google.cloud import storage
import joblib
import io

# Initialize a client
storage_client = storage.Client()

# The name of your GCP bucket
bucket_name = 'dyn_pricing_scaler'

# The path within your bucket to save the encoder object
destination_blob_name = 'labelEncoder.joblib'

# Create a buffer
buffer = io.BytesIO()

# Dump the label encoders to the buffer
joblib.dump(label_encoders, buffer)

# Now upload the buffer content to GCS
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)

# Rewind the buffer's file pointer to the beginning of the file
buffer.seek(0)

# Upload the contents of the buffer
blob.upload_from_file(buffer, content_type='application/octet-stream')

print(f"Uploaded label encoders to gs://{bucket_name}/{destination_blob_name}")
```
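The Cloud Function shown later in this tutorial also downloads the fitted StandardScaler as scaler.joblib from the same bucket. The cell above only persists the label encoders, so here is a minimal sketch, following the same pattern, that uploads the scaler as well; the bucket and file names are assumptions matching what the Cloud Function expects:
```python
# Upload the fitted StandardScaler so the Cloud Function can reuse it.
# Assumes `scaler`, `storage_client`, and `bucket_name` from the cells above;
# the file name 'scaler.joblib' matches the Cloud Function's download path.
scaler_blob_name = 'scaler.joblib'

scaler_buffer = io.BytesIO()
joblib.dump(scaler, scaler_buffer)
scaler_buffer.seek(0)

scaler_blob = storage_client.bucket(bucket_name).blob(scaler_blob_name)
scaler_blob.upload_from_file(scaler_buffer, content_type='application/octet-stream')

print(f"Uploaded scaler to gs://{bucket_name}/{scaler_blob_name}")
```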
Training the model: With our data prepared, we'll split it into training and testing sets, define our neural network architecture, and train our model. Please remember this is a model meant for a simple demo.
```python
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam

# Split data into training and testing sets
X = df.drop('price', axis=1)
y = df['price']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Define the model
model = Sequential([
    Dense(64, activation='relu', input_shape=(X_train.shape[1],)),
    Dense(64, activation='relu'),
    Dense(1)  # Output layer for regression
])

# Compile the model
model.compile(optimizer='adam', loss='mean_squared_error', metrics=['mae'])

# Print the model summary to check the architecture
model.summary()

# Train the model
history = model.fit(X_train, y_train,
                    validation_split=0.2,  # Further split the training set for validation
                    epochs=10,             # Number of epochs to train for
                    verbose=1)             # Show training output

# Evaluate the model on the test set
test_loss, test_mae = model.evaluate(X_test, y_test, verbose=1)
print(f"Test Loss: {test_loss}, Test MAE: {test_mae}")
```
Test prediction: After training, we make a test prediction to verify the model's performance.
```python
import numpy as np

# Example new data point (scaled 'action' and 'encoded_name') converted into a tensor
new_data = np.array([[1.225999, -0.957973]])

predicted_price = model.predict(new_data)
print("Predicted Price:", predicted_price[0])
```
Saving the model: Finally, we'll save our trained model to Google Cloud Storage.
```python
from google.colab import auth
auth.authenticate_user()

project_id = 'your-gcp-project-id'
!gcloud config set project {project_id}

bucket_name = 'your-cloud-storage-bucket-name'
model_dir = 'your-model-directory'   # local directory to save the model to
model_path = 'your-model-path'       # destination path inside the bucket

# Save the model locally
model.save(model_dir)

# Create the bucket if necessary
!gsutil mb -l us-central1 gs://{bucket_name}

# Copy the saved model artifacts to Cloud Storage
!gsutil cp -r {model_dir} gs://{bucket_name}/{model_path}
```
Registering the model in Vertex AI: Next, we'll register our trained model in the VertexAI model registry:
```python
from google.cloud import aiplatform

aiplatform.init(project='your-gcp-project-id', location='your-gcp-region')

# Model registry
model_display_name = 'dyn_pricingv1'
model_description = 'TensorFlow dynamic pricing model'
bucket_name = 'your-gcp-bucket-name'
model_path = 'your-model-path'

model = aiplatform.Model.upload(
    display_name=model_display_name,
    artifact_uri=f'gs://{bucket_name}/{model_path}',
    serving_container_image_uri='us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-6:latest',
    description=model_description,
)
```
Congratulations! You should now see your model listed in the Vertex AI Model Registry. This is where you'll manage and deploy your models for various applications. Next, we'll deploy the model to an endpoint so it can serve online price predictions.

Step 4: Deploy a model to an endpoint

You must deploy a model to an endpoint before that model can be used to serve online predictions. Deploying a model associates physical resources with the model so it can serve online predictions with low latency. Here are the steps needed:
  1. In the Google Cloud console, in the Vertex AI section, go to the Models page.
  2. Click the name and version ID of the model you want to deploy to open its details page (model from last step).
  3. Select the Deploy & Test tab.
  4. Click Deploy to endpoint.
  5. Fill out the rest of the parameters (Model Settings, Model Monitoring).
  6. Click on Deploy.
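If you prefer to script this step, the Vertex AI SDK can deploy the registered model from the notebook as well. This is a minimal sketch; the display name, machine type, and traffic percentage are assumptions, not values prescribed by this tutorial:
```python
from google.cloud import aiplatform

aiplatform.init(project='your-gcp-project-id', location='your-gcp-region')

# `model` is the aiplatform.Model returned by Model.upload() in the previous step;
# you could also look it up by its resource name with aiplatform.Model(...).
endpoint = model.deploy(
    deployed_model_display_name='dyn_pricingv1-endpoint',  # hypothetical name
    machine_type='n1-standard-2',                          # assumption
    traffic_percentage=100,
)

print(endpoint.resource_name)  # contains the endpoint ID used by the Cloud Function
```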
Next, in the Vertex AI panel:
  1. Click on Endpoints and then select your deployed model.
  2. Get the Endpoint ID from the details page; you will need it to configure the Cloud Function that sends prediction requests to this endpoint.
Cloud Function configuration: The Cloud Function orchestrates converting event data into tensors, inserting them into the feature store collection, and invoking the Vertex AI model endpoint. Follow these steps:
Browse to Cloud Functions in your GCP project and click on CREATE FUNCTION.
Make sure the trigger for your Cloud function is the previously created Pub/Sub topic.
In the main.py file seen below, copy and paste the following Python code snippet:
```python
from pymongo import MongoClient
from datetime import datetime, timedelta
import json
import numpy as np
import pandas as pd
import joblib
import os
from google.cloud import storage
import functions_framework
import base64
from bson import ObjectId
from google.cloud import aiplatform


@functions_framework.cloud_event
def hello_pubsub(cloud_event):
    event_data = base64.b64decode(cloud_event.data["message"]["data"])
    print(event_data)
    event_data = json.loads(event_data)
    print(event_data)
    event_data = pd.DataFrame([event_data])
    print(event_data)

    # Correctly handle the '_id' field when it's a string, getting the event_id and product_id data
    event_id = ObjectId(str(event_data['_id'].loc[0]))
    print(event_id)
    product_id = event_data['product_id'].loc[0]
    print(product_id)

    # Set your GCS bucket and file paths
    bucket_name = 'dyn_pricing_scaler'
    scaler_file_name = 'scaler.joblib'
    label_encoder_name = 'labelEncoder.joblib'

    # Initialize a GCS client
    storage_client = storage.Client()

    # Get the bucket
    bucket = storage_client.bucket(bucket_name)

    # Get the blob (file) containing the scaler
    blob1 = bucket.blob(scaler_file_name)

    # Download the scaler file to a temporary location
    scaler_temp_file_path = '/tmp/' + scaler_file_name
    blob1.download_to_filename(scaler_temp_file_path)

    # Download the label encoder to a temporary location
    blob2 = bucket.blob(label_encoder_name)
    label_encoder_temp = '/tmp/' + label_encoder_name
    blob2.download_to_filename(label_encoder_temp)

    # Clean unnecessary columns
    if '_id' in event_data.columns:
        event_data = event_data.drop(columns=['_id'])
        print(event_data)
    event_data.drop('price', axis=1, inplace=True)
    event_data.drop('timestamp', axis=1, inplace=True)
    print(event_data)
    event_data.drop('product_id', axis=1, inplace=True)
    print(event_data)
    event_data.drop('product_name', axis=1, inplace=True)
    print(event_data)

    # Load the label encoders
    label_encoders = joblib.load(label_encoder_temp)
    print(label_encoders)

    # Encode categorical fields
    for column, encoder in label_encoders.items():
        event_data[column] = encoder.transform(event_data[column])
    print(event_data)

    # Load the scaler using joblib
    scaler = joblib.load(scaler_temp_file_path)
    print(scaler)

    # Use the scaler
    columns_to_scale = ['action', 'encoded_name']
    event_data[columns_to_scale] = scaler.transform(event_data[columns_to_scale])
    print(event_data)

    # Prepare data as tensors
    event_data = event_data.to_numpy()

    # Prepare input data for Vertex AI
    input_data = {"instances": event_data}
    print(input_data)
    input_data['instances'] = input_data['instances'].tolist()
    print(input_data)
    input_data_json = json.dumps(input_data)
    print(input_data_json)

    # Call the Vertex AI endpoint
    endpoint_id = "your-endpoint-id"
    project_id = "your-project-id"
    location = "your-project-location"
    # REST endpoint URL (for reference; the SDK call below is used instead)
    endpoint_url = f"https://us-central1-aiplatform.googleapis.com/v1/projects/{project_id}/locations/us-central1/endpoints/{endpoint_id}:predict"

    aiplatform.init(project=project_id, location=location)

    endpoint = aiplatform.Endpoint(endpoint_id)

    prediction = endpoint.predict(instances=input_data['instances'])
    print(prediction)

    pred_price = prediction.predictions[0][0]
    print(pred_price)

    # Start MongoClient
    mongo_uri = "your-mongo-db-connection-uri"  # Remember, you can set this up in a .env file
    client = MongoClient(mongo_uri)

    db = client["dotLocalStore"]  # Replace with your actual database name
    collection = db["products"]
    feature_store = db["events"]

    # Update the products collection document with 'pred_price'
    collection.update_one({"id": int(product_id)}, {"$set": {"pred_price": pred_price}})

    # Update the events collection (feature store) with the event tensor
    tensor = event_data.tolist()
    print(tensor)
    feature_store.update_one({"_id": event_id}, {"$set": {"tensor": tensor}})

    # Clean up: Delete the temporary files
    os.remove(scaler_temp_file_path)
    os.remove(label_encoder_temp)
    print(pred_price)

    return pred_price
```
Make sure you add the requirements.txt shown below to the Cloud Function's folder structure in GCP:
```
functions-framework==3.*
pymongo
scikit-learn==1.2.2
numpy
google-cloud-aiplatform
google-auth
google-cloud-storage
pandas
joblib
```
Simulating customer events: If you're looking to mimic customer behavior, feel free to use the Python script called generator.py seen below.
```python
import random
import pymongo
from pymongo import MongoClient
import os
import time
from google.cloud import pubsub_v1
from dotenv import load_dotenv
from faker import Faker
from datetime import datetime, timedelta
import json
from bson import ObjectId
import hashlib

# Load environment variables
load_dotenv()
MONGODB_URI = os.getenv('MONGODB_URI')
GOOGLE_APPLICATION_CREDENTIALS = os.getenv('GOOGLE_APPLICATION_CREDENTIALS')
GOOGLE_CLOUD_PROJECT = os.getenv('GOOGLE_CLOUD_PROJECT')
PUBSUB_TOPIC_ID = os.getenv('PUBSUB_TOPIC_ID')

# Initialize MongoDB client
mongo_client = pymongo.MongoClient(MONGODB_URI)
db = mongo_client["dotLocalStore"]
behaviors_collection = db["events"]
products_collection = db["products"]

# Initialize Google Cloud Pub/Sub publisher
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(GOOGLE_CLOUD_PROJECT, PUBSUB_TOPIC_ID)


def fetch_products():
    """Fetch products from MongoDB"""
    return list(products_collection.find({}))


def generate_ecommerce_behavior(product):
    """Generate a single synthetic ecommerce behavior event for a given product"""
    # Encode product_name into a numerical field
    encoded_name = int(hashlib.sha256(product["name"].encode('utf-8')).hexdigest(), 16) % 10**8
    behavior = {
        "product_name": product["name"],
        "product_id": product["id"],
        "action": random.choice(["view", "add_to_cart", "purchase"]),
        "price": product["price"],
        "timestamp": datetime.now().isoformat(),  # Format timestamp for JSON serialization
        "encoded_name": encoded_name
    }
    return behavior


# Custom JSON Encoder that converts ObjectId to str
class JSONEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, ObjectId):
            return str(o)
        return json.JSONEncoder.default(self, o)


def push_event_to_mongodb(behavior):
    """Push a single ecommerce behavior event to MongoDB"""
    behaviors_collection.insert_one(behavior)
    print("Pushed an event to MongoDB.")


def push_event_to_pubsub(event):
    """Push a single event to Google Cloud Pub/Sub"""
    try:
        # Attempt to serialize the event to JSON
        data = json.dumps(event, cls=JSONEncoder).encode("utf-8")
        # Attempt to publish the serialized data to Pub/Sub
        future = publisher.publish(topic_path, data=data)
        print(f"Published id:{event['product_id']} product:{event['product_name']} to Pub/Sub.")
        future.result()  # Block until the publish completes
    except Exception as e:
        print(f"An error occurred: {e}")


if __name__ == "__main__":
    products = fetch_products()
    num_behaviors_per_cycle = 150

    try:
        while True:
            for _ in range(num_behaviors_per_cycle):
                # Select a random product
                selected_product = random.choice(products)
                # Generate behavior for the selected product
                behavior = generate_ecommerce_behavior(selected_product)
                # Push the behavior to MongoDB
                push_event_to_mongodb(behavior)
                # Push the behavior to Pub/Sub
                push_event_to_pubsub(behavior)
                # Wait for 3 seconds before generating the next behavior
                time.sleep(3)
    except KeyboardInterrupt:
        print("Stopped by the user.")
```
The Python script generates fake customer events based on the data model described above. These events are pushed to a Pub/Sub topic and to your Atlas feature store collection. You can adjust the number of events and their cadence directly within the code. To run this script, use the following command:
```bash
python3 generator.py
```
After running this script, you should be able to see fake customer events being pushed into your MongoDB Atlas cluster and your Pub/Sub topic, effectively triggering the microservice to respond to those events and calculate optimal pricing points for the different products.
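To confirm the end-to-end flow, you can check that a product document has received a predicted price, for example with a quick PyMongo query. The database, collection, and field names follow the schema used above; the connection string is a placeholder:
```python
from pymongo import MongoClient

client = MongoClient("your_mongodb_connection_string_here")  # placeholder URI
products = client["dotLocalStore"]["products"]

# Fetch one product that already has a predicted price attached
doc = products.find_one(
    {"pred_price": {"$exists": True}},
    {"name": 1, "price": 1, "pred_price": 1},
)
print(doc)
```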

Key takeaways

Have you mastered building a reactive dynamic pricing microservice? Great job! Here's what you learned:
  • Centralized feature store: MongoDB serves as a feature store — a centralized repository designed for storing, managing, and serving features for machine learning (ML) models. Its support for the polymorphic pattern lets a single collection represent various types of data, so as new features are introduced or pricing models evolve, MongoDB can manage diverse data types within the same system. For dynamic pricing, this means new pricing factors or variables can be incorporated without disrupting existing data structures or operations.
  • Scalability and efficiency: Google Cloud Pub/Sub can handle massive volumes of customer data efficiently, ensuring scalability for real-world applications. While this microservice only simulates a small stream of customer events (one every few seconds), Pub/Sub is capable of processing much larger data streams.
  • Real-time price updates: Cloud functions trigger TensorFlow models to generate dynamic prices based on customer behavior. These generated prices are then inserted or updated (upserted) back into the product catalog collection in MongoDB. This enables real-time adjustments in the e-commerce application because the application's front end retrieves data directly from the same collection.
Curious how MongoDB is changing the retail landscape? Dive deeper into MongoDB's capabilities and discover how it's revolutionizing the industry:
MongoDB helps retailers innovate and gain a competitive edge. Apply for an innovation workshop to explore the possibilities with our experts. If you’d like to connect with other people using MongoDB to build their next big project, head to the MongoDB Developer Community.
