Using Kubernetes executors

Instead of using Celery workers, you can let Airflow run its tasks with Kubernetes executors: a Pod is created dynamically for each task as needed, and jobs are no longer routed through a Redis queue to long-running workers.

Kubernetes Executor configuration

To achieve this, replace spec.celeryExecutors with spec.kubernetesExecutors. For example, you would change the following configuration

spec:
  celeryExecutors:
    roleGroups:
      default:
        replicas: 2
    config:
      resources:
        # ...

to

spec:
  kubernetesExecutors:
    config:
      resources:
        # ...
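
Note that kubernetesExecutors has no roleGroups or replicas: executor Pods are created on demand, one per task, so only the config (and, for example, envOverrides) applies to them. As an illustration only, a possible resources configuration is sketched below; it assumes the common Stackable resource structure with cpu.min/max and memory.limit, and the values are placeholders rather than recommendations:

spec:
  kubernetesExecutors:
    config:
      resources:
        cpu:
          min: 500m   # CPU requested for each executor Pod (placeholder value)
          max: "1"    # CPU limit per executor Pod (placeholder value)
        memory:
          limit: 1Gi  # memory limit per executor Pod (placeholder value)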

Logging

Kubernetes executor Pods only live as long as the task they are executing. Once the task finishes, the Pod is terminated immediately, and its console output and logs are lost with it.

To persist task logs, Airflow can be configured to store its executor logs on disk (a PersistentVolume) or, as described in the following sections, on S3.

Airflow Web UI

In the Airflow Web UI, navigate to Admin → Connections → Add a new record (the plus button). Then enter your MinIO host and credentials as shown.

Airflow connection menu

The name or connection ID is minio, the connection type is Amazon Web Services, and the AWS Access Key ID and AWS Secret Access Key fields hold the S3 credentials. The Extra field contains the endpoint URL, for example:

{
  "endpoint_url": "http://minio.default.svc.cluster.local:9000"
}
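
Alternatively, if you prefer not to create the connection by hand in the Web UI, Airflow can also read connections from environment variables named AIRFLOW_CONN_<CONN_ID>. The following sketch defines the same minio connection via envOverrides using Airflow's JSON connection representation (supported since Airflow 2.3); the credentials are placeholders, and connections defined this way do not show up in the Connections list of the Web UI:

spec:
  webservers:
    envOverrides:
      # JSON-serialized Airflow connection; <aws-access-key-id> and
      # <aws-secret-access-key> are placeholders for your S3 credentials.
      AIRFLOW_CONN_MINIO: >-
        {"conn_type": "aws",
        "login": "<aws-access-key-id>",
        "password": "<aws-secret-access-key>",
        "extra": {"endpoint_url": "http://minio.default.svc.cluster.local:9000"}}

Apply the same override to the schedulers and kubernetesExecutors roles as well, for example with a YAML anchor as shown in the next section, because the executor Pods write the task logs while the webserver reads them.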

Executor configuration

To configure S3 logging, add the following environment variables to the Airflow cluster definition:

apiVersion: airflow.stackable.tech/v1alpha1
kind: AirflowCluster
metadata:
  name: airflow
spec:
  image:
    productVersion: 2.9.3
  clusterConfig: {}
  webservers:
    envOverrides: &s3-logging-env-overrides
      AIRFLOW__LOGGING__REMOTE_LOGGING: "True"
      AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER: s3://<bucket-name>/airflow-task-logs/
      # The name / connection ID created in the Airflow Web UI
      AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID: minio
    roleGroups:
      default:
        replicas: 1
  schedulers:
    envOverrides: *s3-logging-env-overrides
    roleGroups:
      default:
        replicas: 1
  kubernetesExecutors:
    envOverrides: *s3-logging-env-overrides

You should now be able to fetch and inspect the logs for each DAG run from S3 in the Airflow Web UI.

Airflow DAG S3 logs