Getting Started with Airflow

Rahulsunder
Apr 8, 2021
(Image from Wikipedia)

What is Airflow?

Who developed Airflow?

When should you use Airflow?

How do you use Airflow?

Introduction

I am a passionate data enthusiast who enjoys exploring new tools. This time I wanted to get hands-on with pipeline orchestration and run the pipelines as plain Python code rather than through an ETL tool. I came across Apache Airflow, a simple and efficient workflow management tool. I hit a few hiccups before getting started with Airflow, so I want to help others who are looking for guidance on working with it. Let's answer the questions above one by one.

What is Airflow?

Airflow is an orchestration tool. So what does an orchestration tool do? It helps you arrange your programs and scripts to run in a defined order, with or without dependencies (e.g. a time schedule, file availability, the status of predecessors, etc.), to form a pipeline. Airflow manages each workflow as a DAG (Directed Acyclic Graph). What is a DAG? It is a flow that never forms a cycle: it starts at some point and ends at a different point, and there is no path back to an earlier task (which would otherwise create an infinite loop).
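To make the "acyclic" part concrete, here is a purely illustrative sketch (the DAG id and task names are made up, not part of the tutorial that follows). The dependencies only point forward, so the graph stays acyclic:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator  # placeholder tasks

# Hypothetical example: three placeholder tasks in a tiny DAG
dag = DAG('dag_shape_example',
          start_date=datetime(2021, 3, 1),
          schedule_interval=timedelta(days=1))

extract = DummyOperator(task_id='extract', dag=dag)
transform = DummyOperator(task_id='transform', dag=dag)
load = DummyOperator(task_id='load', dag=dag)

# Dependencies only flow forward, so the graph stays acyclic
extract >> transform >> load
# Something like `load >> extract` would close a loop, and Airflow would reject the DAG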

Who developed Airflow?

Airflow was developed at Airbnb in October 2014 for the company's workflow management needs, and it is now an open-source project maintained by the Apache Software Foundation. It is written in Python.

When should you use Airflow?

Airflow comes in handy when you have production-grade ETL jobs that need to be orchestrated and cron alone is not enough, because you have to set up pipeline dependencies, event triggers, automatic retries, error-handling routes and scheduled runs.

How do you use Airflow?

Installation steps

First DAG file creation and run

Create an ETL DAG

Installation steps (done on Mac):

This should be very similar on other operating systems as well.

1. Install Airflow with pip:

pip install -U apache-airflow

2. Create a folder and set it as the Airflow home:

mkdir -p ~/airflow/dags
export AIRFLOW_HOME='~/airflow'
export PATH=$PATH:~/.local/bin

3. Initialize the Airflow metadata database (SQLite by default). After init, you can adjust Airflow settings in airflow.cfg:

cd ~/airflow
airflow initdb

4. Start the Airflow scheduler. This starts the scheduler and shows its logs; leave it running and open another shell prompt for the next step:

airflow scheduler

5. Start the Airflow webserver in a new terminal window. The UI is served at http://localhost:8080/admin/ (this can be changed in the .cfg file):

airflow webserver

Open “http://localhost:8080/admin/” in your web browser to open the UI.

Your interactive UI should look like the one below, with tutorial DAGs preloaded:

My first look Airflow UI

Congrats! You have now installed Airflow. Let's see how you can orchestrate your first DAG, which comes pre-configured with the Airflow installation. Please refer to this link for more details on the UI.

Creating our first Airflow script:

Scheduling files are created as Python scripts containing a DAG object and its tasks; these are also called DAG files. All scheduling scripts, aka DAG files, should be placed under the folder “/Users/username/airflow/dags”, i.e. the Airflow DAG directory created in installation step 2 (“mkdir -p ~/airflow/dags”). The Airflow scheduler picks these files up and schedules your DAGs.

Airflow comes pre-configured with tutorial DAGs. You can flip the toggle switch (ON/OFF) highlighted in the box to run a DAG automatically; switching it ON schedules the script as per the DAG file's configuration.

Tutorial DAG in UI, Toggle switch

We will re-create the above DAG file (the Python script is given below; save it as etl_dag.py) and place it in the dags folder. This Python code is already available: just click the tutorial DAG and it will open in graph view (the default view, set in the .cfg file). From the list of options, click the Code button to see the source code of the DAG's Python script; you can view the source code of the other DAGs the same way:

Open source code for a DAG from UI

Full code (some comments removed for better readability):

# etl_dag.py
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,  # should this task wait on its previous run
    'start_date': airflow.utils.dates.days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,  # how many times should we retry
    'retry_delay': timedelta(minutes=5),  # what is the delay between retries
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'adhoc': False,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'trigger_rule': u'all_success'
}

dag = DAG(
    'etl_dag',  # this is the name of the DAG in the UI
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1))

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    dag=dag)

templated_command = """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7) }}"
    echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    depends_on_past=False,
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

# Method 1 to set up dependencies:
# t2.set_upstream(t1)
# t3.set_upstream(t1)

# Current practice:
t1 >> [t2, t3]

Let's look at each section below.

Import section:

# etl_dag.py
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta

Import the DAG and BashOperator classes to define your tasks (the steps you want to run one by one, e.g. Step 1 Extract, Step 2 Transform, Step 3 Load).

Default Arguments:

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,  # should this task wait on its previous run
    'start_date': airflow.utils.dates.days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,  # how many times should we retry
    'retry_delay': timedelta(minutes=5),  # what is the delay between retries
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'adhoc': False,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'trigger_rule': u'all_success'
}

The arguments above are the defaults for all of the DAG's tasks unless they are explicitly overridden in a task (the tasks are listed below as t1, t2 and t3).
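For instance (a made-up task, not part of the tutorial DAG, assuming the imports and dag object from the script above), any task can override these defaults by passing the argument directly:

# Hypothetical task: overrides the default retries/retry_delay for this task only
t_risky = BashOperator(
    task_id='call_flaky_api',
    bash_command='curl -sf https://example.com/health',
    retries=3,                           # overrides default_args['retries'] = 1
    retry_delay=timedelta(minutes=1),    # overrides the 5-minute default
    dag=dag)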

DAG object:

dag = DAG(
    'etl_dag',  # this is the name of the DAG in the UI
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1))

The DAG object takes a DAG id, which is what is displayed in the UI, and here a schedule interval of one day. The link below covers the other scheduling options.

https://airflow.apache.org/docs/apache-airflow/1.10.1/scheduler.html
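Note that schedule_interval is not limited to a timedelta; as a quick illustration (the DAG id below is made up), cron presets and cron expressions work too:

# The same DAG could also be scheduled with a cron expression or preset, e.g.:
dag_cron_example = DAG(
    'etl_dag_cron_example',          # hypothetical DAG id, for illustration only
    default_args=default_args,
    description='Illustration of cron-style scheduling',
    schedule_interval='0 6 * * *')   # every day at 06:00
# Other accepted values: '@daily' (midnight), '@hourly', or None (manual triggers only)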

Task1:

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

A simple bash task to print the date; the BashOperator can run any bash command.
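For example, a task could call an existing shell script (the path below is hypothetical), and the optional env argument supplies environment variables to the command:

# Hypothetical example: run a shell script with an environment variable
t_backup = BashOperator(
    task_id='backup_raw_files',
    bash_command='bash /Users/username/airflow/scripts/backup.sh ',  # note the trailing space:
    # without it, Airflow treats a command ending in .sh as a Jinja template file
    env={'TARGET_DIR': '/tmp/backup'},
    dag=dag)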

Task2:

t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    dag=dag)

Sleep for 5 seconds using the BashOperator.

Task3:

templated_command = """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7) }}"
    echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    depends_on_past=False,
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

This task runs a templated (Jinja) bash command that prints the execution date and the parameter passed in via params={'my_param': 'Parameter I passed in'}. The link below has more details on the other operators.

https://airflow.apache.org/docs/apache-airflow/1.10.1/concepts.html

Setting up dependencies:

t2.set_upstream(t1)
t3.set_upstream(t1)

That's it, you are done! Yay! Now t1 is the predecessor of t2 and t3.
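The same relationship can also be written with the bitshift operators, which is the style you will see in most newer examples:

# Equivalent to the set_upstream calls above:
t1 >> t2
t1 >> t3
# or in one line, fanning out from t1:
t1 >> [t2, t3]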

Save your script and give the scheduler (started in step 4 of the installation) some time to pick up the DAG file. Any error messages will appear in the logs of the shell running the scheduler; if there are none, your DAG should appear in the UI as in the figure “My first look Airflow UI”.

Click Graph view in your tutorial DAG

The graph view shown below appears once you click the button shown above:

Graph view shows Task relationship

Once your tasks are lined up, click the Trigger DAG button as shown below:

Run the “trigger Dag” button

Create an ETL DAG

Now that you have run your first pipeline, let's try a more real-world use case: running Python scripts with the PythonOperator and a FileSensor (file trigger).

The additional import statements are below and will be explained later:

from airflow.operators.python_operator import PythonOperator
import sys
sys.path.append("/Users/username/airflow/scripts")
from runetl import RunEtl
from airflow.contrib.sensors.file_sensor import FileSensor

Create the DAG object:

dag = DAG('etl_dag', default_args=default_args, schedule_interval='@daily',
          catchup=False,
          template_searchpath=['/Users/username/Downloads/store sales project/sql_files',
                               '/Users/username/airflow.my_code'])

We pass a template_searchpath list, which tells Airflow where to look for template files, such as .sql files to be executed by a SQL operator; any table-to-table conversion or schema creation can be done with such SQL files. We are not using them in this pipeline, but it is good to know.
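Just as an illustration of how that search path gets used (this task is not part of the pipeline built here, and the .sql file name and connection id are made up), a MySqlOperator can reference a SQL file by name and Airflow will resolve it against template_searchpath:

# Hypothetical example: run a SQL template found via template_searchpath
create_table = MySqlOperator(
    task_id='create_staging_table',
    mysql_conn_id='mysql_default',     # connection defined in the Airflow UI
    sql='create_staging_table.sql',    # resolved against template_searchpath
    dag=dag)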

FileSensor task:

t1 = FileSensor(task_id='check_file_exists',
                poke_interval=5,
                timeout=150,
                filepath='/Users/username/Downloads/Airflow/input/store_commodity.csv',
                dag=dag)

Task t1 looks for a file and lets the subsequent steps execute only once the file is available. poke_interval is the number of seconds the sensor waits between checks, and timeout=150 fails the task if the file has not arrived within 150 seconds. It is a best practice to always set a timeout; otherwise the task would keep poking every 5 seconds for a very long time waiting for the file.
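If you would rather have downstream tasks skipped than have the run fail when the file never arrives, sensors also accept a soft_fail flag; here is a hypothetical variant of the same task:

# Hypothetical variant: mark the task as skipped instead of failed on timeout
t1_soft = FileSensor(
    task_id='check_file_exists_soft',
    filepath='/Users/username/Downloads/Airflow/input/store_commodity.csv',
    poke_interval=5,
    timeout=150,
    soft_fail=True,   # on timeout, skip instead of fail
    dag=dag)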

Run the python operator:

def load_data():
    load_data_to_table = RunEtl()
    load_data_to_table.load_data()

t2 = PythonOperator(task_id='run_my_etl', python_callable=load_data, dag=dag)

Rather than putting all the ETL logic inside the DAG script, let's see how to create an object from a class defined in a separate file. I added my scripts folder to the Python path in the import section: import sys; sys.path.append("/Users/username/airflow/scripts").

I then created a small function, load_data, inside the DAG file that creates an object and calls the method defined in that class file (“from runetl import RunEtl”).
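The runetl.py file itself is not shown in this article; purely as a hypothetical sketch of what the DAG expects, it could look something like this (saved under /Users/username/airflow/scripts/runetl.py):

# runetl.py -- hypothetical sketch of the class the DAG imports
import csv

class RunEtl:
    """Placeholder ETL class; the real one would load the data into a target table."""

    def load_data(self):
        path = '/Users/username/Downloads/Airflow/input/store_commodity.csv'
        with open(path) as f:
            rows = list(csv.reader(f))
        # Real cleaning/loading logic would go here
        print('Read %d rows from %s' % (len(rows), path))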

Note: the callable is passed as load_data and NOT load_data(): t2 = PythonOperator(task_id='run_my_etl', python_callable=load_data, dag=dag).
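If your callable needs arguments, PythonOperator can pass them in via op_args or op_kwargs instead of you calling the function yourself; a made-up example:

# Hypothetical example: hand the input path to the callable via op_kwargs
def load_data_from(path):
    print('Would load data from %s' % path)

t2_with_args = PythonOperator(
    task_id='run_my_etl_with_args',
    python_callable=load_data_from,
    op_kwargs={'path': '/Users/username/Downloads/Airflow/input/store_commodity.csv'},
    dag=dag)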

Setting up dependencies:

t1 >> t2  # t2 runs after t1

t1 >> [t2, t3]  # example: two tasks, t2 and t3, run after t1

The complete code is below for easy access:

from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
from airflow.operators.mysql_operator import MySqlOperator
from airflow.operators.python_operator import PythonOperator
import sys
sys.path.append("/Users/username/airflow/scripts")
from runetl import RunEtl
from airflow.contrib.sensors.file_sensor import FileSensor

default_args = {
    'owner': 'Airflow',
    'start_date': datetime(2021, 3, 17),
    'retries': 1,
    'retry_delay': timedelta(seconds=5)
}

dag = DAG('etl_dag', default_args=default_args, schedule_interval='@daily',
          catchup=False,
          template_searchpath=['/Users/username/Downloads/store sales project/sql_files',
                               '/Users/username/airflow.my_code'])

t1 = FileSensor(task_id='check_file_exists',
                poke_interval=5,
                timeout=150,
                filepath='/Users/username/Downloads/Airflow/input/store_commodity.csv',
                dag=dag)

def load_data():
    load_data_to_table = RunEtl()
    load_data_to_table.load_data()

t2 = PythonOperator(task_id='run_my_etl', python_callable=load_data, dag=dag)

t1 >> t2

Conclusion

There are more options you can try, and different sensors and hooks to fit your use case, but the above should get you started with simple pipelines. I hope you enjoyed this, and let me know if you would like me to share my learnings on any other topic.
