Building a Dynamic Pricing Microservice with Vertex AI and MongoDB Atlas
Francesco Baldissera, Sebastian Rojas Arbulu • 18 min read • Published Oct 09, 2024 • Updated Oct 09, 2024
In the hyper-competitive world of e-commerce, crafting a winning pricing strategy is essential for growth. Thankfully, big data and machine learning have revolutionized pricing. Businesses can now leverage real-time customer behavior and competitor data to dynamically adjust prices.
This tutorial dives into building a responsive dynamic pricing microservice that enables prices to be adjusted in real time for maximum effectiveness. We'll explore using MongoDB Atlas for its efficient data storage and management while leveraging Google Cloud Platform's (GCP) power for complex calculations and hosting. By the end, you'll be equipped to implement this approach and unlock the potential of data-driven pricing.
The following animation illustrates what we aim to achieve:
As seen, this e-commerce store displays a dynamically predicted price alongside the actual product price. The predicted price is calculated in real time using machine learning algorithms that analyze market trends, demand, competitor prices, customer behavior, and sales data to optimize sales and profit.
Before we begin, let's establish context with an overview of our data model. Our microservice leverages MongoDB Atlas, a developer data platform, to power real-time AI in our e-commerce app. Atlas stores our ML features in two key collections, acting as a feature store. This streamlines data management, automates decision-making, and isolates workloads. With change streams and triggers, updates flow seamlessly to our AI models, minimizing operational overhead for both business and MLOps.
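For instance, a downstream consumer can subscribe to inserts on the events collection with a change stream. The following is a minimal sketch using PyMongo; the database and collection names (`dotLocalStore`, `events`) follow the naming used later in this tutorial:

```python
from pymongo import MongoClient

client = MongoClient("your_mongodb_connection_string_here")
events = client["dotLocalStore"]["events"]

# Watch for newly inserted customer events and react to them in real time
with events.watch([{"$match": {"operationType": "insert"}}]) as stream:
    for change in stream:
        event = change["fullDocument"]
        print(f"New event: {event['action']} on product {event['product_id']}")
```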
The Product collection in MongoDB Atlas is organized using the polymorphic pattern, which groups documents of different shapes in a single collection based on the queries we want to run, rather than separating the objects across tables or collections. By centralizing various product types, this pattern streamlines data management and improves query efficiency.
The following table outlines the key fields within the Products collection (not public), which you will use to create a collection in MongoDB Atlas according to this schema. The table includes the data types and brief descriptions of each field:
Field Name | Data Type | Description |
---|---|---|
_id | Object ID | Unique identifier for the document; used by MongoDB for internal purposes |
name | String | Name of the product |
code | String | Unique code identifying the product |
autoreplenishment | Boolean | Indicates if the product is set up for auto-replenishment |
id | Integer | Numeric identifier for the product, used for external reference |
gender | String | Gender category the product is intended for |
masterCategory | String | Broad category for the product |
subCategory | String | More specific category under the master category |
articleType | String | Type of article, e.g., Vest |
baseColour | String | Primary color of the product |
season | String | Season the product is intended for |
year | Integer | Year of the product release |
usage | String | Intended use of the product, e.g., Casual |
image | Object | Contains the URL of the product image |
price | Object | Contains the amount and currency of the product price; nested structure |
description | String | Detailed description of the product |
brand | String | Brand of the product |
items | Array of Objects | Contains variants of the product, including size, stock information, and delivery time |
total_stock_sum | Array of Objects | Aggregated stock information across different locations |
pred_price | Double | Predicted price of the product based on machine learning models; uses a double data type for precision in pricing predictions |
The JSON objects within the collection should appear as follows:
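The original sample document is not reproduced here, so the following is an illustrative sketch written as a PyMongo-style Python dict, assuming the schema above; all values are placeholders rather than real catalog data:

```python
from bson import ObjectId

sample_product = {
    "_id": ObjectId("65f0c1e2a1b2c3d4e5f67890"),  # placeholder ObjectId
    "name": "Sleeveless Running Vest",
    "code": "VEST-001",
    "autoreplenishment": False,
    "id": 98803,
    "gender": "Women",
    "masterCategory": "Apparel",
    "subCategory": "Topwear",
    "articleType": "Vest",
    "baseColour": "Blue",
    "season": "Summer",
    "year": 2024,
    "usage": "Casual",
    "image": {"url": "https://example.com/images/98803.jpg"},
    "price": {"amount": 18.99, "currency": "USD"},
    "description": "Lightweight sleeveless vest for warm weather.",
    "brand": "Acme",
    "items": [{"size": "M", "stock": 12, "delivery_time": "2 days"}],
    "total_stock_sum": [{"location": "US-East", "stock": 12}],
    "pred_price": 19.49,
}
```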
Utilizing MongoDB's Events collection as an ML feature store — a centralized repository designed to streamline the management and delivery of features used in machine learning models — offers numerous advantages. Because Atlas is highly available, the feature store stays accessible, reducing downtime and improving the efficiency of machine learning operations. In addition, cross-region deployments bring features closer to the models that use them, reducing latency for faster model training and serving.
This collection (not public) stored in MongoDB Atlas serves as a repository for user behavior events crucial for training our pricing prediction model. The key fields you will use to create the collection, according to this schema, are as follows:
Field | Data Type | Description | Example Values |
---|---|---|---|
product_name | String | The name of the product | "MongoDB Notebook" |
product_id | Integer | Unique identifier for the product | 98803 |
action | String | Type of action performed on the product (user interaction) | "view", "add_to_cart", "purchase" |
price | Float | Price of the product | 18.99 |
timestamp | String | ISO format timestamp of when the event occurred | "2024-03-25T12:36:25.428461" |
encoded_name | Integer | An encoded version of the product name for machine learning models | 23363195 |
tensor | Array | A numerical representation of the product extracted through machine learning techniques; the size of the tensor can vary depending on the specific model requirements | [0.0005624396083488047, -0.9579731008383453] |
An Events object, with its associated tensors, should look like:
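The original sample isn't included here either, so this is an illustrative sketch as a Python dict, assuming the schema above (the tensor values are placeholders):

```python
from bson import ObjectId

sample_event = {
    "_id": ObjectId("65f0c2f3b1c2d3e4f5a67891"),  # placeholder ObjectId
    "product_name": "MongoDB Notebook",
    "product_id": 98803,
    "action": "add_to_cart",
    "price": 18.99,
    "timestamp": "2024-03-25T12:36:25.428461",
    "encoded_name": 23363195,
    "tensor": [0.0005624396083488047, -0.9579731008383453],
}
```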
Additionally, it's important to consider that our solution incorporates various components to facilitate dynamic pricing:
- Data ingestion: Pub/Sub acts as a high-speed pipeline, efficiently bringing in large amounts of customer behavior data formatted as JSON.
- Data processing: Vertex AI Workbench provides a clean environment for data cleaning and training TensorFlow models. These models analyze customer events, product names, and existing prices to predict the optimal price for each item.
- Feature storage: MongoDB Atlas serves as a central hub for all the features used by the model. This ensures consistency between the data used for training and the data used for real-time predictions, as well as the operational data for your applications, thereby reducing the overhead of “in-app analytics.” It also simplifies the overall process by keeping everything in one place.
- Model orchestration: Cloud Functions act like a conductor, directing the flow of customer event data. They take the data from Pub/Sub, transform it into a format usable by the model (tensors), and store it in MongoDB Atlas. This allows the model to easily access the information it needs.
The architecture is designed to enhance pricing strategies through deep learning and continuous model optimization.
- Event ingestion: Customer event data is ingested into a Google Cloud Pub/Sub topic, serving as the entry point for real-time data.
- Data processing: A Cloud function is triggered via a push subscription from the Pub/Sub topic. This function transforms raw event data into a structured tensor format.
- Model invocation and price update: The same Cloud function calls a deployed model endpoint (e.g., in Vertex AI) with the tensor data to predict pricing. It then updates the predicted price in the MongoDB product catalog collection.
- Feature store update: Concurrently, the Cloud function pushes the tensor data into the MongoDB Events collection, which acts as a feature store. Every event has its own tensor.
- Versioning and accessibility: The data within the feature store is not versioned at the moment. The versioning pattern is useful for a feature store because it addresses the problem of wanting to keep older revisions of data in MongoDB, avoiding the need for a separate version-management system.
Important: Make sure to check out the pattern versioning guide. Versioning in a feature store enhances reproducibility, traceability, collaboration, and compliance in MLOps workflows, making it an essential component for managing machine learning pipelines effectively.
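As a hedged sketch of how the document versioning pattern could be applied to this feature store (the `events_history` collection and `revision` field are assumptions for illustration, not part of the tutorial's code):

```python
from pymongo import MongoClient

client = MongoClient("your_mongodb_connection_string_here")
db = client["dotLocalStore"]

def update_event_with_history(event_id, new_fields):
    """Copy the current revision to a history collection before overwriting it."""
    current = db["events"].find_one({"_id": event_id})
    if current is not None:
        # Keep the previous revision; store the original key under a new field
        current["event_id"] = current.pop("_id")
        db["events_history"].insert_one(current)
    # Write the new revision and bump a revision counter
    db["events"].update_one(
        {"_id": event_id},
        {"$set": new_fields, "$inc": {"revision": 1}},
    )
```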
Let's get started building your AI pricing microservice! To seamlessly integrate AI pricing into your application, you'll need to set up the following components:
- Google Cloud Platform account: Create a project, enable necessary APIs (e.g., Cloud Storage, Cloud Function, Pub/Sub, Vertex AI), and configure the CLI.
- Install Node.js and Express: Clone the repository containing the microservice code for this tutorial, configure environment variables, and develop the pricing logic.
- Create a cluster: Sign in to your MongoDB Atlas account and create a new cluster. Choose a region that is closest to your user base for optimal performance.
- Configure security: Set up your cluster's security settings. Create database users with specific roles and enable IP whitelisting to secure your database connection.
- Connect to your cluster: Use the connection string provided by Atlas to connect your application to your MongoDB database. You'll need this in the microservice configuration.
- Create a GCP project: Log into your Google Cloud console and create a new project for your microservice.
- Enable APIs: Ensure that the necessary APIs are enabled for your project. In this microservice, we are using the services below.
Tip: Verify in “IAM & Admin” that you possess all necessary permissions. Ensure you have either owner privileges or granular access for specific users (with less permissive rules). Also, follow this guide to enable any API and service in GCP.
Services | Explanation |
---|---|
Cloud Storage | Saving data scalers as .joblib files |
Cloud Function | Orchestrating the data flow |
Pub/Sub | Ingesting live streaming of e-commerce events to loosely couple subscription-based microservices |
VertexAI | Training notebook and model endpoint |
Cloud Storage:
- Search in the Google Cloud search bar for “Cloud Storage”
- You should see the following screen indicating that the service is activated. If you're prompted to activate the service, please do so.
Cloud Run Functions:
- Search in the Google Cloud search bar for “Cloud Run functions”
- You should see the following screen indicating that the service is activated. If you're prompted to activate the service, please do so.
Pub/Sub:
- Search in the Google Cloud search bar for “Pub/Sub”
- You should see the following screen indicating that the service is activated. If you're prompted to activate the service, please do so.
VertexAI:
- Search in the Google Cloud search bar for “VertexAI”
- You should see the following screen indicating that the service is activated. If you're prompted to activate the service, please do so.
- Configure GCP CLI: Install and initialize the Google Cloud CLI. Authenticate with your GCP account and set your project as the default.
- Clone the repository: Start by cloning the repository with the microservice code.
Open your terminal and run the following commands:
```bash
git clone https://github.com/mongodb-industry-solutions/retail-store.git
```
Then, navigate to the directory containing the dynamic pricing microservice:
```bash
cd retail-store/microservices/dynamicPricing
```
- Configure Python packages: Once you're in the correct directory, type the following command and press Enter:
```bash
pip install -r requirements.txt
```
- Configure environment variables: Set up the necessary environment variables, including your MongoDB Atlas connection string and any other service-specific configurations. Environment variables are essential for managing configuration settings, especially those that contain sensitive information.
Here's a template illustrating how to set up environment variables for a microservice that connects to MongoDB Atlas:
Create the `.env` file with the following content:

```
MONGODB_URI=mongodb+srv://username:password@clusterName.mongodb.net/
GOOGLE_APPLICATION_CREDENTIALS=my-google-credentials
GOOGLE_CLOUD_PROJECT=my-google-cloud
PUBSUB_TOPIC_ID=my-topic-id
```
Tip: Replace `username` and `password` with your MongoDB username and password, `clusterName` with the name of your MongoDB cluster, `my-google-credentials` with the path to your Google application credentials file, `my-google-cloud` with your Google Cloud project ID, and `my-topic-id` with the ID of your Google Cloud Pub/Sub topic.

Configure the Pub/Sub topic:
- Navigate to Pub/Sub in your Google Cloud Platform project.
- Provide a unique name for your topic in the "Topic ID" field.
- Adjust the topic settings as needed.
- Select the encryption method you prefer.
- Finalize the process by clicking the Create button (or create the topic from the CLI, as shown below).
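If you'd rather script this step, the topic can also be created with the gcloud CLI; `my-topic-id` is a placeholder that should match the `PUBSUB_TOPIC_ID` in your `.env` file:

```bash
gcloud pubsub topics create my-topic-id
```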
Develop the pricing logic: Modify the dynamicPricing service to implement your pricing algorithm. This could involve analyzing historical data, considering competitor pricing, and integrating real-time supply-demand signals.
- In Vertex AI Workbench, click +Create to make a new notebook.
Connect directly to your MongoDB cluster for live data analysis and algorithm training: Once you create a new notebook, you can use it to connect to your MongoDB cluster directly using the following snippets:
```python
conn_string = "your_mongodb_connection_string_here"
client = MongoClient(conn_string)
```
```python
db = client["your_database_name"]
collection = db["your_collection_name"]
```
Tip: Replace `"your_mongodb_connection_string_here"` with your actual MongoDB connection string. This typically follows a format similar to `mongodb+srv://<username>:<password>@<cluster-address>/<database-name>?retryWrites=true&w=majority`.

Be sure to substitute:

- `<username>` and `<password>` with your MongoDB credentials.
- `<cluster-address>` with the address of your MongoDB cluster.
- `<database-name>` (optional) with the specific database you want to connect to within your cluster.

Additionally, replace `"your_database_name"` and `"your_collection_name"` with the actual names you are using in your MongoDB setup.

This will allow you to pull data from your clusters live and train a pricing algorithm. In this case, we used TensorFlow to capture how prices change based on user behavior.
Train a TensorFlow neural network model: Now that we're connected to MongoDB, we'll show you a Jupyter Notebook designed for an e-commerce store, similar to the one in the introduction. Feel free to modify it for your specific needs. This notebook demonstrates how to train a TensorFlow neural network model to predict optimal prices based on e-commerce events stored in a MongoDB Atlas feature store. Let's get started.
We’ve decided that the e-commerce store has the following data model for capturing user behavior events:
Field | Data Type | Description | Example Values |
---|---|---|---|
product_name | String | The name of the product | "MongoDB Notebook" |
product_id | Integer | Unique identifier for the product | 98803 |
action | String | Type of action performed on the product (user interaction) | "view", "add_to_cart", "purchase" |
price | Float | Price of the product | 18.99 |
timestamp | String | ISO format timestamp of when the event occurred | "2024-03-25T12:36:25.428461" |
encoded_name | Integer | An encoded version of the product name for machine learning models | 23363195 |
This table assumes the product["price"] field is a float representing the price of the product in a single currency (e.g., USD). The encoded_name field is considered an integer, which could represent a hash or an encoding used to transform the product name into a numerical format suitable for machine learning models. The timestamp field is a string formatted as an ISO timestamp, which provides the exact date and time when the action was recorded. The example values are placeholders and should be replaced with actual data from your application.
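For example, the encoded_name field can be produced by hashing the product name and truncating the digest to a fixed range, which is the approach used by the event generator script later in this tutorial:

```python
import hashlib

def encode_product_name(name: str) -> int:
    """Deterministically map a product name to an 8-digit integer feature."""
    return int(hashlib.sha256(name.encode("utf-8")).hexdigest(), 16) % 10**8

print(encode_product_name("MongoDB Notebook"))
```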
Setting up a MongoDB connection with Python: First, we need to install the necessary Python packages and establish a connection to our MongoDB database.
```python
!pip install pymongo
!pip install 'pymongo[srv]'
!pip install pandas

from pymongo import MongoClient
import pandas as pd
import keras
```
```python
# Replace the below connection string with your MongoDB connection URI
conn_string = "your_mongodb_connection_string_here"
client = MongoClient(conn_string)
```
```python
# Specify the database and collection
db = client["your_database_name"]
collection = db["your_collection_name"]
```
Data cleaning: Once connected, we'll fetch the data and perform some basic cleaning operations to prepare it for model training.
```python
# Get all the documents
documents = collection.find()

# Convert the documents into a list and then into a DataFrame
df = pd.DataFrame(list(documents))

# Drop unnecessary columns
df = df.drop(columns=['product_name', 'product_id', 'timestamp', 'tensor'])

# Extract the 'amount' from the 'price' column and convert it to float
df['price'] = df['price'].apply(lambda x: float(x['amount']) if isinstance(x, dict) and 'amount' in x else None)

df = df.dropna()
```
Building the dynamic pricing model: Next, we import the necessary TensorFlow and scikit-learn libraries, encode categorical variables, and normalize our data.
```python
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, MinMaxScaler, StandardScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
```
```python
# Encode categorical variables
label_encoders = {}
for column in ['action', 'encoded_name']:
    le = LabelEncoder()
    df[column] = le.fit_transform(df[column])
    label_encoders[column] = le

df.head()
```
```python
# Standardizing
scaler = StandardScaler()
df[['action', 'encoded_name']] = scaler.fit_transform(df[['action', 'encoded_name']])

df.head()
```
Saving the encoder for event data pre-processing: We'll save the encoder objects to Google Cloud Storage for later use in preprocessing new data for predictions. This code will generate joblib files to save the encoding and standardizing criteria from the above preprocessing and upcoming training.
```python
!pip install google-cloud-storage

from google.cloud import storage
import joblib
import io
```
```python
# Initialize a client
storage_client = storage.Client()

# The name of your GCP bucket
bucket_name = 'dyn_pricing_scaler'

# The path within your bucket to save the label encoder object
destination_blob_name = 'labelEncoder.joblib'

# Create a buffer
buffer = io.BytesIO()

# Dump the label encoder objects to the buffer
joblib.dump(label_encoders, buffer)

# Now upload the buffer content to GCS
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)

# Rewind the buffer's file pointer to the beginning of the file
buffer.seek(0)

# Upload the contents of the buffer
blob.upload_from_file(buffer, content_type='application/octet-stream')

print(f"Uploaded label encoders to gs://{bucket_name}/{destination_blob_name}")
```
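Note that the Cloud Function shown later downloads both `labelEncoder.joblib` and `scaler.joblib` from this bucket, so the StandardScaler fitted above should be saved the same way. A minimal sketch, reusing the client, bucket name, and imports from the previous snippet:

```python
# Persist the fitted StandardScaler so the Cloud Function can reuse it
scaler_blob_name = 'scaler.joblib'

buffer = io.BytesIO()
joblib.dump(scaler, buffer)
buffer.seek(0)

blob = storage_client.bucket(bucket_name).blob(scaler_blob_name)
blob.upload_from_file(buffer, content_type='application/octet-stream')

print(f"Uploaded scaler to gs://{bucket_name}/{scaler_blob_name}")
```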
Training the model: With our data prepared, we'll split it into training and testing sets, define our neural network architecture, and train our model. Please remember this is a model meant for a simple demo.
```python
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam

# Splitting data into training and testing sets
X = df.drop('price', axis=1)
y = df['price']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Define the model
model = Sequential([
    Dense(64, activation='relu', input_shape=(X_train.shape[1],)),
    Dense(64, activation='relu'),
    Dense(1)  # Output layer for regression
])

# Compile the model
model.compile(optimizer='adam', loss='mean_squared_error', metrics=['mae'])

# Print the model summary to check the architecture
model.summary()

# Train the model
history = model.fit(X_train, y_train,
                    validation_split=0.2,  # Further split the training set for validation
                    epochs=10,             # Number of epochs to train for
                    verbose=1,             # Show training output
                    )

# Evaluate the model on the test set
test_loss, test_mae = model.evaluate(X_test, y_test, verbose=1)
print(f"Test Loss: {test_loss}, Test MAE: {test_mae}")
```
Test prediction: After training, we make a test prediction to verify the model's performance.
```python
import numpy as np

# Example new data point converted into a tensor
new_data = np.array([[1.225999, -0.957973]])

predicted_price = model.predict(new_data)
print("Predicted Price:", predicted_price[0])
```
Saving the model: Finally, we'll save our trained model to Google Cloud Storage.
```python
from google.colab import auth
auth.authenticate_user()

project_id = 'your-gcp-project-id'
!gcloud config set project {project_id}

bucket_name = 'your-cloud-storage-bucket-name'
model_dir = 'your-model-directory'
model_path = 'your-model-path'

# Create the bucket if necessary
!gsutil mb -l us-central1 gs://{bucket_name}

# Save the model directly to GCS
model.save(f'gs://{bucket_name}/{model_dir}')

# Alternatively, copy a locally saved model directory into the bucket
!gsutil cp -r {model_dir} gs://{bucket_name}/{model_path}
```
Registering the model in Vertex AI: Next, we'll register our trained model in the VertexAI model registry:
```python
from google.cloud import aiplatform

aiplatform.init(project='your-gcp-project-id', location='your-gcp-region')

# Model registry
model_display_name = 'dyn_pricingv1'
model_description = 'TensorFlow dynamic pricing model'
bucket_name = 'your-gcp-bucket-name'
model_path = 'your-model-path'

model = aiplatform.Model.upload(
    display_name=model_display_name,
    artifact_uri=f'gs://{bucket_name}/{model_path}',
    serving_container_image_uri='us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-6:latest',
    description=model_description,
)
```
Congratulations! You should now see your model listed in the Vertex AI Model Registry. This is where you'll manage and deploy your models for various applications.
With the model trained and registered, the next step is to deploy it so it can serve optimal price predictions for the e-commerce events stored in the MongoDB Atlas feature store.
You must deploy a model to an endpoint before that model can be used to serve online predictions. Deploying a model associates physical resources with the model so it can serve online predictions with low latency. Here are the steps needed:
- In the Google Cloud console, in the Vertex AI section, go to the Models page.
- Click the name and version ID of the model you want to deploy to open its details page (model from last step).
- Select the Deploy & Test tab.
- Click Deploy to endpoint.
- Fill out the rest of the parameters (Model Settings, Model Monitoring).
- Click on Deploy.
Next, in the Vertex AI panel:
- Click on Endpoints and then select your deployed model.
- Get the Endpoint ID from the details page, as you will need it to configure the Cloud Function for sending prediction requests to this endpoint.
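To sanity-check the deployment, you can send a prediction request to the endpoint from your notebook using the Vertex AI SDK. This is a sketch; the endpoint ID, project, and region are placeholders, and the instance mirrors the two-feature scaled tensor used earlier:

```python
from google.cloud import aiplatform

aiplatform.init(project="your-gcp-project-id", location="your-gcp-region")

endpoint = aiplatform.Endpoint("your-endpoint-id")

# One scaled [action, encoded_name] pair, matching the model's input shape
prediction = endpoint.predict(instances=[[1.225999, -0.957973]])
print(prediction.predictions)
```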
Cloud Function configuration: A Google Cloud Function orchestrates converting event data into tensors, writing those tensors into the feature store collection, and invoking the Vertex AI model endpoint. Follow these steps:
Browse to Cloud Functions in your GCP project and click on CREATE FUNCTION.
Make sure the trigger for your Cloud function is the previously created Pub/Sub topic.
In the `main.py` file, copy and paste the following Python code snippet:

```python
from pymongo import MongoClient
from datetime import datetime, timedelta
import json
import numpy as np
import pandas as pd
import joblib
import os
from google.cloud import storage
import functions_framework
import base64
from bson import ObjectId
from google.cloud import aiplatform

@functions_framework.cloud_event
def hello_pubsub(cloud_event):
    event_data = base64.b64decode(cloud_event.data["message"]["data"])
    print(event_data)
    event_data = json.loads(event_data)
    print(event_data)
    event_data = pd.DataFrame([event_data])
    print(event_data)

    # Correctly handle the '_id' field when it's a string, getting the event_id and product_id data
    event_id = ObjectId(str(event_data['_id'].loc[0]))
    print(event_id)
    product_id = event_data['product_id'].loc[0]
    print(product_id)

    # Set your GCS bucket and file paths
    bucket_name = 'dyn_pricing_scaler'
    scaler_file_name = 'scaler.joblib'
    label_encoder_name = 'labelEncoder.joblib'

    # Initialize a GCS client
    storage_client = storage.Client()

    # Get the bucket
    bucket = storage_client.bucket(bucket_name)

    # Get the blob (file) containing the scaler
    blob1 = bucket.blob(scaler_file_name)

    # Download the scaler file to a temporary location
    scaler_temp_file_path = '/tmp/' + scaler_file_name
    blob1.download_to_filename(scaler_temp_file_path)

    # Download the label encoder to a temporary location
    blob2 = bucket.blob(label_encoder_name)
    label_encoder_temp = '/tmp/' + label_encoder_name
    blob2.download_to_filename(label_encoder_temp)

    # Clean unnecessary columns
    if '_id' in event_data.columns:
        event_data = event_data.drop(columns=['_id'])
    print(event_data)
    event_data.drop('price', axis=1, inplace=True)
    event_data.drop('timestamp', axis=1, inplace=True)
    print(event_data)
    event_data.drop('product_id', axis=1, inplace=True)
    print(event_data)
    event_data.drop('product_name', axis=1, inplace=True)
    print(event_data)

    # Load label encoder
    label_encoders = joblib.load(label_encoder_temp)
    print(label_encoders)

    # Encode categorical fields
    for column, encoder in label_encoders.items():
        event_data[column] = encoder.transform(event_data[column])
    print(event_data)

    # Load the scaler using joblib
    scaler = joblib.load(scaler_temp_file_path)
    print(scaler)

    # Use scaler
    columns_to_scale = ['action', 'encoded_name']
    event_data[columns_to_scale] = scaler.transform(event_data[columns_to_scale])
    print(event_data)

    # Prepare data as tensors
    event_data = event_data.to_numpy()

    # Prepare input data for Vertex AI
    input_data = {"instances": event_data}
    print(input_data)
    input_data['instances'] = input_data['instances'].tolist()
    print(input_data)
    input_data_json = json.dumps(input_data)
    print(input_data_json)

    # Call Vertex AI endpoint
    endpoint_id = "your-endpoint-id"
    project_id = "your-project-id"
    location = "your-project-location"
    endpoint_url = f"https://us-central1-aiplatform.googleapis.com/v1/projects/{project_id}/locations/us-central1/endpoints/{endpoint_id}:predict"

    aiplatform.init(project=project_id, location=location)

    endpoint = aiplatform.Endpoint(endpoint_id)

    prediction = endpoint.predict(instances=input_data['instances'])

    print(prediction)

    pred_price = prediction.predictions[0][0]

    print(pred_price)

    # Start MongoClient
    mongo_uri = "your-mongo-db-connection-uri"  # remember you can set it up in a .env file
    client = MongoClient(mongo_uri)

    db = client["dotLocalStore"]  # Replace with your actual database name
    collection = db["products"]
    feature_store = db["events"]

    # Update products collection document with 'pred_price'
    collection.update_one({"id": int(product_id)}, {"$set": {"pred_price": pred_price}})

    # Update events collection (feature_store) with event tensor
    tensor = event_data.tolist()
    print(tensor)
    feature_store.update_one({"_id": event_id}, {"$set": {"tensor": tensor}})

    # Clean up: Delete the temporary files
    os.remove(scaler_temp_file_path)
    os.remove(label_encoder_temp)
    print(pred_price)

    return pred_price
```
Make sure you add the `requirements.txt` seen below to the Cloud Function folder structure in GCP:

```
functions-framework==3.*
pymongo
scikit-learn==1.2.2
numpy
google-cloud-aiplatform
google-auth
google-cloud-storage
pandas
joblib
```
Simulating customer events: If you're looking to mimic customer behavior, feel free to use the Python script called `generator.py` seen below.

```python
import random
import pymongo
from pymongo import MongoClient
import os
import time
from google.cloud import pubsub_v1
from dotenv import load_dotenv
from faker import Faker
from datetime import datetime, timedelta
import json
from bson import ObjectId
import hashlib

# Load environment variables
load_dotenv()
MONGODB_URI = os.getenv('MONGODB_URI')
GOOGLE_APPLICATION_CREDENTIALS = os.getenv('GOOGLE_APPLICATION_CREDENTIALS')
GOOGLE_CLOUD_PROJECT = os.getenv('GOOGLE_CLOUD_PROJECT')
PUBSUB_TOPIC_ID = os.getenv('PUBSUB_TOPIC_ID')

# Initialize MongoDB client
mongo_client = pymongo.MongoClient(MONGODB_URI)
db = mongo_client["dotLocalStore"]
behaviors_collection = db["events"]
products_collection = db["products"]

# Initialize Google Cloud Pub/Sub publisher
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(GOOGLE_CLOUD_PROJECT, PUBSUB_TOPIC_ID)

def fetch_products():
    """Fetch products from MongoDB"""
    return list(products_collection.find({}))

def generate_ecommerce_behavior(product):
    """Generate a single synthetic ecommerce behavior data for a given product"""
    # Encode product_name into a numerical field
    encoded_name = int(hashlib.sha256(product["name"].encode('utf-8')).hexdigest(), 16) % 10**8
    behavior = {
        "product_name": product["name"],
        "product_id": product["id"],
        "action": random.choice(["view", "add_to_cart", "purchase"]),
        "price": product["price"],
        "timestamp": datetime.now().isoformat(),  # Format timestamp for JSON serialization
        "encoded_name": encoded_name
    }
    return behavior

# Custom JSON Encoder that converts ObjectId to str
class JSONEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, ObjectId):
            return str(o)
        return json.JSONEncoder.default(self, o)

def push_event_to_mongodb(behavior):
    """Push a single ecommerce behavior data to MongoDB"""
    behaviors_collection.insert_one(behavior)
    print("Pushed an event to MongoDB.")

def push_event_to_pubsub(event):
    """Push a single event to Google Cloud Pub/Sub"""
    try:
        # Attempt to serialize the event to JSON
        data = json.dumps(event, cls=JSONEncoder).encode("utf-8")
        # Attempt to publish the serialized data to Pub/Sub
        future = publisher.publish(topic_path, data=data)
        print(f"Published id:{event['product_id']} product:{event['product_name']} to Pub/Sub.")
        future.result()  # Block until the publish completes
    except Exception as e:
        print(f"An error occurred: {e}")

if __name__ == "__main__":
    products = fetch_products()
    num_behaviors_per_cycle = 150

    try:
        while True:
            for _ in range(num_behaviors_per_cycle):
                # Select a random product
                selected_product = random.choice(products)
                # Generate behavior for the selected product
                behavior = generate_ecommerce_behavior(selected_product)
                # Push the behavior to MongoDB
                push_event_to_mongodb(behavior)
                # Push the behavior to Pub/Sub
                push_event_to_pubsub(behavior)
                # Wait for 3 seconds before generating the next behavior
                time.sleep(3)
    except KeyboardInterrupt:
        print("Stopped by the user.")
```
The Python script generates fake customer events based on the data model described above. These events are pushed to a Pub/Sub topic and to your Atlas feature store collection. You can adjust the number of events and their cadence directly within the code. To run this script, use the following command:
```bash
python3 generator.py
```
After running this script, you should be able to see fake customer events being pushed into your MongoDB Atlas cluster and your Pub/Sub topic, effectively triggering the microservice to respond to those events and calculate optimal pricing points for the different products.
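To verify the pipeline end to end, you can query the products collection and confirm that pred_price fields are being written back by the Cloud Function. A minimal check, assuming the database and collection names used earlier in this tutorial:

```python
import os
from pymongo import MongoClient

client = MongoClient(os.getenv("MONGODB_URI"))
products = client["dotLocalStore"]["products"]

# Show a few products that already have a predicted price
for doc in products.find({"pred_price": {"$exists": True}},
                         {"name": 1, "price": 1, "pred_price": 1}).limit(5):
    print(doc)
```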
Have you mastered building a reactive dynamic pricing microservice? Great job! Here's what you learned:
- Centralized feature store: MongoDB serves as a feature store, acting as a centralized repository specifically designed for storing, managing, and serving features for machine learning (ML) models. Its polymorphic capabilities enable the utilization of a single interface to represent various types of data. This implies that as new features are introduced or pricing models evolve, MongoDB can adeptly manage diverse data types within the same system. In the context of dynamic pricing, this capability facilitates the seamless incorporation of new pricing factors or variables without causing disruptions to existing data structures or operations.
- Scalability and efficiency: Google Cloud Pub/Sub can handle massive volumes of customer data efficiently, ensuring scalability for real-world applications. While this microservice only simulates a steady trickle of customer events (one every few seconds in the generator script), Pub/Sub is capable of processing much larger data streams.
- Real-time price updates: Cloud functions trigger TensorFlow models to generate dynamic prices based on customer behavior. These generated prices are then inserted or updated (upserted) back into the product catalog collection in MongoDB. This enables real-time adjustments in the e-commerce application because the application's front end retrieves data directly from the same collection.
Curious how MongoDB is changing the retail landscape? Dive deeper into MongoDB's capabilities and discover how it's revolutionizing the industry:
MongoDB helps retailers innovate and gain a competitive edge. Apply for an innovation workshop to explore the possibilities with our experts. If you’d like to connect with other people using MongoDB to build their next big project, head to the MongoDB Developer Community.