How to develop efficiently for Apache Airflow

Apache Airflow is the leading platform for writing and orchestrating data workflows in Python. It provides reliable scheduling for Python tasks and comes with pre-built integrations and connection management for popular databases, data warehouses, cloud service providers and more.

Writing the tasks is straightforward. However, running, testing and debugging Airflow tasks locally is notoriously challenging, mainly because it has to be done inside a running Airflow instance. Below, we show ways to develop efficiently for Airflow.

Setting up a development environment

Airlaunch has developed an open source tool that facilitates the creation and management of Airflow development environments. The code is available on GitHub.

The Airlaunch tool is essentially a thin wrapper around Apache Airflow: it extends the airflow CLI with commands that automate the management of Airflow environments.

As a first step, we install the Airlaunch Tool:

sudo wget https://raw.githubusercontent.com/airlaunch-ch/airlaunch-cli/master/air -O /usr/local/bin/air && sudo chmod +x /usr/local/bin/air

Now, the management commands are available in the air env command group.
An overview of the available functions can be displayed with air env -h.

With the following simple command, you can initialize a development environment:

air env init

The command creates a Python virtualenv, installs Apache Airflow while respecting all of its dependency constraints, and initializes Airflow with a configuration suitable for development.
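
Under the hood, this corresponds roughly to the constraint-based installation recommended in the Airflow documentation. As an illustration only (the Airflow and Python versions below are example values, not necessarily what the tool installs):

pip install "apache-airflow==2.3.0" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.3.0/constraints-3.8.txt"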

Now, the environment can be started with the command air env start.

After startup, the Airflow web interface is available at localhost:8080 and can be used to register Airflow Connections (see below). For developing DAGs, however, it is not necessary to start the web server.

Register Airflow Connections

Airflow uses the concept of connections to manage credentials for data sources and destinations. The Airflow documentation shows in detail how these can be managed. All of these functions are also available through the Airlaunch tool.

There are three ways to configure Airflow Connections, which we explain below:

  1. Start the web server: Run air env start and enter the credentials under Admin -> Connections.
  2. Use the airflow CLI: All commands described in the Airflow documentation also work with the Airlaunch tool. Just replace the airflow command with air; the rest remains the same. To create a new connection, run air connections add [...] (see the example after this list).
  3. Import a YAML file: With the Airlaunch tool, it is possible to import connections from a .yaml file. By default, the tool looks for a connections.yaml file in the DAG folder. Connections stored there are imported on server startup or when air env load is run (a sketch of such a file follows below).
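
For example, using the CLI (option 2), a Postgres connection could be registered as follows; the connection name and credentials are purely illustrative:

air connections add my_postgres --conn-uri "postgres://user:password@localhost:5432/mydb"

For option 3, the sketch below shows what an entry in connections.yaml could look like. It follows the format produced by airflow connections export; check the Airlaunch README for the exact schema the tool expects:

my_postgres:
  conn_type: postgres
  host: localhost
  login: user
  password: password
  port: 5432
  schema: mydb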

If Airflow connections have already been configured, they can be exported to a yaml file with the air env export command and imported again later with air env load.

Install Python Dependencies

Python tasks often rely on libraries that must first be installed in the environment. When installing dependencies, care must be taken that no conflicts arise between Airflow's own packages and the newly installed ones. The Airlaunch tool prevents such conflicts automatically.

To install any PyPI package, run air env install [packagename]

It is also possible to load dependencies from a requirements.txt file by running air env install-requirements [/path/to/requirements/file]. If no file is explicitly specified, the tool looks for a requirements.txt file in the DAG folder.
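
For example, assuming your DAGs need the requests and pandas libraries (hypothetical examples), a requirements.txt in the DAG folder could simply contain:

requests
pandas

Running air env install-requirements then installs both packages into the environment.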

Develop DAGs

In Airflow, tasks and their execution order are organized into DAGs (directed acyclic graphs). Here, we show how to develop and test them using VSCode; however, any code editor can be used.

VSCode usually detects automatically that a virtualenv is present in the current folder and selects the correct Python interpreter. If this is not the case, the following steps must be performed:

ctrl+shift+p ->  Python: Select Interpreter -> (venv:venv)
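
Alternatively, the interpreter can be pinned in .vscode/settings.json. This is a minimal sketch assuming the virtualenv created by air env init lives in a folder named venv in the workspace root (adjust the path if yours differs):

{
    "python.defaultInterpreterPath": "${workspaceFolder}/venv/bin/python"
}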

Now, all the conveniences of an IDE are available, including code autocompletion and linting.

Now a DAG can be created. As an example, we will use a simple DAG with a dummy task and a Python task. Create a file 'hello_dag.py' in the root directory or in a separate folder.

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

def hello_world():
    print("hello world!")

args = {
    'owner': 'airlaunch',
}

dag = DAG(
    dag_id='hello_dag',
    default_args=args,
    schedule_interval='0 0 * * *',
    start_date=days_ago(2),
)

dummy_task = DummyOperator(
    task_id='dummy_task',
    dag=dag,
)

python_task = PythonOperator(
    task_id='python_task',
    dag=dag,
    python_callable=hello_world,
)

# Set the task order: dummy_task runs before python_task
dummy_task >> python_task

# Expose a CLI scoped to this DAG when the file is run directly (used for debugging below)
if __name__ == "__main__":
    dag.cli()

Note the last two lines: they allow the DAG file to be executed directly, which simplifies debugging. More on this below.
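
For example, the file can be run directly from the terminal with the same arguments that the debugger configuration shown later passes to it:

python hello_dag.py tasks test python_task 2020-01-01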

Testing DAGs

We can test DAGs directly on the command line and do not need to take the (slower) detour via the web interface.

For this, we have all the commands of the Apache Airflow CLI at our disposal. The only thing we need to do is replace the airflow command with air. This will automatically activate and use the previously installed and configured environment.

A DAG can then be tested easily with the following command (the date at the end is the execution date):

air dags test hello_dag 2022-01-01

It is also possible to test individual tasks without having to run the entire DAG. For example, to run only the second Python task of the DAG, the following command can be used:

air tasks test hello_dag python_task 2022-01-01
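
The same substitution works for any other Airflow CLI command; for instance, to check which DAGs the environment has picked up:

air dags list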

With these commands, you can develop Airflow DAGs efficiently without constantly having to switch to the Airflow web interface.

Debugging DAGs

In order to use the Python debugger and find bugs efficiently, we need to create a VSCode launch configuration.

To do so, add a file named launch.json in the .vscode folder of your workspace and paste in the following content:

{
    "configurations": [
        {
            "name": "Python: Current File",
            "type": "python",
            "request": "launch",
            "program": "${file}",
            "console": "integratedTerminal",
            "args": ["tasks", "test","python_task","2020-01-01"]
        }
    ]
}

The configuration is largely the standard Python launch configuration of VSCode; only the args line is added. The first two arguments trigger a task test run, and arguments three and four define the task and the execution date.

That's it! We can now set breakpoints, execute the task by pressing the F5 key and step through the script line by line.

Next Steps

Deploy your Airflow DAGs in a few steps with our Managed Airflow solution: no ops required, autoscaling enabled.

We are happy to support you on your journey to use your data continuously and consistently. Contact us for a free initial consultation or a free data assessment.