Effizient Entwickeln für Apache Airflow

Apache Airflow ist die führende Plattform, um Daten-Workflows in Python zu schreiben und zu orchestrieren. Es bietet zuverlässiges Scheduling für Python Tasks und liefert vorgefertigte Integrationen und Verbindungs-Management für bekannte Datenbanken, Data Warehouses, Cloud-Dienstleister und mehr bereits mit.
Die Tasks zu schreiben ist an sich unkompliziert. Das lokale Ausführen, Testen und Debuggen von Airflow Tasks gestaltet sich aber bekanntlich herausfordernd, hauptsächlich weil dies in einer Airflow Instanz stattfinden muss. Wir zeigen hier Wege auf, wie man für Airflow effizient entwickeln kann.
Entwicklungsumgebung Starten
Airlaunch hat ein Open Source Tool entwickelt, welches das Erstellen und Managen von Airflow Entwicklungs-Umgebungen erleichtert. Der Code ist auf GitHub verfügbar.
Das Airlaunch Tool ist eine Wrapper um Apache Airflow. Es erweitert dieses um einige Befehle, die die Verwaltung von Airflow Umgebungen automatisieren.
Als ersten Schritt installieren wir das Airlaunch Tool:
sudo wget https://raw.githubusercontent.com/airlaunch-ch/airlaunch-cli/master/air -O /usr/local/bin/air && sudo chmod +x /usr/local/bin/air
Die Verwaltungsbefehle sind in der Command Group air env
abrufbar. Eine Übersicht über die vorhandenen Funktionen kann mit air env -h
angezeigt werden.
Um eine Entwicklungsumgebung zu initialisieren, reicht ein einfacher Befehl:
air env init
Der Befehl erstellt eine python virtualenv, installiert Apache Airflow unter Berücksichtigung aller Dependency-Constraints und initialisiert Apache Airflow mit einer für die Entwicklung geeigneten Konfiguration.
Airflow kann dann mit air env start
gestartet werden.
Das Airflow Webinterface ist nun auf localhost:8080 verfügbar und kann beispielsweise dafür genutzt werden, um Airflow Connections zu erfassen (siehe unten). Für die Entwicklung der DAGs ist es aber nicht unbedingt nötig, den Webserver zu starten.
Airflow Connections Erfassen
Airflow verwendet sogenannte Connections, um Credentials für Datenquellen und -Ziele zu verwalten. Die Airflow Dokumentation zeigt detailliert auf, wie diese verwaltet werden können. Diese Funktionen können mit dem Airlaunch Tool vollumfänglich verwendet werden.
Es gibt grundsätzlich drei Arten, Airflow Connections zu konfigurieren:
- Starten des Webservers:
air env start
ausführen und via Webserver unter admin-> connections die Credentials erfassen - Nutzung der Airflow CLI:
Sämtliche in der Airflow Dokumentation beschriebenen Befehle funktionieren auch mit dem Airlaunch Tool. Dazu einfach denairflow
command durchair
ersetzen, der Rest bleibt gleich. Zum Erfassen einer neuen Connection alsoair connections add [...]
- Importieren eines .YAML files:
Es ist möglich, Connections von einem .yaml zu importieren. Das Airlaunch Tool importiert Connections, die im File connections.yaml im DAG folder sind, automatisch, wenn der Server gestartet wird. Der Import kann mitair env load
auch manuell ausgelöst werden.
Wurden bereits Airflow Connections konfiguriert, können diese mit dem Befehl air env export
in ein yaml file exportiert und später mit air env load
wieder importiert werden.
Dependencies Installieren
Häufig greifen Python Tasks auf Libraries zurück, die dafür zuerst in der Umgebung installiert werden müssen. Bei der Installation von Dependencies muss sichergestellt werden, dass keine Konflikte zwischen den Airflow-eigenen und den neu zu installierenden Packages entstehen. Das Airflow Tool verhindert solche Konflikte automatisch.
PyPI Packages können mit air env install [packagename]
installiert werden.
Weiter ist es möglich, mit air env install-requirements [requirements file]
Dependencies von einem requirements.txt file zu laden. Wird kein File explizit angegeben, wird nach einem requirements.txt
File im DAG Folder gesucht.
DAGs Entwickeln
In Airflow werden Tasks (und deren Reihenfolge) in DAGs abgebildet. Für die Entwicklung der DAGs verwenden wir VSCode. Es kann aber jeder beliebige Code Editor verwendet werden.
VSCode erkennt in der Regel automatisch, dass eine virtualenv vorhanden ist und wählt den richtigen Python Interpreter aus. Sollte das nicht der Fall sein, müssen die folgenden Schritte ausgeführt werden:
ctrl+shift+p -> Python: Select Interpreter -> (venv:venv)


