Using MongoDB with Apache Airflow
While writing cron jobs to execute scripts is one way to accomplish data movement, as workflows become more complex, managing job scheduling becomes very difficult and error-prone. This is where Apache Airflow shines. Airflow is a workflow management system originally designed by Airbnb and open sourced in 2015. With Airflow, you can programmatically author, schedule, and monitor complex data pipelines. Airflow is used in many use cases with MongoDB, including:
- Machine learning pipelines.
- Automating database administration operations.
- Batch movement of data.
In this post, you will learn the basics of how to leverage MongoDB within an Airflow pipeline.
Installing Apache Airflow involves a number of steps, including setting up a database and a webserver. While it’s possible to follow the installation script and configure the database and services yourself, the easiest way to get started with Airflow is to use the Astro CLI. This CLI stands up a complete Airflow Docker environment with a single command.
Likewise, the easiest way to stand up a MongoDB cluster is with MongoDB Atlas. Atlas is not just a hosted MongoDB cluster. Rather, it’s an integrated suite of cloud database and data services that enables you to quickly build your applications. One service, Atlas Data Federation, is a cloud-native query processing service that allows users to create a virtual collection from heterogeneous data sources such as Amazon S3 buckets, MongoDB clusters, and HTTP API endpoints. Once the virtual collection is defined, the user simply issues a query to obtain data combined from these sources.
For example, consider a scenario where you are moving data into MongoDB with an Airflow DAG and want to join cloud object storage data (Amazon S3 or Microsoft Azure Blob Storage) with MongoDB data as part of a data analytics application. Using MongoDB Atlas Data Federation, you create a virtual collection that spans a MongoDB cluster and a cloud object storage collection. Now, all your application needs to do is issue a single query and Atlas takes care of joining the heterogeneous data. This feature and others like MongoDB Charts, which we will see later in this post, will increase your productivity and enhance your Airflow solution. To learn more about MongoDB Atlas Data Federation, check out the MongoDB.live webinar on YouTube, Help Your Data Flow with Atlas Data Lake. For an overview of MongoDB Atlas, check out Intro to MongoDB Atlas in 10 mins | Jumpstart, available on YouTube.
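For illustration, here is a minimal sketch of what that single query might look like from an application. The connection string, database, and collection names are hypothetical; the virtual collection itself is defined in your Atlas Data Federation configuration.

from pymongo import MongoClient

# A federated database instance is queried like any other MongoDB deployment,
# so one find() can return documents sourced from both an Atlas cluster and
# an S3 bucket sitting behind the virtual collection.
# The URI, database, and collection below are placeholders.
client = MongoClient("mongodb://<federated-database-uri>")
sales = client["FederatedDB"]["sales"]  # virtual collection
for doc in sales.find({"region": "EMEA"}).limit(5):
    print(doc)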
In this post, we will create an Airflow workflow that queries an HTTP endpoint for a historical list of currency values versus the Euro. The data will then be inserted into MongoDB using the MongoHook and a chart will be created using MongoDB Charts. In Airflow, a hook is an interface to an external platform or database such as MongoDB. The MongoHook wraps the PyMongo Python Driver for MongoDB, unlocking all the capabilities of the driver within an Airflow workflow.
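As a quick illustration, the hook hands back a standard PyMongo client, so any driver call is available inside a task. This is just a sketch; the “mongoid” connection it references is configured later in this post.

from airflow.providers.mongo.hooks.mongo import MongoHook

# get_conn() returns a regular pymongo.MongoClient, so any PyMongo method
# is available from within an Airflow task.
hook = MongoHook(mongo_conn_id="mongoid")
client = hook.get_conn()
print(client.server_info())
print(client.MyDB.currency_collection.count_documents({}))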
If you don’t have an Airflow environment already available, install the Astro CLI. Once it’s installed, create a directory for the project called “currency.”
mkdir currency && cd currency
Next, create the Airflow environment using the Astro CLI.
astro dev init
This command will create a folder structure that includes a folder for DAGs, a Dockerfile, and other support files that are used for customizations.
Providers help Airflow interface with external systems. To add a provider, modify the requirements.txt file and add the MongoDB provider.
echo "apache-airflow-providers-mongo==3.0.0" >> requirements.txt
Finally, start the Airflow project.
astro dev start
This single command starts and configures the four Docker containers needed for Airflow: a webserver, a scheduler, a triggerer, and a Postgres database.
If your Airflow environment was already running when you added the provider, restart it so the change takes effect.
astro dev restart
Note: If you are not using the Astro CLI, you can also install the MongoDB provider manually from PyPI (apache-airflow-providers-mongo).
Note: The HTTP provider is already installed as part of the Astro runtime. If you did not use Astro, you will need to install the HTTP provider.
One of the components that is installed with Airflow is a webserver. This is used as the main operational portal for Airflow workflows. To access it, open a browser and navigate to http://localhost:8080. Depending on how you installed Airflow, you might see example DAGs already populated. Airflow workflows are referred to as DAGs (Directed Acyclic Graphs) and can be anything from basic job scheduling pipelines to more complex ETL, machine learning, or predictive data pipeline workflows such as fraud detection. These DAGs are Python scripts that give developers complete control of the workflow. DAGs can be triggered manually via an API call or the web UI. They can also be scheduled to run once, on a recurring interval, or with any cron-like configuration.
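For example, the DAG we build below (“load_currency_data”) could be triggered through Airflow’s stable REST API. The following is a minimal sketch that assumes the local webserver started by the Astro CLI and its default admin/admin credentials; adjust the URL and authentication for your own deployment.

import requests

# Trigger a DAG run via the Airflow stable REST API.
# Host, credentials, and the empty conf payload are assumptions for a
# local Astro CLI environment.
response = requests.post(
    "http://localhost:8080/api/v1/dags/load_currency_data/dagRuns",
    auth=("admin", "admin"),
    json={"conf": {}},
)
print(response.status_code, response.json())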
Let’s get started exploring Airflow by creating a Python file, “currency.py,” within the dags folder using your favorite editor.
The following is the complete source code for the DAG.
import os
import json
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.mongo.hooks.mongo import MongoHook
from datetime import datetime, timedelta

def on_failure_callback(**context):
    print(f"Task {context['task_instance_key_str']} failed.")

def uploadtomongo(ti, **context):
    try:
        hook = MongoHook(mongo_conn_id='mongoid')
        client = hook.get_conn()
        db = client.MyDB
        currency_collection = db.currency_collection
        print(f"Connected to MongoDB - {client.server_info()}")
        d = json.loads(context["result"])
        currency_collection.insert_one(d)
    except Exception as e:
        print(f"Error connecting to MongoDB -- {e}")

with DAG(
    dag_id="load_currency_data",
    schedule=None,
    start_date=datetime(2022, 10, 28),
    catchup=False,
    tags=["currency"],
    default_args={
        "owner": "Rob",
        "retries": 2,
        "retry_delay": timedelta(minutes=5),
        "on_failure_callback": on_failure_callback
    }
) as dag:

    t1 = SimpleHttpOperator(
        task_id='get_currency',
        method='GET',
        endpoint='2022-01-01..2022-06-30',
        headers={"Content-Type": "application/json"},
        do_xcom_push=True,
        dag=dag)

    t2 = PythonOperator(
        task_id='upload-mongodb',
        python_callable=uploadtomongo,
        op_kwargs={"result": t1.output},
        dag=dag
    )

    t1 >> t2
When you look at the code, notice that there are no connection strings within the Python file. Instead, connection identifiers, as shown in the code snippet below, act as placeholders for the connection details.
hook = MongoHook(mongo_conn_id='mongoid')
Connection identifiers and the connection configurations they represent are defined within the Connections tab of the Admin menu in the Airflow UI.
In this example, since we are connecting to MongoDB and an HTTP API, we need to define two connections. First, let’s create the MongoDB connection by clicking the “Add a new record” button.
This will present a page where you can fill out connection information. Select “MongoDB” from the Connection Type drop-down and fill out the following fields:
| Field | Value |
| --- | --- |
| Connection Id | mongoid |
| Connection Type | MongoDB |
| Host | XXXX.mongodb.net (place your MongoDB Atlas hostname here) |
| Schema | MyDB (the MongoDB database name) |
| Login | (place your database username here) |
| Password | (place your database password here) |
| Extra | {"srv": true} |
Click “Save” and “Add a new record” to create the HTTP API connection.
Select “HTTP” for the Connection Type and fill out the following fields:
| Field | Value |
| --- | --- |
| Connection Id | http_default |
| Connection Type | HTTP |
| Host | api.frankfurter.app |
Note: Connection details can also be stored in environment variables or kept securely in an external secrets backend, such as HashiCorp Vault or AWS SSM Parameter Store.
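For example, here is a sketch of the environment-variable convention: Airflow resolves a connection named “mongoid” from a variable called AIRFLOW_CONN_MONGOID, and recent Airflow versions (2.3+) accept a JSON document as the value. The credentials and hostname below are placeholders, and in practice you would set this in your .env file or deployment environment rather than in code.

import os

# Illustrative only: the same connection as the table above, expressed as
# a JSON-serialized Airflow connection in an environment variable.
os.environ["AIRFLOW_CONN_MONGOID"] = (
    '{"conn_type": "mongo", "host": "XXXX.mongodb.net", '
    '"login": "myuser", "password": "mypassword", '
    '"schema": "MyDB", "extra": {"srv": true}}'
)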
Click on the DAGs menu and then “load_currency_data.” You’ll be presented with a number of sub items that address the workflow, such as the Code menu that shows the Python code that makes up the DAG.
Clicking on Graph will show a visual representation of the DAG parsed from the Python code.
In our example, “get_currency” uses the SimpleHttpOperator to obtain a historical list of currency values versus the Euro.
t1 = SimpleHttpOperator(
    task_id='get_currency',
    method='GET',
    endpoint='2022-01-01..2022-06-30',
    headers={"Content-Type": "application/json"},
    do_xcom_push=True,
    dag=dag)
Airflow passes information between tasks using XComs. In this example, we store the return data from the API call in XCom. The next operator, “upload-mongodb,” uses the PythonOperator to call a Python function, “uploadtomongo.”
t2 = PythonOperator(
    task_id='upload-mongodb',
    python_callable=uploadtomongo,
    op_kwargs={"result": t1.output},
    dag=dag
)
This function accesses the data stored in XCom and uses MongoHook to insert the data obtained from the API call into a MongoDB cluster.
def uploadtomongo(ti, **context):
    try:
        hook = MongoHook(mongo_conn_id='mongoid')
        client = hook.get_conn()
        db = client.MyDB
        currency_collection = db.currency_collection
        print(f"Connected to MongoDB - {client.server_info()}")
        d = json.loads(context["result"])
        currency_collection.insert_one(d)
    except Exception as e:
        print(f"Error connecting to MongoDB -- {e}")
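As a variation (not part of the tutorial DAG), the same data could be pulled directly from XCom inside the callable by referencing the upstream task_id, instead of passing it through op_kwargs.

import json
from airflow.providers.mongo.hooks.mongo import MongoHook

# Sketch of an alternative callable: pull the API response from XCom by the
# upstream task_id rather than receiving it via op_kwargs.
def uploadtomongo_from_xcom(ti, **context):
    result = ti.xcom_pull(task_ids="get_currency")
    hook = MongoHook(mongo_conn_id="mongoid")
    client = hook.get_conn()
    client.MyDB.currency_collection.insert_one(json.loads(result))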
Our example workflow is simple: execute one task and then another.
t1 >> t2
Airflow overloads the “>>” bitshift operator to describe the flow of tasks. For more information, see “Bitshift Composition.”
Airflow can enable more complex workflows, such as the following:
Task execution can be conditional with multiple execution paths.
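For example, here is a minimal sketch of a conditional workflow using the BranchPythonOperator. The dag_id and task ids are illustrative and not part of the tutorial DAG.

from datetime import datetime
from airflow import DAG
from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator  # DummyOperator before Airflow 2.3

# The callable returns the task_id of the branch that should run next.
def choose_path(**context):
    run_upload = True  # replace with a real condition, e.g., inspect an XCom value
    return "upload_path" if run_upload else "skip_path"

with DAG(dag_id="branch_example", schedule=None,
         start_date=datetime(2022, 10, 28), catchup=False) as dag:
    branch = BranchPythonOperator(task_id="choose_path", python_callable=choose_path)
    upload = EmptyOperator(task_id="upload_path")
    skip = EmptyOperator(task_id="skip_path")
    branch >> [upload, skip]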
Airflow is best known for its workflow scheduling capabilities, which are defined as part of the DAG definition.
with DAG(
    dag_id="load_currency_data",
    schedule=None,
    start_date=datetime(2022, 10, 28),
    catchup=False,
    tags=["currency"],
    default_args={
        "owner": "Rob",
        "retries": 2,
        "retry_delay": timedelta(minutes=5),
        "on_failure_callback": on_failure_callback
    }
) as dag:
The schedule can be defined using a cron expression, a timedelta, or one of Airflow’s presets, such as the one used in this example, “None.”
DAGs can be scheduled to start at a date in the past. If you’d like Airflow to catch up and execute the DAG for every interval between the start date and now, you can set the “catchup” property. Note: “catchup” defaults to “True,” so make sure you set the value accordingly.
From our example, you can see just some of the configuration options available.
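As a hedged illustration of these options, the same DAG header could be scheduled with a cron expression or a timedelta instead of “None.” The dag_id below is illustrative, and on Airflow versions before 2.4 the parameter is named schedule_interval rather than schedule.

from datetime import datetime, timedelta
from airflow import DAG

# A minimal scheduling sketch (dag_id is illustrative).
with DAG(
    dag_id="load_currency_data_daily",
    schedule="0 6 * * *",            # run every day at 06:00 UTC
    # schedule=timedelta(hours=12),  # or: run on a fixed 12-hour interval
    start_date=datetime(2022, 10, 28),
    catchup=False,                   # skip back-filled runs between start_date and now
) as dag:
    ...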
Once it’s executed, you can click on the DAG and then the Grid menu item to display the runtime status of the DAG.
In the example above, the DAG was run four times, all with success. You can view the log of each step by clicking on the task and then “Log” from the menu.
The log is useful for troubleshooting the task. Here we can see our output from the
print(f"Connected to MongoDB - {client.server_info()}")
command within the PythonOperator.
Once we run the DAG, the data will be in the MongoDB Atlas cluster. Navigating to the cluster, we can see that the “currency_collection” was created and populated with currency data.
Next, we can visualize the data by using MongoDB Charts.
Note that the data stored in MongoDB from the API call contains a subdocument for every day of the given period. A sample of this data is as follows:
{
  _id: ObjectId("635b25bdcef2d967af053e2c"),
  amount: 1,
  base: 'EUR',
  start_date: '2022-01-03',
  end_date: '2022-06-30',
  rates: {
    '2022-01-03': {
      AUD: 1.5691,
      BGN: 1.9558,
      BRL: 6.3539,
      …
    },
    '2022-01-04': {
      AUD: 1.5682,
      BGN: 1.9558,
      BRL: 6.4174,
      …
    },
    …
  }
}
With MongoDB Charts, we can define an aggregation pipeline filter to transform the data into a format that will be optimized for chart creation. For example, consider the following aggregation pipeline filter:
[
  { $project: { rates: { $objectToArray: "$rates" } } },
  { $unwind: "$rates" },
  { $project: { _id: 0, "date": "$rates.k", "Value": "$rates.v" } }
]
This transforms the data into documents that each have two key-value pairs: the date and the corresponding rates.
{
  date: '2022-01-03',
  Value: {
    AUD: 1.5691,
    BGN: 1.9558,
    BRL: 6.3539,
    …
  }
},
{
  date: '2022-01-04',
  Value: {
    AUD: 1.5682,
    BGN: 1.9558,
    BRL: 6.4174,
    …
  }
}
We can add this aggregation pipeline filter into Charts and build out a chart comparing the US dollar (USD) to the Euro (EUR) over this time period.
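If you want to inspect the transformed output outside of Charts, the same pipeline can be run with PyMongo. The connection string below is a placeholder; the database, collection, and pipeline match the ones used earlier in this post.

from pymongo import MongoClient

# Run the Charts aggregation pipeline directly against the cluster to verify
# the transformed shape of the data.
client = MongoClient("mongodb+srv://<user>:<password>@<cluster>.mongodb.net")
pipeline = [
    {"$project": {"rates": {"$objectToArray": "$rates"}}},
    {"$unwind": "$rates"},
    {"$project": {"_id": 0, "date": "$rates.k", "Value": "$rates.v"}},
]
for doc in client.MyDB.currency_collection.aggregate(pipeline):
    print(doc["date"], doc["Value"].get("USD"))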
For more information on MongoDB Charts, check out the YouTube video “Intro to MongoDB Charts (demo)” for a walkthrough of the feature.
Airflow is an open-source workflow scheduler used by many enterprises throughout the world. Integrating MongoDB with Airflow is simple using the MongoHook. Astronomer makes it easy to quickly spin up a local Airflow deployment. Astronomer also has a registry that provides a central place for Airflow operators, including the MongoHook and MongoSensor.