In drei Schritten zur industriellen Prozessüberwachung
Die Industrielle Datennutzung birgt grosses Potential für die Prozessüberwachung oder -Optimierung, aber auch für komplexere Anwendungen wie predictive Maintenance oder die automatisierte Anomaly Detection. Wir zeigen an einem Beispiel auf, wie die Grundlagen für diese Anwendungen auf einfache Weise geschaffen werden können.
Ausgangslage
Im Bergbau wird Flotation als Aufbereitungsverfahren eingesetzt, um Erz von taubem Gestein zu trennen. Dazu wird in mehreren Stufen in sogenannten Flotation Zellen Luft durch ein Gemisch aus Wasser und Gestein geleitet, um die metallischen Anteile im Gemisch zu binden. So wird ein Konzentrat mit höherem Eisengehalt hergestellt.
Entlang dieses kontinuierlichen Prozesses fallen an verschiedenen Punkten Daten an, die zur Analyse in einem Datawarehouse zusammengeführt werden sollen. Dies ermöglicht dann eine zentrale Prozessübersicht, die alle wichtigen Prozessdaten auf einen Blick bereitstellt. So behält man automatisch den Überblick, kann datenbasiert Entscheidungen treffen und das aufwändige Zusammenführen der Daten entfällt.

Um dies zu ermöglichen, wird mithilfe der Airlaunch Plattform eine Extract, Transform, Load (ETL Pipeline) erstellt, welche die Daten regelmässig bei den Datenquellen abholt und in das zentrale Datawarehouse schreibt.
Herausforderung
Im vorliegenden Fall müssen drei Herausforderungen gemeistert werden, die im industriellen Umfeld häufig zu finden sind und einer effiziente Nutzung der Daten im Weg stehen
- Die Daten fallen in verschiedenen Formaten an und erfordern deshalb massgeschneiderte Lösungen, um diese zu vereinheitlichen und in das vorgesehene Datawarehouse zu laden. Vorgefertigte Lösungen bieten hier häufig zu wenig Flexibilität.
- Die Daten fallen On Premise an, was eine Nutzung von reinen Cloud Angeboten verhindert.
- Der Betrieb einer eigenen On Premise Lösung führt oft zu hohen Kosten.
Lösung
Die Airlaunch Plattform bietet eine hybride Cloud/On Premise Lösung, welche diesen Herausforderungen eine einfache Lösung gegenüberstellt.
- Volle Power von Python
Die Plattform basiert auf Apache Airflow, dem Open Source Industrie-Standard für die Orchestrierung von Python Workflows. So vereint Airflow "off-the-shelf" Lösungen für Standardaufgaben mit der vollen Flexibilität der Python Programmiersprache. - No Ops dank Cloud
Airlaunch Managed Airflow bietet eine skalierbare Airflow Instanz nach einem Pay-per-Use Modell in der Cloud und hält so die Kosten tief. - Sichere Integration der On-Premise Daten
Der Airflow Local Worker stellt den Zugang zu On-Premise Daten sicher ohne dass Kompromisse bei der Netzwerksicherheit gemacht werden. Er verbindet sich mit der Cloud-Plattform und wird von dieser überwacht und gesteuert und minimiert so die operativen Kosten. Die Kommunikation geht dabei immer vom lokalen Worker aus, niemals umgekehrt (outgoing only connection), es ist also keine Öffnung des lokalen Netzwerkes von aussen nötig.
Prozessübersicht
Aus Sicht der Datenanalyse kann der Prozess in vier Schritte unterteilt werden. Bei jedem dieser Schritte fallen Daten an, die zentralisiert gespeichert werden sollen. Zusätzlich werden von den Operatoren Zwischenfälle und Störungen erfasst, die Einfluss auf den Prozess haben.

- Im Labor wird stündlich die Konzentration des Eisenerzes sowie der Unreinheiten im Ausgangsmaterial gemessen.
- Ein Sensor misst laufend den Materialfluss in die Flotation-Zellen.
- Operatoren erfassen Zwischenfälle und Störungen, welche über alle Prozessschritte gespeichert werden.
- Die Maschinensteuerung der sieben Flotation-Zellen überwacht laufend das Flüssigkeitslevel sowie den Luftstrom.
- Stündliche Labormessungen ermitteln die Konzentration des Eisenerzes sowie der Unreinheiten im produzierten Konzentrat.
Architektur
Um eine Gesamtübersicht dieser Daten zu erhalten, müssen diese zentral in einem Datawarehouse gespeichert werden und mittels eines Business Intelligence Tools visualisiert werden. Um dies zu erreichen nutzen wir die folgende Infrastruktur:
- Airlaunch Managed Apache Airflow zur Orchestrierung der ETL Pipeline.
- Airlaunch Local Worker um die On-Premise Daten zu laden und ins Datawarehouse zu schreiben.
- Ein Datawarehouse zur zentralen Speicherung der Daten. Apache Airflow unterstützt alle gängigen Datenbanken und Warehouse Lösungen "out of the box". In unserem Fall schreiben wir die Daten in eine einfache Postgres Datenbank.
- Business Intelligence Software der Wahl für die Visualisierung der Daten. In diesem Beispiel haben wir die Open Source Lösung Apache Superset verwendet.