So stehen alle Annehmlichkeiten einer IDE zur Verfügung, inkl. Code Autocompletion und Linting.
Nun kann ein DAG erstellt werden. Als Beispiel soll hier ein einfacher DAG mit einem Dummy-Task und einem einfachen Python-Task verwendet werden. Erstelle dazu ein File 'hello_dag.py' im Root-Verzeichnis oder in einem separaten Ordner.
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
def hello_world():
print("hello world!")
args = {
'owner': 'airlaunch',
}
dag = DAG(
dag_id='hello_dag',
default_args=args,
schedule_interval='0 0 * * *',
start_date=days_ago(2),
)
dummy_task = DummyOperator(
task_id='dummy_task',
dag=dag,
)
python_task = PythonOperator(
task_id='python_task',
dag=dag,
python_callable=hello_world,
)
dummy_task >> python_task
if __name__ == "__main__":
dag.cli()
Zu beachten sind die beiden letzten Zeilen. Sie erlauben, das DAG File direkt auszuführen, was das Debugging vereinfacht. Mehr dazu weiter unten.
DAGs Testen
Wir können DAGs direkt in der Command-Line testen und müssen nicht den (langsameren) Umweg über das Webinterface gehen.
Dazu stehen uns alle Befehle der Apache Airflow CLI zur Verfügung. Das Einzige, was wir machen müssen, ist den airflow
Befehl durch air
zu ersetzen. So wird automatisch die vorhin installierte & konfigurierte Umgebung verwendet.
Ein DAG kann dann einfach mit dem folgenden Befehl getestet werden (Das Datum zum Schluss ist das Execution Date):
air dags test hello_dag 2022-01-01
Es ist auch möglich, einzelne Tasks zu testen ohne den gesamten DAG ausführen zu müssen. Um beispielsweise nur den zweiten Python Task des DAGs auszuführen, kann der folgende Befehl verwendet werden:
air tasks test hello_dag python_task 2022-01-01
Mit diesen Befehlen kann man effizient Airflow DAGs entwickeln und muss nicht zeitraubend ständig das Airflow Web Interface dazu verwenden.
DAGs Debuggen
Um den Python Debugger zu nutzen und Fehler effizient zu finden, muss im Falle von VSCode eine Launch Configuration erstellt werden. Füge dazu im .vscode Ordner deines Workspaces ein File launch.json mit folgendem Inhalt hinzu. Allenfalls musst der Ordner noch erstellt werden. Alternativ kann man im VSCode GUI auch unter Run and Debug eine neue Run Configuration für ein Python File erstellen und diese dann editieren.
{
"configurations": [
{
"name": "Python: Current File",
"type": "python",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal",
"args": ["tasks", "test","python_task","2020-01-01"]
}
]
}
Die Konfiguration ist weitgehend die Standard Python Launch Konfiguration von VSCode. Hinzu kommt lediglich die args
Zeile. Mit den ersten beiden Argumenten lösen wir einen Task Run aus, Argument drei und vier definieren den Task und die Execution Time.

Das ist es schon! Wir können nun einen beliebigen Breakpoint setzen, den Task mit F5
ausführen und Zeile für Zeile das Skript durchgehen.
Nächste Schritte
Deployen Sie Ihre Airflow DAGs in wenigen Schritten mit unserer Managed Airflow Lösung. No Ops und Autoskalierend.
Gerne unterstützen wir Sie beratend dabei, Ihre Daten kontinuierlich und konsequent zu nutzen. Kontaktieren Sie uns für ein unverbindliches Erstgespräch oder ein kostenloses Data Assessment.