airflow-scheduled-job

Install this demo on an existing Kubernetes cluster:

$ stackablectl demo install airflow-scheduled-job

You can also deploy it to a specific namespace (the namespace airflow-demo will be assumed in this guide):

$ stackablectl demo install airflow-scheduled-job -n airflow-demo

This demo should not be run alongside other demos.

System requirements

To run this demo, your system needs at least:

  • 2.5 cpu units (core/hyperthread)

  • 10GiB memory

  • 24GiB disk storage

Overview

This demo will

  • Install the required Stackable operators

  • Spin up the following data products

    • Postgresql: An open-source database used for Airflow cluster and job metadata

    • Airflow: An open-source workflow management platform for data engineering pipelines

    • Kafka: An open-source messaging broker that will be used to trigger an Airflow DAG

    • Open Policy Agent: An open-source policy engine used for user authorization

    • Trino: A fast distributed SQL query engine for big data analytics that helps you explore your data universe. This demo uses it to enable SQL access to the data

    • MinIO: A S3 compatible object store. This demo uses it as persistent storage to store the Trino data and Airflow logs

  • Mount several Airflow jobs (referred to as Directed Acyclic Graphs, or DAGs) for the cluster to use

  • Enable and schedule the jobs

  • Verify the job status with the Airflow Webserver UI

  • Illustrate DAG event-based scheduling

  • Illustrate user authorization

You can see the deployed products and their relationship in the following diagram:

overview

List deployed Stackable services

To list the installed Stackable services run the following command:

$ stackablectl stacklet list

┌─────────┬───────────────┬───────────┬───────────────────────────────────────────────────────────────────────────────────────────────────────────┬─────────────────────────────────┐
│ PRODUCT ┆ NAME          ┆ NAMESPACE ┆ ENDPOINTS                                                                                                 ┆ CONDITIONS                      │
╞═════════╪═══════════════╪═══════════╪═══════════════════════════════════════════════════════════════════════════════════════════════════════════╪═════════════════════════════════╡
│ airflow ┆ airflow       ┆ default   ┆ webserver-http                              http://172.19.0.3:31483                                       ┆ Available, Reconciling, Running │
├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ hive    ┆ hive-iceberg  ┆ default   ┆ metastore-hive                              hive-iceberg-metastore.default.svc.cluster.local:9083         ┆ Available, Reconciling, Running │
├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ kafka   ┆ kafka         ┆ default   ┆ broker-default-0-listener-broker-kafka-tls                                                                ┆ Available, Reconciling, Running │
│         ┆               ┆           ┆ kafka-broker-default-0-listener-broker.default.svc.cluster.local:9093                                     ┆                                 │
│         ┆               ┆           ┆ broker-default-0-listener-broker-metrics                                                                  ┆                                 │
│         ┆               ┆           ┆ kafka-broker-default-0-listener-broker.default.svc.cluster.local:9606                                     ┆                                 │
│         ┆               ┆           ┆ broker-default-bootstrap-kafka-tls          kafka-broker-default-bootstrap.default.svc.cluster.local:9093 ┆                                 │
├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ opa     ┆ opa           ┆ default   ┆                                                                                                           ┆ Available, Reconciling, Running │
├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ trino   ┆ trino         ┆ default   ┆ coordinator-https                           https://172.19.0.5:31087                                      ┆ Available, Reconciling, Running │
├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ minio   ┆ minio-console ┆ default   ┆ https                                       https://172.19.0.4:31792                                      ┆                                 │
└─────────┴───────────────┴───────────┴───────────────────────────────────────────────────────────────────────────────────────────────────────────┴─────────────────────────────────┘

When a product instance has not finished starting yet, the service will have no endpoint. Depending on your internet connectivity, creating all the product instances might take considerable time. A warning might be shown if the product is not ready yet.

Airflow Webserver UI