Implementierung
1. Schritt: Definition ETL Pipeline
Die Architektur zeigt, dass wir für die Data Extraction drei verschiedene Technologien verwenden müssen:
- Die Labordaten sowie die Incident-Reports liegen als Excel File vor und müssen via FTP Server abgeholt werden.
- Die Sensordaten werden von einem externen System in eine Datenbank geschrieben und können dort ausgelesen werden.
- Die Flotation-Zellen verwendet wie viele Produktionsanlagen OPC UA.
OPC UA Daten auslesen
Im Industriellen Umfeld ist OPC UA weit verbreitet. Aus diesem Grund hat Airlaunch einen OPC UA Operator für Airflow geschrieben, der das Auslesen von OPC UA Daten erleichtert. Wir müssen also keine separate Funktion dafür schrieben.
Der Operator übernimmt die folgenden Aufgaben:
- Laden der History des OPC UA Node einer Flotation-Zelle im relevanten Zeitraum.
- Hinzufügen des Namens der Flotation Zelle zur Tabelle.
- Schrieben der Daten ins Datawarehouse.
Einlesen der Excel Daten
Um die Daten der Excel Files einzulesen, haben wir eine kleine Python Funktion geschrieben. Sie erledigt folgende Aufgaben für uns:
- Herunterladen des Excel Files vom FTP Server
- Einlesen des Excel Files in einen Pandas Dataframe
- Filtern nach den relevanten Zeitstempeln
- Umbenennen der Spalten der Tabelle in das von uns gewünschte Format
- Schreiben der Daten ins Datawarehouse
Da die Funktion allgemein gehalten ist, kann sie für alle Daten verwendet werden, die via FTP Server abgerufen werden müssen.
def extract_xlsx_from_ftp(postgres_conn_id: str, postgres_table: str, ftp_conn_id: str, remote_full_path: str, sheet_name: str, column_names: list, dtype: dict, start_time: str, end_time: str):
ftp_hook = FTPHook(
ftp_conn_id
)
buff = BytesIO()
ftp_hook.retrieve_file(
remote_full_path=remote_full_path,
local_full_path_or_buffer=buff
)
excel_object = pd.ExcelFile(buff, engine='openpyxl')
df = excel_object.parse(sheet_name = sheet_name, index_col = 0)
df = df[(df.index >= start_time) & (df.index < end_time)]
df.reset_index(drop=False, inplace=True)
df.columns = column_names
pg_hook = PostgresHook(postgres_conn_id=postgres_conn_id)
df.to_sql(postgres_table, pg_hook.get_sqlalchemy_engine(), if_exists='append', index=False, dtype=dtype)
Postgres Daten auslesen
Im Falle der Sensordaten, die in einer Postgres Datenbank vorliegen, gehen wir ähnlich vor. Wieder erstellen wir eine kleine Python Funktion, die uns die Daten von der Datenquelle liest und in unser Datawarehouse schreibt:
def extract_postgres_table(source_postgres_conn_id: str, source_table: str, dest_postgres_conn_id: str, dest_table: str, start_time: str, end_time: str):
source_hook = PostgresHook(source_postgres_conn_id)
destination_hook = PostgresHook(dest_postgres_conn_id)
records = source_hook.get_pandas_df("""SELECT * FROM "{}" WHERE datetime >= '{}' AND datetime < '{}'""".format(source_table, start_time, end_time))
records.to_sql(dest_table, destination_hook.get_sqlalchemy_engine(), if_exists='append', index=False)
2. Schritt: Automatisierung
Um die oben beschriebenen Schritte nun zu automatisieren, konfigurieren wir einen Airflow DAG, der dafür sorgt, dass die Tasks regelmässig ausgeführt werden und dass jeweils der richtige Zeitabschnitt der Daten ausgelesen wird.
Den DAG erstellen wir in Airflow indem wir einfach ein DAG Objekt erstellen.
with DAG(
'opc',
default_args=default_args,
description='Extract Production Data',
schedule_interval=timedelta(minutes=15),
start_date=datetime(2021, 12, 11, 0, 0, 0),
catchup=True,
) as dag:
# Definitionen der einzelnen Tasks
Wir haben hier definiert, dass die ETL Pipeline alle 15 Minuten ausgeführt wird. So haben wir immer die neusten Daten in unserem Datawarehouse. Mit 'catchup=True' stellen wir sicher, dass ein Run nachgeholt wird, sollte mal ein Zeitintervall verpasst werden. So stellen wir sicher, dass wir keine Lücken in unserem Warehouse haben.
Automatisieren der Python Funktionen
Die Python Funktionen (für das Auslesen der Postgres Daten sowie für das Einlesen der Excel Tabellen) können wir mittels des Airflow Python Operators automatisieren.
Dem Python Operator übergeben wir die Python Funktion sowie eine Liste der Argument, mit der die Funktion ausgeführt werden soll.
fetchInputQuality = PythonOperator(
task_id='extract_input_quality',
python_callable=extract_xlsx_from_ftp
queue='local',
op_kwargs={
"postgres_table": "input_quality",
"column_names": ['datetime', 'percent_iron_feed', 'percent_silica_feed'],
"dtype": {"datetime": DateTime, "percent_iron_feed": Float, "percent_silica_feed": Float},
"ftp_conn_id": "ftp_lab",
"remote_full_path": "/data/input_quality_shifted.xlsx",
"sheet_name": 'Sheet1',
"start_time": "{{ data_interval_start.to_datetime_string() }}",
"end_time": "{{ data_interval_end.to_datetime_string() }}",
"postgres_conn_id": "postgres_warehouse"
}
)
In den Argumenten wird definiert, welches Excel File eingelesen werden soll und wie es in die Postgres Datenbank geschrieben werden soll.
Zu beachten ist das Argument "queue=local". Damit definieren wir, dass dieser Task On-Premise durch den lokalen Worker ausgeführt werden soll. Die Zuteilung und Überwachung dieses lokalen Tasks übernimmt Airflow für uns!
Interessant sind weiter die Argumente 'start_time' und 'end_time'. Hier verwenden wir Airflow Makros, die zur Laufzeit ersetzt werden. So ist sichergestellt, dass immer das korrekte Zeitintervall eingelesen wird und wir keine doppelten Daten transferieren. Wir müssen uns also in der Funktion selber nicht darum kümmern, wie gross das Zeitintervall ist oder wann die Funktion genau ausgeführt wird. Airflow übernimmt das für uns!
Automatisieren der OPC UA Data Extraction
Für die Automatisierung der OPC Data Extraction verwenden wir den OPCUAToPostgresOperator von Airlaunch. Da wir Daten von sieben Flotation Zellen auslesen, erstellen wir die Tasks in einem einfachen Loop:
for column in floationColumns:
extractColumnLevel = OPCUAToPostgresOperator(
task_id='extract_opc_column_level_{}'.format(column["colNo"]),
queue='local',
opcua_node="ns=0;i={}".format(column["levelNodeId"]),
opcua_conn_id="opc",
if_exists='append',
opcua_startdate="{{ data_interval_start.to_datetime_string() }}",
opcua_enddate="{{ data_interval_end.to_datetime_string() }}",
postgres_table='column_level',
static_columns={
'column': column["colNo"]
},
postgres_conn_id="postgres_warehouse",
)
In diesem Loop wird nun für jede Zelle ein Task definiert, der den Wasserstand in der relevanten Zeitperiode vom OPC UA Server einliest und in das Datawarehouse schreibt. Wieder verwenden wir die Airflow Makros um die Zeitperiode zu definieren (opcua_startdate und opcua_enddate) und weisen Airflow an, diesen Task On-Premise auszuführen (queue='local').
Das ist es auch schon! Wir haben mit wenigen Zeilen Code eine hochflexible ETL Pipeline erstellt, die uns regelmässig Daten in unser Datawarehouse Lädt!
3. Schritt: Visualisierung der Daten
Mit Hilfe eines Business Intelligence Tools können wir nun die Früchte der Arbeit ernten, und eine zentrale Übersicht über unsere Prozessdaten erstellen. Airlaunch baut auf Open Source, wir verwenden also Apache Superset.
In ein paar wenigen Schritten erstellen wir ein Visualisierung der Daten. So sehen wir auf einen Blick Daten von den verschiedenen Datenquellen und können sämtliche wichtigen Informationen entlang des Produktionsprozesses wie das Produktionsvolumen und die Produktqualität zentral darstellen. Dies erlaubt nun eine zentrale Übersicht der wichtigsten Prozess KPIs, welche laufend und automatisch auf dem neusten Stand gehalten werden.

Eine weitere Stärke ergibt sich aus der Kombination verschiedener Datenquellen. Im vorliegenden Fall können wir Incidents mit dem Produktionsvolumen verbinden und so direkt sehen, wie sich die Zwischenfälle auf die Produktion auswirken. Diese Informationen können dann genutzt werden, um Datenbasiert die richtigen Massnahmen zu ergreifen.

Diese Visualisierungen zeigen auf, was bereits mit einfachen Mitteln erreicht werden kann. Mit dem Datawarehousing ist auf dem Weg zu Datenbasierten Entscheidungen schon viel erreicht. Eine solche Datenplattform eröffnet nun ein ganzes Feld von Möglichkeiten für die Weitere Datennutzung wie dem Einsatz von künstlicher Intelligenz, beispielsweise für Predictive Maintenance, automatisierter Outlier Detection oder der Voraussage der erwarteten Produktqualität.
Nächste Schritte
Gerne unterstützen wir Sie dabei, Ihre Daten kontinuierlich und konsequent zu nutzen. Kontaktieren Sie uns für ein unverbindliches Erstgespräch oder ein kostenloses Data Assessment!