Data science projects often require multiple steps to go from raw data to useful data products. These steps tend to be sequential, and involve things like:
- sourcing data
- cleaning data
- processing data
- training models
After you understand the steps necessary to deliver results from your work, it’s useful to automate them as a repeatable pipeline. Domino can schedule Jobs, but for more complex pipelines you can pair Domino with an external scheduling system like Apache Airflow.
This topic describes how to integrate Airflow with Domino by using the python-domino package.
Airflow is an open-source platform to author, schedule, and monitor pipelines of programmatic tasks. You can define pipelines with code and configure the Airflow scheduler to execute the underlying tasks. You can use the Airflow application to visualize, monitor, and troubleshoot pipelines.
If you are new to Airflow, read the Airflow QuickStart to set up your own Airflow server.
There are many options for configuring your Airflow server. For pipelines that run tasks in parallel, you must use Airflow’s LocalExecutor mode, which can execute multiple task dependencies at the same time. Airflow uses a database to keep records of all the tasks it executes and schedules, so you must install and configure a SQL database to use LocalExecutor mode.
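For example, the relevant settings in airflow.cfg might look like the sketch below. The connection string is only a placeholder, and the exact section and key names can vary between Airflow versions.
# airflow.cfg (illustrative values only)
[core]
executor = LocalExecutor
# Placeholder connection string for the Airflow metadata database
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost:5432/airflow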
Read A Guide On How To Build An Airflow Server/Cluster to learn more about setting up LocalExecutor mode.
For more information about scheduling and triggers, notifications, and pipeline monitoring, read the Airflow documentation.
To create Airflow tasks that work with Domino, you must install python-domino on your Airflow workers. Use this library to add tasks in your pipeline code that interact with the Domino API to start Jobs.
Connect to your Airflow workers, and follow these steps to install and configure python-domino:
- Install from pip:
pip install dominodatalab[airflow]
- Set up an Airflow variable to point to the Domino host. This is the URL where you load the Domino application in your browser.
Key: DOMINO_API_HOST
Value: <your-domino-url>
- Set up an Airflow variable to store the user API key you want to use with Airflow. This is the API key Airflow uses to authenticate to Domino and start Jobs.
Key: DOMINO_API_KEY
Value: <your-api-key>
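To confirm that python-domino is installed and the Variables resolve correctly, you can run a short check on an Airflow worker. This is a minimal sketch that assumes the two Variables above are already set; the project name is a hypothetical placeholder.
# Quick connectivity check for python-domino on an Airflow worker
from airflow.models import Variable
from domino import Domino

# Hypothetical placeholder; replace with your own <username>/<project-name>
domino = Domino(
    "your-username/your-project",
    api_key=Variable.get("DOMINO_API_KEY"),
    host=Variable.get("DOMINO_API_HOST"),
)

# Lists the project's Jobs; an error here usually means the host or API key is wrong
print(domino.runs_list())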
Airflow pipelines are defined with Python code. This fits in well with Domino’s code-first philosophy. You can use python-domino in your pipeline definitions to create tasks that start Jobs in Domino.
Architecturally, Airflow has its own server and worker nodes, and it operates as an independent service outside of your Domino deployment. Airflow needs network connectivity to Domino so its workers can access the Domino API to start Jobs in your Domino project. All of the code that performs the actual work in each step of the pipeline, such as code that fetches data, cleans data, and trains data science models, is maintained and versioned in your Domino project. This way you have Domino’s Reproducibility engine working together with Airflow’s scheduler.
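As a minimal illustration of this pattern, the sketch below defines a DAG with a single task whose python_callable is domino.runs_start_blocking, so the Airflow task succeeds or fails together with the Domino Job it starts. The project name and script path are hypothetical placeholders; a more complete pipeline example follows.
# Minimal sketch: one Airflow task that starts a single Domino Job
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable
from domino import Domino

# Hypothetical placeholder; replace with your own <username>/<project-name>
domino = Domino(
    "your-username/your-project",
    api_key=Variable.get("DOMINO_API_KEY"),
    host=Variable.get("DOMINO_API_HOST"),
)

dag = DAG("domino_single_job", start_date=datetime(2019, 2, 7), schedule_interval=None)

# runs_start_blocking starts the Domino Job and waits for it to finish
train = PythonOperator(
    task_id="train_model",
    python_callable=domino.runs_start_blocking,
    op_kwargs={"command": ["src/train.py"]},  # hypothetical script path
    dag=dag,
)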
The following example assumes you have an Airflow server where you want to set up a pipeline of tasks that fetches data, cleans and processes data, performs an analysis, and then generates a report. It also assumes you have all the code required to complete those tasks stored as scripts in a Domino project.
The example pipeline is written using Airflow and python-domino, and executes all of its dependencies in Domino using the Airflow scheduler. It trains a model using multiple datasets and generates a final report.
See the commented script below for an example of how to configure an Airflow DAG to execute such a pipeline with Domino Jobs.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from domino import Domino
from airflow.models import Variable
# Initialize the Domino API object with the api_key and host
api_key = Variable.get("DOMINO_API_KEY")
host = Variable.get("DOMINO_API_HOST")
domino = Domino("sujaym/airflow-pipeline", api_key=api_key, host=host)
# Parameters to DAG object
default_args = {
    'owner': 'domino',
    'depends_on_past': False,
    'start_date': datetime(2019, 2, 7),
    'retries': 1,
    'retry_delay': timedelta(minutes=.5),
    'end_date': datetime(2019, 2, 10),
}
# Instantiate a DAG
dag = DAG(
    'domino_pipeline',
    description='Execute Airflow DAG in Domino',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
)
# Define Task instances in Airflow to kick off Jobs in Domino
t1 = PythonOperator(task_id='get_dataset_1', python_callable=domino.runs_start_blocking, op_kwargs={"command":["src/data/get_dataset_1.py"]}, dag=dag)
t2 = PythonOperator(task_id='get_dataset_2', python_callable=domino.runs_start_blocking, op_kwargs={"command":["src/data/get_dataset_2.py"]}, dag=dag)
t3 = PythonOperator(task_id='get_dataset_3', python_callable=domino.runs_start_blocking, op_kwargs={"command":["src/models/get_dataset_3.sh"]}, dag=dag)
t4 = PythonOperator(task_id='clean_data', python_callable=domino.runs_start_blocking, op_kwargs={"command":["src/data/cleaning_data.py"]}, dag=dag)
t5 = PythonOperator(task_id='generate_features_1', python_callable=domino.runs_start_blocking, op_kwargs={"command":["src/features/word2vec_features.py"]}, dag=dag)
t6 = PythonOperator(task_id='run_model_1', python_callable=domino.runs_start_blocking, op_kwargs={"command":["src/models/run_model_1.py"]}, dag=dag)
t7 = PythonOperator(task_id='do_feature_engg', python_callable=domino.runs_start_blocking, op_kwargs={"command":["src/features/feature_eng.py"]}, dag=dag)
t8 = PythonOperator(task_id='run_model_2', python_callable=domino.runs_start_blocking, op_kwargs={"command":["src/models/run_model_2.py"]}, dag=dag)
t9 = PythonOperator(task_id='run_model_3', python_callable=domino.runs_start_blocking, op_kwargs={"command":["src/models/run_model_3.py"]}, dag=dag)
t10 = PythonOperator(task_id='run_final_report', python_callable=domino.runs_start_blocking, op_kwargs={"command":["src/report/report.sh"]}, dag=dag)
# Define your dependencies
t2.set_upstream(t1)
t3.set_upstream(t1)
t4.set_upstream(t2)
t5.set_upstream(t3)
t6.set_upstream([t4, t5])
t7.set_upstream(t4)
t8.set_upstream(t7)
t9.set_upstream(t7)
t10.set_upstream([t6, t8, t9])
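Equivalently, in recent Airflow versions you can express the same dependencies with the bitshift operators, which some find easier to read:
# Same dependency graph using bitshift composition
t1 >> [t2, t3]
t2 >> t4
t3 >> t5
[t4, t5] >> t6
t4 >> t7
t7 >> [t8, t9]
[t6, t8, t9] >> t10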