Open the airflow endpoint webserver-airflow in your browser (http://172.19.0.3:31483 in this case).

airflow 1

Log in with the username admin and password adminadmin. Click on 'DAGs' in the left margin and you will see an overview showing the DAGs mounted during the demo setup (date_demo and sparkapp_dag).

airflow 2

There are two things to notice here. All DAGs but one (the one demonstrating a deferrable operator) have been enabled, as shown by the slider on the far right of the screen for each DAG (DAGs are all paused initially and can be activated manually in the UI or via a REST call, as done in the setup for this demo). Secondly, the date_demo job has been busy, with several runs already logged:

airflow 3

The sparkapp_dag has only been run once because they have been defined with different schedules. The kafka_watcher job has been activated and is awaiting a trigger action (in this case, a message arriving in a designated Kafka topic). Clicking on the DAG name and then on Runs will display the individual job runs:

airflow 5

The demo_date job is running every minute. With Airflow, DAGs can be started manually or scheduled to run when certain conditions are fulfilled - in this case, the DAG has been set up to run using a cron table, which is part of the DAG definition.

demo_date DAG

Let’s drill down a bit deeper into this DAG. At the top under the DAG name there is some scheduling information, which tells us that this job will run every minute continuously:

airflow 6

Click on one of the job runs in the list to display the details for the task instances. In the left-side pane the DAG is displayed either as a graph (this job is so simple that it only has one step, called run_every_minute), or as a "bar chart" showing each run.

airflow 7

The first couple of runs of this DAG may result in the tasks not being ready and being skipped, resulting in an error in the UI. This is a side-effect of the scheduling mechanism within Airflow: for example, due to the rapid succession of task runs triggered by frequent scheduling. These errors should not appear on later task runs.

Click on the run_every_minute box in the centre of the page to select the logs:

In this demo, the KubernetesExecutor is deployed which means that logs are only preserved (and available in the UI) if either remote logging or the SDP logging framework is configured. In this demo we set up remote logging using S3/Minio. Since Minio in this case is set up with TLS, the Airflow connection requires that the webserver has access to a relevant certificate and that every pod has environment variables containing the access and secret keys. See the Airflow Documentation for more details.

If you are interested in persisting the logs using the SDP logging framework, take a look at the logging demo.

airflow 8

To look at the actual DAG code click on Code. Here we can see the crontab information used to schedule the job as well the bash command that provides the output:

airflow 9

sparkapp_dag DAG

Go back to DAG overview screen. The sparkapp_dag job has a scheduled entry of None and a last-execution time. This allows a DAG to be executed exactly once, with neither schedule-based runs nor any backfill. The DAG can always be triggered manually again via REST or from within the Webserver UI.

airflow 10

By navigating to the graphical overview of the job we can see that DAG has two steps, one to start the job - which runs asynchronously - and another to poll the running job to report on its status.

airflow 11

kafka_watcher DAG

This DAG is using the event-scheduling feature of Airflow. Click on the DAG and then select the Code tab in the window on the right half of the screen. When a message arrives in the topic named test-topic, the DAG will be triggered. Note that the connection to Kafka (kafka_conn) is expected: this has been created as part of the airflow cluster:

airflow 12

We can use the kafka-producer script bundled with Kafka to write to this topic (note the namespace we chose initially is used consistently in this demo):

kubectl exec -n airflow-demo kafka-broker-default-0 -c kafka -- bash -c \
'echo "Hello World at: $(date)" | /stackable/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server $BOOTSTRAP_SERVER \
  --topic test-topic \
  --producer.config /stackable/config/client.properties'

The triggerer logs will show that this DAG was fired (logging out the message that we wrote to the topic above). You can do this by either displaying the pod logs directly (e.g. if you are using k9s) or by issuing:

kubectl logs -n airflow-demo airflow-triggerer-default-0 --tail=30

The logs show that our message was detected, triggering the job:

airflow 13

The DAG view will show that run was successful. Note the kafka_queue_asset under Schedule. This is an Airflow object (also defined in the DAG code) that wraps the actual trigger/wait mechanism in an generic way for use in DAG code.

airflow 14

Clicking on the asset will show the triggers that have been fired, called Asset Events:

airflow 15

core_deferrable_sleep_demo DAG

Now log out of the UI. This next section will illustrate user authorization with the Open Policy Agent (OPA) and the OPA Authorizer that is included in Stackable’s Airflow product images. Two other users - other than admin - are supplied with this demo: jane.doe and richard.roe (the passwords are the same as the user names). jane.doe has permission to view all DAGs but only has permission to activate and run core_deferrable_sleep_demo. richard.roe does not have permission to view or action any DAGs. Login in as jane.doe. Select the DAGs view, making sure to set the filter from Enabled to All. You will see the list of 4 DAGs, and can enable core_deferrable_sleep_demo (the only DAG not automatically enabled when the demo was deployed). But if you try and run any other DAG, permission will be denied:

airflow 16

Enable and run core_deferrable_sleep_demo:

airflow 17

Click on the DAG, switching to the task view. This DAG uses a deferrable operator which, in conjunction with the triggerer process, "offloads" the DAG from its worker for a specfic period of time, before being picked up and again and executed. You will see the task cycle through the following states:

deferrable 01 running
deferrable 02 queued
deferrable 03 deferred
deferrable 04 queued
deferrable 05 running
deferrable 06 success

Now log out and log in again as richard.roe. On the home screen no DAGs are visible, as expected by the authorization rules defined for this user:

opa 01

run_dbt DAG

Log back into the UI as admin. Select the DAGs view, making sure the filter is set to All. Trigger a run on the run_dbt DAG (clicking on the trigger button will automatically activate a disabled DAG). Switch to the task view and click on the dbt-test under Task ID. It takes a few moments before the logs appear: this is because Airflow is configured to use the KubernetesExecutor - which creates new Pods for each DAG task - and as the DAG itself uses the KubernetesPodOperator, this means that another Pod is spawned. Both of these are terminated on DAG completion. The pattern is probably overkill for simple scenarios, but we use it in this demo to show that logs from the final task are written to and retrievable from the S3 location defined in the Airflow cluster.

airflow 18

If you are tracking the logs in the UI from the task as it is running, you may need to fresh the screen on task completion to see the final logs. This is because they are written to the running Pod while the task is running, but are then written to S3 upon completion.

If you switch to the Code tab you will see the following:

  run_dbt = KubernetesPodOperator(
      image="oci.stackable.tech/demos/dbt-demo:0.0.1",
      image_pull_policy="IfNotPresent",
      cmds=["/bin/bash", "-x", "-euo", "pipefail", "-c"],
      arguments=["cd /dbt/dbt_test && export DBT_PROFILES_DIR=/dbt/dbt_test && dbt debug && dbt run && dbt test"],
      ...
      )

The task checks the configuration, runs a task that inserts some dummy data into a table, and then runs some tests to verify the result. The details of the simple DBT project can be found in the demos repository.

Patching Airflow to stress-test DAG parsing using relevant environment variables

Make sure you are still logged in as admin. The demo also created a third DAG in the ConfigMap, called dag_factory.py, which was not mounted to the cluster and therefore does not appear in the UI. This DAG can be used to create a number of individual DAGs on-the-fly, thus allowing a certain degree of stress-testing of the DAG scan/register steps (the generated DAGs themselves are trivial and so this approach will not really increase the burden of DAG parsing). To show these individual DAGs in the overall list (and to remove the existing ones), adjust the volumeMounts as shown below. The patch also sets some environment variables that can be used to change the frequency of certain operations. The descriptions can be found here: https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html.

---
apiVersion: airflow.stackable.tech/v1alpha1
kind: AirflowCluster
metadata:
  name: airflow
spec:
  clusterConfig:
    volumeMounts:
      - name: airflow-dags
        mountPath: /dags/dag_factory.py
        subPath: dag_factory.py
      - name: kafka-tls-pem
        mountPath: /stackable/kafka-tls-pem
  webservers:
    roleGroups:
      default:
        envOverrides: &envOverrides
          AIRFLOW__CORE__DAGS_FOLDER: "/dags"
          PYTHONPATH: "/stackable/app/log_config:/dags"
          AIRFLOW__CORE__MIN_SERIALIZED_DAG_UPDATE_INTERVAL: "60"
          AIRFLOW__CORE__MIN_SERIALIZED_DAG_FETCH_INTERVAL: "60"
          AIRFLOW__DAG_PROCESSOR__MIN_FILE_PROCESS_INTERVAL: "60"
          AIRFLOW__DAG_PROCESSOR__PRINT_STATS_INTERVAL: "60"
          AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D"
  kubernetesExecutors:
    envOverrides: *envOverrides
  schedulers:
    roleGroups:
      default:
        envOverrides: *envOverrides

THe patch can be applied like this:

kubectl patch airflowcluster airflow --type="merge" --patch-file stacks/airflow/patch_airflow.yaml -n airflow-demo

The scheduled job runs every minute and so an instance of it may be running while the scheduler is being re-started as a result of the patch, in which case that instance may fail.

Summary

This demo showed how DAGs can be made available for Airflow, scheduled, run and then inspected with the Webserver UI.