Using Kubernetes executors

Instead of using Celery workers, you can let Airflow run tasks using Kubernetes executors, where Pods are created dynamically as needed and jobs are not routed through a Redis queue to the workers.

Kubernetes Executor configuration

To achieve this, replace spec.celeryExecutors with spec.kubernetesExecutors. For example, you would change the following configuration

spec:
  celeryExecutors:
    roleGroups:
      default:
        replicas: 2
    config:
      resources:
        # ...

to

spec:
  kubernetesExecutors:
    config:
      resources:
        # ...
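
The same swap can be expressed programmatically, for instance when several cluster definitions need to be migrated. The following is a minimal Python sketch, assuming the spec has already been loaded into a dictionary. The function name to_kubernetes_executors is hypothetical and not part of Airflow or the operator. It mirrors the example above: the role-level config is kept, while roleGroups (and with it replicas) is dropped.

```python
import copy


def to_kubernetes_executors(spec: dict) -> dict:
    """Return a copy of spec with celeryExecutors replaced by kubernetesExecutors."""
    new_spec = copy.deepcopy(spec)  # leave the caller's dictionary untouched
    celery = new_spec.pop("celeryExecutors", None)
    if celery is not None:
        # Keep everything except roleGroups, as in the example above.
        new_spec["kubernetesExecutors"] = {
            key: value for key, value in celery.items() if key != "roleGroups"
        }
    return new_spec


celery_spec = {
    "celeryExecutors": {
        "roleGroups": {"default": {"replicas": 2}},
        "config": {"resources": {}},
    }
}
print(to_kubernetes_executors(celery_spec))
```

A spec without celeryExecutors is returned unchanged, so the function can be applied safely to definitions that were already migrated.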

Logging

Kubernetes executor Pods only live as long as the task they are executing. Afterwards, the Pod is terminated immediately, and anything kept only inside the Pod, such as console output or logs, is gone.

To persist task logs, Airflow can be configured to store its executor logs on disk (on a PersistentVolume) or, as described in the following section, on S3.

Add S3 connection in Airflow Web UI

In the Airflow Web UI, click on Admin → Connections → Add a new record (the plus). Then enter your MinIO host and credentials as shown.

Figure: Airflow connection menu

The name (connection ID) is minio and the type is Amazon Web Services. The AWS Access Key ID and AWS Secret Access Key fields are filled with the S3 credentials. The Extra field contains the endpoint URL, for example:

{
  "endpoint_url": "http://minio.default.svc.cluster.local:9000"
}
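
The endpoint URL above uses the cluster-internal DNS name of a Kubernetes Service, which has the form <service>.<namespace>.svc.cluster.local. As a minimal sketch, the Extra value can be generated for a different Service like this. The function name minio_extra is hypothetical, and the default port 9000 and the plain http scheme are simply taken from the example above.

```python
import json


def minio_extra(service: str, namespace: str, port: int = 9000) -> str:
    """Build the JSON text for the connection's Extra field."""
    # Cluster-internal DNS name of a Service: <service>.<namespace>.svc.cluster.local
    endpoint = f"http://{service}.{namespace}.svc.cluster.local:{port}"
    return json.dumps({"endpoint_url": endpoint}, indent=2)


# Service "minio" in namespace "default", as in the example above.
print(minio_extra("minio", "default"))
```

The printed JSON can be pasted into the Extra field as is.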

Executor configuration

To configure S3 logging, add the following environment variables to the Airflow cluster definition:

apiVersion: airflow.stackable.tech/v1alpha1
kind: AirflowCluster
metadata:
  name: airflow
spec:
  image:
    productVersion: 2.9.3
  clusterConfig: {}
  webservers:
    envOverrides: &envOverrides
      AIRFLOW__LOGGING__REMOTE_LOGGING: "True"
      AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER: s3://<bucket-name>/airflow-task-logs/
      # The name of the S3 connection created in the Airflow Web UI
      AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID: minio
    roleGroups:
      default:
        replicas: 1
  schedulers:
    envOverrides: *envOverrides
    roleGroups:
      default:
        replicas: 1
  kubernetesExecutors:
    envOverrides: *envOverrides

Now you should be able to fetch and inspect logs in the Airflow Web UI from S3 for each DAG run.

Figure: Airflow DAG S3 logs
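
The environment variables used in the cluster definition follow Airflow's naming convention for configuration overrides, AIRFLOW__{SECTION}__{KEY}, where section and key refer to entries of the Airflow configuration. The following minimal sketch derives the variable names from the option names. The helper airflow_env_var is hypothetical and only illustrates the convention; it is not an Airflow API.

```python
def airflow_env_var(section: str, key: str) -> str:
    """Return the environment variable name that overrides option key in [section]."""
    # Section and key are upper-cased; dots in a section name become underscores.
    return f"AIRFLOW__{section.replace('.', '_').upper()}__{key.upper()}"


for section, key in [
    ("logging", "remote_logging"),
    ("logging", "remote_base_log_folder"),
    ("logging", "remote_log_conn_id"),
]:
    print(airflow_env_var(section, key))
```

This makes it easier to translate any other option from the Airflow configuration reference into an envOverrides entry.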

Store XCom objects in object storage (e.g. S3)

By default, Airflow stores XCom (cross-communication) objects in the Airflow database (often PostgreSQL), which can cause the database to run out of disk space, especially with high XCom traffic. To prevent this, you can configure Airflow to exchange XCom objects via an object store backend (such as S3) instead of the database. See the official Airflow documentation for details.

Airflow stores the XCom objects in the PostgreSQL table xcom (e.g. airflow.public.xcom). You can inspect that table to see exactly what is being stored.

For example, to find the largest XCom objects, you can use:

select
	length(value) as size,
	dag_run_id,
	task_id,
	map_index,
	key,
	dag_id,
	run_id,
	"timestamp"
from xcom
order by length(value) desc limit 20;

With care, you can also clean up the table to save space in the database. In our testing, running VACUUM FULL afterwards was enough for the freed space to be reclaimed on the PostgreSQL disk.

Add S3 connection in Airflow Web UI

Add an S3 connection in the Airflow Web UI as described in "Add S3 connection in Airflow Web UI" above.

Executor configuration

To configure the S3 XCom backend, add the following environment variables to the Airflow cluster definition:

apiVersion: airflow.stackable.tech/v1alpha1
kind: AirflowCluster
metadata:
  name: airflow
spec:
  image:
    productVersion: 2.9.3
  clusterConfig: {}
  webservers:
    envOverrides: &envOverrides
      AIRFLOW__CORE__XCOM_BACKEND: airflow.providers.common.io.xcom.backend.XComObjectStorageBackend
      # The connection id is obtained from the user part of the url that you will provide
      AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_PATH: s3://minio@<bucket-name>/airflow-xcom
      # Any object smaller than the threshold (in bytes) is stored in the database; anything larger is put in object storage
      AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_THRESHOLD: "1024" # 1KiB
      AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_COMPRESSION: gzip
    roleGroups:
      default:
        replicas: 1
  schedulers:
    envOverrides: *envOverrides
    roleGroups:
      default:
        replicas: 1
  kubernetesExecutors:
    envOverrides: *envOverrides
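
To make the two comments in the definition above more concrete, the following Python sketch shows how the object storage path splits into connection ID, bucket, and prefix, and how the threshold decides where a value is stored. Both helpers are hypothetical illustrations and not the provider's implementation. The bucket name my-bucket is a placeholder, measuring the size on a JSON serialization is an assumption, and so is the handling of the boundary cases (a negative threshold keeps everything in the database, a value exactly at the threshold goes to object storage).

```python
import json
from urllib.parse import urlsplit


def split_xcom_path(path: str) -> dict:
    """Split an object storage URL into connection ID, bucket, and key prefix."""
    parts = urlsplit(path)
    return {
        "conn_id": parts.username,  # the user part before the "@"
        "bucket": parts.hostname,  # the host part after the "@"
        "prefix": parts.path.lstrip("/"),
    }


def storage_target(value, threshold: int = 1024) -> str:
    """Say where a value would be stored under the threshold rule (illustrative)."""
    size = len(json.dumps(value).encode("utf-8"))  # size in bytes of the serialized value
    if threshold < 0 or size < threshold:
        return "database"
    return "object storage"


print(split_xcom_path("s3://minio@my-bucket/airflow-xcom"))
print(storage_target({"status": "ok"}))
print(storage_target(list(range(1000))))
```

With the 1 KiB threshold from the definition, small status values stay in the database, while larger payloads such as long lists are written to the bucket.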