Using MongoDB with Apache Airflow

Robert Walters • 8 min read • Published Nov 15, 2022 • Updated Jun 12, 2023
While writing cron jobs to execute scripts is one way to accomplish data movement, as workflows become more complex, managing job scheduling becomes very difficult and error-prone. This is where Apache Airflow shines. Airflow is a workflow management system originally designed by Airbnb and open sourced in 2015. With Airflow, you can programmatically author, schedule, and monitor complex data pipelines. Airflow is used in many use cases with MongoDB, including:
  • Machine learning pipelines.
  • Automating database administration operations.
  • Batch movement of data.
In this post, you will learn the basics of how to leverage MongoDB within an Airflow pipeline.

Getting started

Installing Apache Airflow involves a number of steps, including setting up a database and a webserver. While it’s possible to follow the installation script and configure the database and services yourself, the easiest way to get started with Airflow is to use the Astronomer CLI (Astro CLI). This CLI stands up a complete Airflow Docker environment from a single command line.
Likewise, the easiest way to stand up a MongoDB cluster is with MongoDB Atlas. Atlas is not just a hosted MongoDB cluster. Rather, it’s an integrated suite of cloud database and data services that enable you to quickly build your applications. One service, Atlas Data Federation, is a cloud-native query processing service that allows users to create a virtual collection from heterogeneous data sources such as Amazon S3 buckets, MongoDB clusters, and HTTP API endpoints. Once defined, the user simply issues a query to obtain data combined from these sources.
For example, consider a scenario where you are moving data into MongoDB with an Airflow DAG and want to join data in cloud object storage, such as Amazon S3 or Microsoft Azure Blob Storage, with data in MongoDB as part of a data analytics application. Using MongoDB Atlas Data Federation, you create a virtual collection that combines a MongoDB cluster and a cloud object storage collection. Now, all your application needs to do is issue a single query, and Atlas takes care of joining the heterogeneous data. This feature and others like MongoDB Charts, which we will see later in this post, will increase your productivity and enhance your Airflow solution. To learn more about MongoDB Atlas Data Federation, check out the MongoDB.live webinar on YouTube, Help You Data Flow with Atlas Data Lake. For an overview of MongoDB Atlas, check out Intro to MongoDB Atlas in 10 mins | Jumpstart, available on YouTube.

Currency over time

In this post, we will create an Airflow workflow that queries an HTTP endpoint for a historical list of currency values versus the Euro. The data will then be inserted into MongoDB using the MongoHook and a chart will be created using MongoDB Charts. In Airflow, a hook is an interface to an external platform or database such as MongoDB. The MongoHook wraps the PyMongo Python Driver for MongoDB, unlocking all the capabilities of the driver within an Airflow workflow.

Step 1: Spin up the Airflow environment

If you don’t have an Airflow environment already available, install the Astro CLI. Once it’s installed, create a directory for the project called “currency.”
mkdir currency && cd currency
Next, create the Airflow environment using the Astro CLI.
astro dev init
This command will create a folder structure that includes a folder for DAGs, a Dockerfile, and other support files that are used for customizations.

Step 2: Install the MongoDB Airflow provider

Providers help Airflow interface with external systems. To add a provider, modify the requirements.txt file and add the MongoDB provider.
echo "apache-airflow-providers-mongo==3.0.0" >> requirements.txt
Finally, start the Airflow project.
astro dev start
This command starts and configures the four Docker containers needed for Airflow: a webserver, a scheduler, a triggerer, and a Postgres database.
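If the environment was already running when you edited requirements.txt, restart it so that the image is rebuilt with the new provider.
astro dev restart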
Note: You can also manually install the MongoDB provider from PyPI if you are not using the Astro CLI.
Note: The HTTP provider is already installed as part of the Astro runtime. If you did not use Astro, you will need to install the HTTP provider.

Step 3: Creating the DAG workflow

One of the components that is installed with Airflow is a webserver. This is used as the main operational portal for Airflow workflows. To access, open a browser and navigate to http://localhost:8080. Depending on how you installed Airflow, you might see example DAGs already populated. Airflow workflows are referred to as DAGs (Directed Acyclic Graphs) and can be anything from the most basic job scheduling pipelines to more complex ETL, machine learning, or predictive data pipeline workflows such as fraud detection. These DAGs are Python scripts that give developers complete control of the workflow. DAGs can be triggered manually via an API call or the web UI. DAGs can also be scheduled for execution one time, recurring, or in any cron-like configuration.
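To make the “directed acyclic graph” idea concrete, here is a minimal sketch that uses only Python’s standard library, not Airflow itself. The task names are the ones used in the DAG built later in this post; the dictionary layout is an assumption made for illustration. It shows how a run order follows from declared dependencies and why a cycle is rejected.

```python
from graphlib import CycleError, TopologicalSorter

# Map each task to the set of tasks it depends on.
dependencies = {
    "get_currency": set(),
    "upload-mongodb": {"get_currency"},
}

# A valid run order places every task after its dependencies.
run_order = list(TopologicalSorter(dependencies).static_order())
print(run_order)

# Making get_currency depend on upload-mongodb as well creates a cycle.
cyclic = {
    "get_currency": {"upload-mongodb"},
    "upload-mongodb": {"get_currency"},
}
try:
    list(TopologicalSorter(cyclic).static_order())
    has_cycle = False
except CycleError:
    has_cycle = True
print(has_cycle)
```

Airflow applies the same constraint when it parses a DAG file: the dependencies must never loop back on themselves.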
Let’s get started exploring Airflow by creating a Python file, “currency.py,” within the dags folder using your favorite editor.
The following is the complete source code for the DAG.
import os
import json
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.mongo.hooks.mongo import MongoHook


def on_failure_callback(context):
    # Airflow calls failure callbacks with the task context as a single positional argument.
    print(f"Task {context['task_instance_key_str']} failed.")


def uploadtomongo(ti, **context):
    try:
        # 'mongoid' is the connection ID configured in the Airflow UI (see Step 4).
        hook = MongoHook(mongo_conn_id='mongoid')
        client = hook.get_conn()
        db = client.MyDB
        currency_collection = db.currency_collection
        print(f"Connected to MongoDB - {client.server_info()}")
        # "result" holds the JSON text that the HTTP task pushed to XCom.
        d = json.loads(context["result"])
        currency_collection.insert_one(d)
    except Exception as e:
        print(f"Error connecting to MongoDB -- {e}")


with DAG(
    dag_id="load_currency_data",
    schedule_interval=None,  # no schedule: the DAG runs only when triggered
    start_date=datetime(2022, 10, 28),
    catchup=False,
    tags=["currency"],
    default_args={
        "owner": "Rob",
        "retries": 2,
        "retry_delay": timedelta(minutes=5),
        "on_failure_callback": on_failure_callback,
    },
) as dag:

    # No http_conn_id is given, so the operator uses the 'http_default' connection.
    t1 = SimpleHttpOperator(
        task_id='get_currency',
        method='GET',
        endpoint='2022-01-01..2022-06-30',
        headers={"Content-Type": "application/json"},
        do_xcom_push=True,
        dag=dag)

    t2 = PythonOperator(
        task_id='upload-mongodb',
        python_callable=uploadtomongo,
        op_kwargs={"result": t1.output},
        dag=dag
    )

    # Run get_currency first, then upload-mongodb.
    t1 >> t2

Step 4: Configure connections

When you look at the code, notice there are no connection strings within the Python file. Connection identifiers, such as the one in the code snippet below, are placeholders for connection strings.
hook = MongoHook(mongo_conn_id='mongoid')
Connection identifiers and the connection configurations they represent are defined within the Connections tab of the Admin menu in the Airflow UI.
admin menu item drop down list
In this example, since we are connecting to MongoDB and an HTTP API, we need to define two connections. First, let’s create the MongoDB connection by clicking the “Add a new record” button.
Add a new record button
This will present a page where you can fill out connection information. Select “MongoDB” from the Connection Type drop-down and fill out the following fields:
Connection Id: mongoid
Connection Type: MongoDB
Host: XXXX..mongodb.net (place your MongoDB Atlas hostname here)
Schema: MyDB (the database in MongoDB)
Login: (place your database username here)
Password: (place your database password here)
Extra: {"srv": true}
Click “Save” and “Add a new record” to create the HTTP API connection.
Select “HTTP” for the Connection Type and fill out the following fields:
Connection Id: http_default
Connection Type: HTTP
Host: api.frankfurter.app
Note: Connection strings can also be stored in environment variables or stored securely using an external secrets back end, such as HashiCorp Vault or AWS SSM Parameter Store.
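As a sketch of the environment-variable option: Airflow looks up a connection ID in a variable named AIRFLOW_CONN_ followed by the upper-cased ID, and releases from 2.3 onward accept a JSON value there. The snippet below only builds that value with the standard library. The hostname, username, and password are placeholders, and in a real deployment the variable would be set in the environment of the Airflow containers rather than from Python.

```python
import json
import os

# Placeholder values: substitute your own Atlas hostname and credentials.
mongo_connection = {
    "conn_type": "mongo",
    "host": "cluster0.example.mongodb.net",  # hypothetical hostname
    "schema": "MyDB",
    "login": "my_user",                      # hypothetical username
    "password": "my_password",               # hypothetical password
    "extra": {"srv": True},
}

# Airflow resolves mongo_conn_id='mongoid' from AIRFLOW_CONN_MONGOID.
env_name = "AIRFLOW_CONN_" + "mongoid".upper()
os.environ[env_name] = json.dumps(mongo_connection)

print(env_name)
print(os.environ[env_name])
```

The keys mirror the fields filled out in the Connections form above, so the DAG code does not change when the connection moves from the UI to the environment.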

Step 5: The DAG workflow

Click on the DAGs menu and then “load_currency_data.”  You’ll be presented with a number of sub items that address the workflow, such as the Code menu that shows the Python code that makes up the DAG.
Clicking on Graph will show a visual representation of the DAG parsed from the Python code.
graph view of DAG workflow
In our example, “get_currency” uses the SimpleHttpOperator to obtain a historical list of currency values versus the Euro.
t1 = SimpleHttpOperator(
    task_id='get_currency',
    method='GET',
    endpoint='2022-01-01..2022-06-30',
    headers={"Content-Type": "application/json"},
    do_xcom_push=True,
    dag=dag)
Airflow passes information between tasks using XComs. In this example, we store the return data from the API call to XCom. The next operator, “upload-mongodb,” uses the PythonOperator to call a python function, “uploadtomongo.”
t2 = PythonOperator(
    task_id='upload-mongodb',
    python_callable=uploadtomongo,
    op_kwargs={"result": t1.output},
    dag=dag
)
This function accesses the data stored in XCom and uses MongoHook to insert the data obtained from the API call into a MongoDB cluster.
def uploadtomongo(ti, **context):
    try:
        hook = MongoHook(mongo_conn_id='mongoid')
        client = hook.get_conn()
        db = client.MyDB
        currency_collection = db.currency_collection
        print(f"Connected to MongoDB - {client.server_info()}")
        # "result" holds the JSON text that the HTTP task pushed to XCom.
        d = json.loads(context["result"])
        currency_collection.insert_one(d)
    except Exception as e:
        print(f"Error connecting to MongoDB -- {e}")
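The parsing and insert logic can be tried without Airflow or a cluster. The sketch below is a stand-in written for illustration, not part of the original DAG: FakeCollection is a hypothetical class that mimics only insert_one, and the sample payload is a shortened version of the API response shown later in this post. It mirrors what the function does with the XCom value: decode the JSON text and insert the resulting document.

```python
import json


class FakeCollection:
    """Hypothetical stand-in for a PyMongo collection; keeps documents in a list."""

    def __init__(self):
        self.documents = []

    def insert_one(self, document):
        self.documents.append(document)


def upload(result_text, collection):
    """Decode the JSON text returned by the HTTP task and insert it as one document."""
    document = json.loads(result_text)
    collection.insert_one(document)
    return document


# Shortened sample of the response body that the HTTP task pushes to XCom.
sample_result = json.dumps({
    "amount": 1,
    "base": "EUR",
    "start_date": "2022-01-03",
    "end_date": "2022-06-30",
    "rates": {"2022-01-03": {"AUD": 1.5691, "BGN": 1.9558, "BRL": 6.3539}},
})

collection = FakeCollection()
upload(sample_result, collection)
print(len(collection.documents), collection.documents[0]["base"])
```

Note that the whole API response becomes a single MongoDB document, which is why the data is reshaped before charting in Step 9.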
Our example workflow is simple: execute one task and then another.
t1 >> t2
Airflow overloaded the “>>” bitwise operator to describe the flow of tasks. For more information, see “Bitshift Composition.”
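The idea behind this operator overloading can be sketched in a few lines of plain Python. This is an illustration only, not Airflow’s implementation: the Task class and its downstream attribute are hypothetical names.

```python
class Task:
    """Hypothetical task node that records which tasks run after it."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # "a >> b" registers b as downstream of a and returns b,
        # so that chains such as a >> b >> c read left to right.
        self.downstream.append(other)
        return other


t1 = Task("get_currency")
t2 = Task("upload-mongodb")
t3 = Task("notify")  # extra task, added here only to show chaining

t1 >> t2 >> t3

print([t.task_id for t in t1.downstream])
print([t.task_id for t in t2.downstream])
```

Returning the right-hand operand from `__rshift__` is the design choice that makes chaining work, because Python evaluates `t1 >> t2 >> t3` as `(t1 >> t2) >> t3`.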
Airflow can enable more complex workflows, such as the following:
Graph view of DAG workflow
Task execution can be conditional with multiple execution paths.

Step 6: Scheduling the DAG

Airflow is known best for its workflow scheduling capabilities, and these are defined as part of the DAG definition.
with DAG(
    dag_id="load_currency_data",
    schedule_interval=None,  # this parameter is named "schedule" in Airflow 2.4 and later
    start_date=datetime(2022, 10, 28),
    catchup=False,
    tags=["currency"],
    default_args={
        "owner": "Rob",
        "retries": 2,
        "retry_delay": timedelta(minutes=5),
        "on_failure_callback": on_failure_callback,
    },
) as dag:
The scheduling interval can be defined using a cron expression, a timedelta, or one of Airflow’s presets. This example uses “None,” which means the DAG runs only when it is triggered.
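The different kinds of schedule value look like this in Python. The dictionary and its key names are illustrative only; each value is something that could be passed as the DAG’s schedule argument. “@daily” is one of Airflow’s built-in presets, and the cron string follows the usual five-field syntax.

```python
from datetime import timedelta

# Each entry is a value that could be passed as the DAG's schedule argument.
schedule_examples = {
    "manual_only": None,              # run only when triggered, as in this post
    "preset": "@daily",               # Airflow preset: once a day at midnight
    "cron": "0 6 * * 1-5",            # 06:00 on weekdays
    "interval": timedelta(hours=12),  # every 12 hours
}

for name, value in schedule_examples.items():
    print(f"{name}: {value!r}")
```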
DAGs can be scheduled to start at a date in the past. If you’d like Airflow to catch up and execute the DAG once for every schedule interval between the start date and now, you can set the “catchup” property. Note: “Catchup” defaults to “True,” so make sure you set the value accordingly.
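To get a feel for what catching up means, the following simplified sketch counts how many runs would be created for a daily schedule. It assumes one run per completed interval between the start date and the current time, which approximates the scheduler’s behavior but ignores time zones and other details. The function name and the fixed “now” value are made up for the example.

```python
from datetime import datetime, timedelta


def missed_runs(start_date, now, interval):
    """Count the complete schedule intervals between start_date and now."""
    if now <= start_date:
        return 0
    return (now - start_date) // interval


start = datetime(2022, 10, 28)
now = datetime(2022, 11, 15, 9, 30)  # fixed value so the result is reproducible

print(missed_runs(start, now, timedelta(days=1)))
```

With a start date well in the past and a short interval, this number grows quickly, which is why the example DAG sets catchup to False.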
From our example, you can see just some of the configuration options available.

Step 7: Running the DAG

You can execute a DAG ad hoc through the web UI using the “play” button under the Actions column.
actions menu options
Once it’s executed, you can click on the DAG and Grid menu item to display the runtime status of the DAG.
view of dag execution
In the example above, the DAG was run four times, all with success. You can view the log of each step by clicking on the task and then “Log” from the menu.
showing log details view
The log is useful for troubleshooting the task. Here we can see our output from the print(f"Connected to MongoDB - {client.server_info()}") command within the PythonOperator.
log by attempts

Step 8: Exploring the data in MongoDB Atlas

Once we run the DAG, the data will be in the MongoDB Atlas cluster. Navigating to the cluster, we can see the “currency_collection” was created and populated with currency data.
MongoDB Atlas collection view

Step 9: Visualizing the data using MongoDB Charts

Next, we can visualize the data by using MongoDB Charts.
Note that the data from the API was stored in MongoDB with a subdocument for every day of the given period. A sample of this data is as follows:
{
  _id: ObjectId("635b25bdcef2d967af053e2c"),
  amount: 1,
  base: 'EUR',
  start_date: '2022-01-03',
  end_date: '2022-06-30',
  rates: {
    '2022-01-03': {
      AUD: 1.5691,
      BGN: 1.9558,
      BRL: 6.3539,
      …
    },
    '2022-01-04': {
      AUD: 1.5682,
      BGN: 1.9558,
      BRL: 6.4174,
      …
    }
    …
  }
}
With MongoDB Charts, we can define an aggregation pipeline filter to transform the data into a format that will be optimized for chart creation. For example, consider the following aggregation pipeline filter:
[
  { $project: { rates: { $objectToArray: "$rates" } } },
  { $unwind: "$rates" },
  { $project: { _id: 0, "date": "$rates.k", "Value": "$rates.v" } }
]
This transforms the data into one document per day, each with two key-value pairs: the date and the currency values for that date.
{
  date: '2022-01-03',
  Value: {
    AUD: 1.5691,
    BGN: 1.9558,
    BRL: 6.3539,
    …
  }
},
{
  date: '2022-01-04',
  Value: {
    AUD: 1.5682,
    BGN: 1.9558,
    BRL: 6.4174,
    …
  }
}
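To see what each stage contributes, here is a plain-Python walk-through of the same reshaping, using the first two days of the sample document. It is a sketch for intuition only: in practice the pipeline runs inside MongoDB, and the helper function name is invented for this example.

```python
def reshape_rates(document):
    """Mimic $objectToArray, $unwind, and the final $project in plain Python."""
    # $objectToArray: {"<date>": {...}} becomes [{"k": "<date>", "v": {...}}, ...]
    rate_pairs = [{"k": key, "v": value} for key, value in document["rates"].items()]
    # $unwind plus the final $project: emit one output document per array element.
    return [{"date": pair["k"], "Value": pair["v"]} for pair in rate_pairs]


sample = {
    "amount": 1,
    "base": "EUR",
    "rates": {
        "2022-01-03": {"AUD": 1.5691, "BGN": 1.9558, "BRL": 6.3539},
        "2022-01-04": {"AUD": 1.5682, "BGN": 1.9558, "BRL": 6.4174},
    },
}

for row in reshape_rates(sample):
    print(row["date"], row["Value"]["AUD"])
```

Each resulting row carries its own date field, which is the shape a chart needs in order to plot a value over time.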
We can add this aggregation pipeline filter into Charts and build out a chart comparing the US dollar (USD) to the Euro (EUR) over this time period. For more information on MongoDB Charts, check out the YouTube video “Intro to MongoDB Charts (demo)” for a walkthrough of the feature.

Summary

Airflow is an open-source workflow scheduler used by many enterprises throughout the world. Integrating MongoDB with Airflow is simple using the MongoHook. Astronomer makes it easy to quickly spin up a local Airflow deployment. Astronomer also has a registry that provides a central place for Airflow operators, hooks, and sensors, including the MongoHook and MongoSensor.

Useful resources

Learn more about Astronomer, and check out the MongoHook documentation.
