Getting Started with Airflow
What is Airflow?
Who developed Airflow?
When should you use Airflow?
How do you use Airflow?
Introduction
I am a passionate data enthusiast who loves exploring new tools. This time I wanted hands-on experience with pipeline orchestration, and I wanted to run pipelines as raw Python code rather than through an ETL tool. I came across Apache Airflow, an easy and efficient workflow-management tool. I hit a few hiccups before getting started with Airflow, so I want to help others who are looking for guidance on working with it. Let's answer the above questions one by one, in short.
What is Airflow?
Airflow is an orchestration tool. So what does an orchestration tool do? It helps you arrange your programs and scripts to run in order, with or without dependencies (e.g. a time schedule, file availability, the status of predecessors, etc.), to form a pipeline. Airflow uses a DAG (Directed Acyclic Graph) to manage the workflow. What is a DAG? A DAG is a flow that never forms a cycle: it starts at one point and ends at another, and there is no path back to an earlier task (which would lead to an infinite loop).
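To make the "acyclic" part concrete, here is a minimal sketch in plain Python (an illustration of the concept, not Airflow code) that checks whether a set of task dependencies forms a valid DAG:

```python
# Toy DAG validity check: each task maps to its downstream tasks.
# Plain Python to illustrate the "no cycles" rule, not Airflow code.

def is_acyclic(edges):
    """Return True if the directed graph has no cycles (i.e. it is a DAG)."""
    visited, in_path = set(), set()

    def visit(node):
        if node in in_path:        # returned to a node on the current path -> cycle
            return False
        if node in visited:
            return True
        in_path.add(node)
        for nxt in edges.get(node, []):
            if not visit(nxt):
                return False
        in_path.discard(node)
        visited.add(node)
        return True

    return all(visit(n) for n in edges)

# extract -> transform -> load is a valid DAG...
assert is_acyclic({"extract": ["transform"], "transform": ["load"], "load": []})
# ...but adding load -> extract creates a cycle, which Airflow would reject.
assert not is_acyclic({"extract": ["transform"], "transform": ["load"],
                       "load": ["extract"]})
```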
Who developed Airflow?
Airflow was developed by Airbnb in October 2014 for the company’s workflow management and it is an open source project maintained by Apache Foundation. It is developed using Python.
When should you use Airflow?
Airflow comes in handy when you have production-grade ETL jobs that need to be orchestrated and CRON alone is not enough, because you have to set up pipeline dependencies, event triggers, automatic retries, error-handling routes, and scheduled runs.
How do you use Airflow?
Installation step
First Dag file creation and run
Create an ETL — DAG
Installation Steps (done on Mac):
This should be very similar to other OS as well.
1. Install Airflow with pip

pip install -U apache-airflow

2. Create a folder and set it as the Airflow home

mkdir -p ~/airflow/dags
export AIRFLOW_HOME='~/airflow'
export PATH=$PATH:~/.local/bin

3. Initialize the Airflow DB to create the metadata database (the default is SQLite). After init, you can adjust Airflow settings in airflow.cfg

cd ~/airflow
airflow initdb

4. Start the Airflow scheduler

airflow scheduler

This starts the scheduler and shows its logs. Open another shell prompt and run the next step.

5. Start the Airflow webserver in a new terminal window

airflow webserver

The UI is served at http://localhost:8080/admin/ (you can change this in the .cfg file).
Open “http://localhost:8080/admin/” in your web browser to Open the UI
Your Interactive UI should be as below with tutorial DAGs preloaded:
Congrats! You have now installed Airflow. Let's see how you can orchestrate your first DAG, which comes pre-configured with the Airflow installation. Please refer to this link for more details on the UI.
Creating our first Airflow script:
Scheduling files are created as Python scripts containing DAG objects and tasks; these are also called DAG files. All scheduling scripts, aka DAG files, should be placed under the folder “/Users/username/airflow/dags”, i.e. the Airflow DAG directory created in installation step #2 (“mkdir -p ~/airflow/dags”). The Airflow scheduler picks up these files and schedules your DAGs.
Airflow comes pre-configured with tutorial DAGs. You can turn the toggle switch (ON/OFF) to run a DAG automatically, as highlighted in the box; switching it ON schedules the script as per the DAG file configuration.
We will re-create the above DAG file (the Python script is given below; save it as etl_dag.py) and place it in the dags folder. The code below is already available: just click the tutorial DAG, which opens in graph view (the default in the .cfg file); from the list of options, click the Code button to see the source code for the DAG script. You can view the source code for the other DAGs the same way.
Full code (comments trimmed for better readability):
# etl_dag.py
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,  # should this task look back at its previous run
    'start_date': airflow.utils.dates.days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,  # how many times should we retry
    'retry_delay': timedelta(minutes=5),  # delay between retries
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'adhoc': False,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'trigger_rule': u'all_success'
}

dag = DAG(
    'etl_dag',  # this is the name of the DAG in the UI
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1))

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    dag=dag)

templated_command = """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7) }}"
    echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    depends_on_past=False,
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

# Method 1 to set up dependencies:
# t2.set_upstream(t1)
# t3.set_upstream(t1)

# Current practice:
t1 >> [t2, t3]
Let's look at each section below.
Import section:
# etl_dag.py
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta
Import the DAG and BashOperator classes to create and run your tasks (the steps you want to run one by one, like Step 1 Extract, Step 2 Transform, Step 3 Load).
Default Arguments:
default_args = {
'owner': 'airflow',
'depends_on_past': False, # should you look back for previous
'start_date': airflow.utils.dates.days_ago(2),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1, # how many times should we retry
'retry_delay': timedelta(minutes=5), #what is the delay period
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'dag': dag,
# 'adhoc':False,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function,
# 'on_success_callback': some_other_function,
# 'on_retry_callback': another_function,
# 'trigger_rule': u'all_success'
}
The above arguments are the defaults for all of the DAG's tasks unless explicitly overridden in a task (see t1 and t2 below).
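Conceptually, each task combines default_args with its own keyword arguments, and the task-level value wins. A plain-Python sketch of that merge (an illustration, not Airflow internals):

```python
# Conceptual sketch of how task-level arguments override default_args.
# Plain Python illustration, not actual Airflow internals.

default_args = {'owner': 'airflow', 'retries': 1, 'depends_on_past': False}

def effective_args(defaults, task_args):
    """Task-level settings win over the DAG-level defaults."""
    merged = dict(defaults)
    merged.update(task_args)
    return merged

# A task that overrides 'retries' but inherits everything else:
args = effective_args(default_args, {'retries': 3})
assert args['retries'] == 3          # overridden per task
assert args['owner'] == 'airflow'    # inherited from default_args
```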
Dag Object:
dag = DAG(
'etl_dag', # this is the name of DAG in UI
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1))
The DAG takes a DAG id, which is displayed in the UI, and here a schedule interval of 1 day; the link below covers the other options.
https://airflow.apache.org/docs/apache-airflow/1.10.1/scheduler.html
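schedule_interval accepts a datetime.timedelta (as above), a cron expression string, or a preset such as '@daily' (used later in this article). A quick plain-Python check of what timedelta(days=1) means for successive run dates:

```python
from datetime import datetime, timedelta

# schedule_interval=timedelta(days=1) spaces each run one day apart.
interval = timedelta(days=1)
start = datetime(2021, 3, 17)

runs = [start + i * interval for i in range(3)]
assert runs == [datetime(2021, 3, 17),
                datetime(2021, 3, 18),
                datetime(2021, 3, 19)]
```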
Task1:
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
A simple bash task to print the date; the BashOperator can run any bash command.
Task2:
t2 = BashOperator(
task_id='sleep',
depends_on_past=False,
bash_command='sleep 5',
dag=dag)
Sleep for 5 seconds using the BashOperator.
Task3:
templated_command = """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7) }}"
    echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    depends_on_past=False,
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)
This task runs a templated command that prints the parameter passed in via params={'my_param': 'Parameter I passed in'}. Airflow renders the command with Jinja templating: ds is the execution date and macros.ds_add(ds, 7) adds 7 days to it. The link below has more details on other operators.
https://airflow.apache.org/docs/apache-airflow/1.10.1/concepts.html
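For reference, macros.ds_add(ds, 7) simply shifts the ds date string by 7 days. A plain-Python equivalent (a sketch of the behavior, not Airflow's implementation):

```python
from datetime import datetime, timedelta

def ds_add(ds, days):
    """Shift a 'YYYY-MM-DD' date string by the given number of days,
    mirroring what Airflow's macros.ds_add does."""
    shifted = datetime.strptime(ds, '%Y-%m-%d') + timedelta(days=days)
    return shifted.strftime('%Y-%m-%d')

assert ds_add('2021-03-17', 7) == '2021-03-24'
assert ds_add('2021-03-17', -7) == '2021-03-10'
```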
Setting up dependencies:
t2.set_upstream(t1)
t3.set_upstream(t1)
That's it, you are done! Yay! Now t1 is the predecessor for t2 and t3.
Save your script and give the scheduler (started in step 4 of the installation steps) some time to pick up the DAG file; any error messages will appear in the logs of the shell running the scheduler. If there are none, your DAG should appear in the UI as in the figure “My first look Airflow UI”.
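Airflow also supports the bitshift style t1 >> [t2, t3], which is just operator overloading: >> records the downstream relationship. A minimal plain-Python sketch of the idea (a toy class, not Airflow's actual implementation):

```python
class Task:
    """Toy task that records downstream dependencies, mimicking how
    Airflow overloads >> on its operators (a sketch, not Airflow code)."""
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # Accept either a single task or a list of tasks on the right.
        targets = other if isinstance(other, list) else [other]
        self.downstream.extend(targets)
        return other

t1, t2, t3 = Task('print_date'), Task('sleep'), Task('templated')
t1 >> [t2, t3]   # same effect as t2.set_upstream(t1); t3.set_upstream(t1)

assert [t.task_id for t in t1.downstream] == ['sleep', 'templated']
```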
The graph view is shown below once you click the button shown above:
Once your tasks are lined up, click Trigger DAG as shown below:
Create an ETL DAG
Now that you have run your first pipeline, let's try a real-world use case: running Python scripts using the PythonOperator and a FileSensor (file trigger):
The additional import statements are below; they will be explained later.
from airflow.operators.python_operator import PythonOperator
import sys
sys.path.append("/Users/username/airflow/scripts")
from runetl import RunEtl
from airflow.contrib.sensors.file_sensor import FileSensor
Create Dag object:
dag = DAG('etl_dag', default_args=default_args, schedule_interval='@daily',
          catchup=False,
          template_searchpath=['/Users/user/Downloads/store sales project/sql_files',
                               '/Users/user/airflow.my_code'])
template_searchpath is a list of folders holding template files, such as .sql files to be executed by a SQL operator; any table-to-table load or schema creation can be done through such SQL files. We are not using it here, but it is good to know.
Filesensor Task:
t1 = FileSensor(task_id='check_file_exists',
                poke_interval=5, timeout=150,
                filepath='/Users/username/Downloads/Airflow/input/store_commodity.csv',
                dag=dag)
Task t1 looks for a file and lets the subsequent steps execute only once the file is available. poke_interval is the number of seconds to wait between checks (pokes), and timeout=150 fails the task if the file has not arrived within 150 seconds. It is a best practice to always set a timeout; otherwise the task would keep poking for the file every 5 seconds indefinitely.
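Conceptually, the sensor's poke loop looks like this (a plain-Python sketch of the behavior with simulated time, not Airflow's FileSensor implementation, which sleeps between pokes):

```python
def run_sensor(file_exists, poke_interval=5, timeout=150):
    """Sketch of a sensor poke loop: check a condition every poke_interval
    seconds and fail once timeout seconds have elapsed. `file_exists` is a
    callable returning True when the file is there. Time is simulated here;
    Airflow's real sensor sleeps between pokes."""
    elapsed = 0
    while elapsed <= timeout:
        if file_exists():
            return 'success'
        elapsed += poke_interval   # in reality the sensor would sleep here
    return 'failed'

# File appears on the third poke -> the sensor succeeds:
pokes = iter([False, False, True])
assert run_sensor(lambda: next(pokes)) == 'success'

# File never appears -> the sensor times out and fails:
assert run_sensor(lambda: False) == 'failed'
```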
Run the python operator:
def load_data():
    load_data_to_table = RunEtl()
    load_data_to_table.load_data()

t2 = PythonOperator(task_id='run_my_etl', python_callable=load_data, dag=dag)
Instead of keeping all the ETL code inside my DAG script, I create an object from a class file. To make that class importable, I append my scripts folder to the Python path in the import section: import sys; sys.path.append("/Users/username/airflow/scripts").
I created a small function load_data inside the DAG file that instantiates the class and calls the method from the class file ("from runetl import RunEtl").
Note: the function is passed as load_data and NOT load_data(): t2 = PythonOperator(task_id='run_my_etl', python_callable=load_data, dag=dag)
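The distinction matters because python_callable expects a function object to call later, not the result of calling it now. A quick plain-Python illustration (python_operator here is a hypothetical stand-in, not the real PythonOperator):

```python
def load_data():
    return 'loaded'

def python_operator(python_callable):
    """Toy stand-in for PythonOperator: it stores the callable and
    invokes it only when the task actually runs."""
    return python_callable   # stored now, executed later

task = python_operator(load_data)   # pass the function itself
assert callable(task)               # nothing has run yet
assert task() == 'loaded'           # invoked later, at execution time

# Passing load_data() instead would run the ETL immediately, at DAG-parse
# time, and hand the operator a return value rather than a callable.
```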
Setting up dependencies:
t1 >> t2        # t2 runs after t1
t1 >> [t2, t3]  # example: two tasks t2, t3 run after t1
Complete code below for easy access:
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
from airflow.operators.mysql_operator import MySqlOperator
from airflow.operators.python_operator import PythonOperator
import sys
sys.path.append("/Users/username/airflow/scripts")
from runetl import RunEtl
from airflow.contrib.sensors.file_sensor import FileSensor
default_args = {
'owner':'Airflow',
'start_date': datetime(2021, 3, 17),
'retries':1,
'retry_delay': timedelta(seconds=5)
}
dag = DAG('etl_dag', default_args=default_args, schedule_interval='@daily',
          catchup=False,
          template_searchpath=['/Users/username/Downloads/store sales project/sql_files',
                               '/Users/username/airflow.my_code'])

t1 = FileSensor(task_id='check_file_exists',
                poke_interval=5, timeout=150,
                filepath='/Users/username/Downloads/Airflow/input/store_commodity.csv',
                dag=dag)

def load_data():
    load_data_to_table = RunEtl()
    load_data_to_table.load_data()

t2 = PythonOperator(task_id='run_my_etl', python_callable=load_data, dag=dag)
t1 >> t2
Conclusion
There are more options you can try, and different sensors and hooks for your use case, but the above should get you started with simple pipelines. I hope you enjoyed this; let me know if you would like me to share my learnings on any other topic.