Three steps to automated industrial process monitoring

Industrial data holds great potential for process monitoring and optimization, but also for more complex applications such as predictive maintenance or automated anomaly detection. Using a concrete example, we show how the foundations for these applications can be laid in a simple way.
Starting Point
In mining, flotation is used as a processing method to separate ore from waste rock. For this purpose, so-called flotation cells are used, in which air is passed through a mixture of water and rock in order to bind the metallic components in the mixture. This produces a concentrate with a higher iron content.
Valuable data is produced at various points along this continuous process. We show how this data can be collected and stored in a data warehouse for analysis.
This then enables a central process overview of all relevant data at a glance, as shown by the example below.
This eliminates the need for time-consuming manual data merging and enables data-driven decision making. Furthermore, having all relevant data available centrally opens the door to more advanced uses, such as training AI models.

To make this possible, an Extract, Transform, Load (ETL) pipeline is created using the Airlaunch platform, which regularly fetches the data from the data sources and writes it to the central data warehouse.
Challenges
In the present case, three challenges frequently found in industrial environments stand in the way of efficient use of the data and must be overcome:
- The data is produced in various formats and therefore requires customized solutions to standardize it and load it into the data warehouse. Out-of-the-box solutions often do not offer enough flexibility to achieve this efficiently.
- Data is produced on-premise, which prevents the use of cloud-only offerings.
- Operating an on-premise solution often leads to high costs.
Solution
The Airlaunch platform is a hybrid cloud/on-premise offering that addresses these challenges in a simple way.
- Full Power of Python
The platform is based on Apache Airflow, the open source industry standard for Python workflow orchestration. Airflow combines "off-the-shelf" solutions for standard tasks with the full flexibility of the Python programming language.
- No Ops Hybrid Cloud Offering
Airlaunch Managed Airflow provides a scalable Airflow instance in the cloud on a pay-per-use model, keeping costs low.
- Secure integration of on-premise data
The Airflow Local Worker provides access to on-premise data without compromising network security. It connects to our cloud platform over an encrypted connection. The cloud platform monitors and controls the worker, minimizing operational costs. Communication always originates from the local worker, never the other way around (an outgoing-only connection), so the local network never needs to be opened up from the outside.
Process Overview
From a data analysis perspective, the process can be divided into four steps. Each of these steps generates data that is to be stored centrally in a data warehouse. In addition, human operators record incidents and faults that have an influence on the process.

- In the laboratory, the concentration of iron ore and impurities in the starting material is measured every hour.
- A sensor continuously measures the material flow into the flotation cells.
- Human operators record incidents and faults, which are stored across all process steps.
- The control system of the seven flotation cells continuously monitors the water level as well as the air flow.
- Hourly laboratory measurements determine the concentration of iron ore and impurities in the concentrate produced.
Data Architecture
In order to obtain an overall view of this data, it must be stored centrally in a data warehouse and visualized using a business intelligence tool.
To achieve this, we use the following infrastructure:
- Airlaunch Managed Apache Airflow to orchestrate the ETL pipeline.
- Airlaunch Local Worker to load the on-premise data and write it to the data warehouse.
- A data warehouse for centralized storage of data. Apache Airflow supports all popular databases and warehouse solutions "out of the box". In our case we write the data to a simple Postgres database.
- Business Intelligence software of choice for data visualization. In this example, we used the open source solution Apache Superset.

Implementation
Step 1: Defining the ETL Pipeline
The diagram shows that we need to use three different technologies for data extraction:
- The laboratory data as well as the incident reports are available as Excel files and have to be retrieved via FTP server.
- The sensor data is written to a database by an external system and can be read from there.
- The flotation cells use OPC UA, as do many production machines.
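Each of these systems is addressed in the tasks below through a named Airflow connection (ftp_lab for the FTP server, postgres_warehouse for the data warehouse, opc for the OPC UA server). As a rough sketch of one way to provide them: Airflow reads any environment variable named AIRFLOW_CONN_<CONN_ID> as a connection URI. The hosts and credentials below are placeholders, and in practice the variables would be set in the worker's environment or the connections created in the Airflow UI.
import os

# Sketch: connection URIs provided via environment variables.
# Hosts and credentials are placeholders, not values from the real setup.
os.environ["AIRFLOW_CONN_FTP_LAB"] = "ftp://labuser:password@ftp.example.local"
os.environ["AIRFLOW_CONN_POSTGRES_WAREHOUSE"] = "postgres://etl:password@warehouse.example.local:5432/warehouse"
# The OPC UA connection ('opc') and the source database connection are configured analogously.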
Extracting OPC UA data
OPC UA is widely used in the industrial environment. For this reason, Airlaunch has written an OPC UA operator for Airflow that facilitates the extraction of OPC UA data, so you don't need to write a separate function for it.
The operator performs the following tasks:
- Loading the history of the OPC UA node of a flotation cell in the relevant time period.
- Adding the name/identifier of the flotation cell to the table.
- Writing the data to the data warehouse.
Extracting Excel data
To read the data from the Excel files, we wrote a small Python function. It automates the following tasks:
- Downloading the Excel file from the FTP server
- Reading the Excel file into a Pandas dataframe
- Filtering for the relevant timestamps
- Renaming the columns of the spreadsheet to the format we require for the data warehouse
- Writing the data to the data warehouse
Since the function is generic, it can be used for all the Excel files that we need to retrieve from the FTP server.
from io import BytesIO

import pandas as pd
from airflow.providers.ftp.hooks.ftp import FTPHook
from airflow.providers.postgres.hooks.postgres import PostgresHook

def extract_xlsx_from_ftp(postgres_conn_id: str, postgres_table: str, ftp_conn_id: str, remote_full_path: str, sheet_name: str, column_names: list, dtype: dict, start_time: str, end_time: str):
    # Download the Excel file from the FTP server into an in-memory buffer
    ftp_hook = FTPHook(ftp_conn_id)
    buff = BytesIO()
    ftp_hook.retrieve_file(
        remote_full_path=remote_full_path,
        local_full_path_or_buffer=buff
    )

    # Read the spreadsheet into a Pandas dataframe and filter for the relevant timestamps
    excel_object = pd.ExcelFile(buff, engine='openpyxl')
    df = excel_object.parse(sheet_name=sheet_name, index_col=0)
    df = df[(df.index >= start_time) & (df.index < end_time)]

    # Rename the columns to the format required by the data warehouse
    df.reset_index(drop=False, inplace=True)
    df.columns = column_names

    # Write the data to the data warehouse
    pg_hook = PostgresHook(postgres_conn_id=postgres_conn_id)
    df.to_sql(postgres_table, pg_hook.get_sqlalchemy_engine(), if_exists='append', index=False, dtype=dtype)
Extracting Postgres Data
To extract the sensor data from the Postgres database, we take a similar approach. Leveraging the Airflow postgres hook, we create a small Python function that reads the data from the data source and writes it to our data warehouse:
from airflow.providers.postgres.hooks.postgres import PostgresHook

def extract_postgres_table(source_postgres_conn_id: str, source_table: str, dest_postgres_conn_id: str, dest_table: str, start_time: str, end_time: str):
    source_hook = PostgresHook(source_postgres_conn_id)
    destination_hook = PostgresHook(dest_postgres_conn_id)
    # Read the rows in the relevant time window from the source table
    records = source_hook.get_pandas_df("""SELECT * FROM "{}" WHERE datetime >= '{}' AND datetime < '{}'""".format(source_table, start_time, end_time))
    # Append them to the destination table in the data warehouse
    records.to_sql(dest_table, destination_hook.get_sqlalchemy_engine(), if_exists='append', index=False)
Step 2: Automating data extraction
To automate the steps described above, we configure an Airflow DAG that ensures that the tasks are executed regularly and that the correct time period of the data is read out each time.
We create the DAG in Airflow by simply instantiating a DAG object.
from datetime import datetime, timedelta

from airflow import DAG

with DAG(
    'opc',
    default_args=default_args,  # common task arguments defined elsewhere in the DAG file
    description='Extract Production Data',
    schedule_interval=timedelta(minutes=15),
    start_date=datetime(2021, 12, 11, 0, 0, 0),
    catchup=True,
) as dag:
    # Definition of the tasks
We have defined here that the ETL pipeline is executed every 15 minutes, so we always have the latest data in our data warehouse. With 'catchup=True' we make sure that any missed time interval is caught up by a later run, so there are no gaps in the warehouse.
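To make the interval logic concrete, here is a small, standalone illustration (not part of the pipeline) of the runs Airflow creates for this schedule and of the interval boundaries each run receives:
from datetime import datetime, timedelta

# With a start date of 2021-12-11 00:00 and a 15-minute schedule interval,
# Airflow creates one run per interval and hands its boundaries to the tasks.
start = datetime(2021, 12, 11, 0, 0, 0)
step = timedelta(minutes=15)
for i in range(3):
    interval_start = start + i * step
    interval_end = interval_start + step
    print(interval_start, "->", interval_end)
# 2021-12-11 00:00:00 -> 2021-12-11 00:15:00
# 2021-12-11 00:15:00 -> 2021-12-11 00:30:00
# 2021-12-11 00:30:00 -> 2021-12-11 00:45:00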
Automating the Python functions
We can automate the Python functions (for extracting the Postgres data and for reading the Excel files) using the Airflow Python operator.
We simply pass the Python function, along with the keyword arguments it should be executed with, to the Python operator.
from airflow.operators.python import PythonOperator
from sqlalchemy import DateTime, Float

fetchInputQuality = PythonOperator(
    task_id='extract_input_quality',
    python_callable=extract_xlsx_from_ftp,
    queue='local',  # run this task on the on-premise local worker
    op_kwargs={
        "postgres_table": "input_quality",
        "column_names": ['datetime', 'percent_iron_feed', 'percent_silica_feed'],
        "dtype": {"datetime": DateTime, "percent_iron_feed": Float, "percent_silica_feed": Float},
        "ftp_conn_id": "ftp_lab",
        "remote_full_path": "/data/input_quality_shifted.xlsx",
        "sheet_name": 'Sheet1',
        "start_time": "{{ data_interval_start.to_datetime_string() }}",
        "end_time": "{{ data_interval_end.to_datetime_string() }}",
        "postgres_conn_id": "postgres_warehouse"
    }
)
The arguments define which Excel file is to be read in and how it is to be written to the Postgres database.
Note the argument queue='local'. It specifies that the task should be executed on-premise by the local worker. The allocation and monitoring of this local task is handled by Airflow in the cloud.
Further, note the arguments 'start_time' and 'end_time'. Here we use Airflow macros, which are replaced at runtime with the correct time window. This ensures that the correct time interval is always read in and that we do not transfer duplicate data. The function itself does not need to know how big the time interval is or when exactly it is executed; Airflow takes care of that for us, based on the execution time and the schedule interval.
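The sensor data is automated in exactly the same way by passing the extract_postgres_table function from step 1 to a Python operator. The task below is only a sketch: the task id, the table names and the source connection id ('postgres_sensors') are placeholders, not values taken from the original pipeline.
fetchSensorData = PythonOperator(
    task_id='extract_sensor_data',
    python_callable=extract_postgres_table,
    queue='local',  # the source database is on-premise, so this also runs on the local worker
    op_kwargs={
        "source_postgres_conn_id": "postgres_sensors",  # placeholder connection id
        "source_table": "material_flow",                # placeholder table names
        "dest_postgres_conn_id": "postgres_warehouse",
        "dest_table": "material_flow",
        "start_time": "{{ data_interval_start.to_datetime_string() }}",
        "end_time": "{{ data_interval_end.to_datetime_string() }}",
    }
)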
Automating OPC UA Data Extraction
To automate the OPC UA data extraction, we use the OPCUAToPostgresOperator by Airlaunch. Since we are reading data from seven flotation cells, we create the tasks in a simple loop to avoid having to copy and paste the task definition seven times:
for column in floationColumns:
    extractColumnLevel = OPCUAToPostgresOperator(
        task_id='extract_opc_column_level_{}'.format(column["colNo"]),
        queue='local',
        opcua_node="ns=0;i={}".format(column["levelNodeId"]),
        opcua_conn_id="opc",
        if_exists='append',
        opcua_startdate="{{ data_interval_start.to_datetime_string() }}",
        opcua_enddate="{{ data_interval_end.to_datetime_string() }}",
        postgres_table='column_level',
        static_columns={
            'column': column["colNo"]
        },
        postgres_conn_id="postgres_warehouse",
    )
The loop defines a task for each cell that reads the water level in the relevant time period from the OPC UA server and writes it to the data warehouse. Again we use the Airflow macros to define the time period (opcua_startdate and opcua_enddate) and tell Airflow to run this task on-premise (queue='local').
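For completeness: the floationColumns list that the loop iterates over is not shown in the snippets above. It can be as simple as a list of dictionaries mapping each of the seven cells to the OPC UA node id of its level measurement; the node ids below are placeholders.
# Hypothetical definition of floationColumns; the numeric node ids are
# placeholders and must be taken from the OPC UA server's address space.
floationColumns = [
    {"colNo": 1, "levelNodeId": 1001},
    {"colNo": 2, "levelNodeId": 1002},
    {"colNo": 3, "levelNodeId": 1003},
    {"colNo": 4, "levelNodeId": 1004},
    {"colNo": 5, "levelNodeId": 1005},
    {"colNo": 6, "levelNodeId": 1006},
    {"colNo": 7, "levelNodeId": 1007},
]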
That's it! With just a few lines of code we have created a highly flexible ETL pipeline that regularly and automatically loads data into our data warehouse!
Step 3: Visualizing the data
With the help of a business intelligence tool, we can now reap the rewards and create a central overview of our process data. Airlaunch is built on open source, so we use Apache Superset. However, you can use a tool of your choice.
After creating a dashboard, we can see information from all the data sources along the production process at a glance, including key figures such as production volume and product quality. The result is a central overview of the most important process KPIs that is kept up to date continuously and automatically.

Another strength comes from combining different data sources. In this case, we can link incidents with production volumes and thus see directly how incidents affect production. This information can then be used to take the right measures based on the data.

These visualizations show what can be achieved with simple means: with the data warehouse in place, a large step towards data-based decision making has already been taken. Such a data platform also opens up a whole range of possibilities for further data use, such as artificial intelligence for predictive maintenance, automated outlier detection or the prediction of expected product quality.
Next Steps
We are happy to support you on your journey to using your data continuously and consistently. Contact us for an initial consultation or a free data assessment.