Mounting DAGs

DAGs can be mounted using either a ConfigMap or git-sync. Each approach is illustrated with an example in the sections below.

via ConfigMap

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: cm-dag (1)
data:
  test_airflow_dag.py: | (2)
    from datetime import datetime, timedelta
    from airflow import DAG
    from airflow.operators.bash import BashOperator
    from airflow.operators.dummy import DummyOperator

    with DAG(
        dag_id='test_airflow_dag',
        schedule_interval='0 0 * * *',
        start_date=datetime(2021, 1, 1),
        catchup=False,
        dagrun_timeout=timedelta(minutes=60),
        tags=['example', 'example2'],
        params={"example_key": "example_value"},
    ) as dag:
        run_this_last = DummyOperator(
            task_id='run_this_last',
        )

        # [START howto_operator_bash]
        run_this = BashOperator(
            task_id='run_after_loop',
            bash_command='echo 1',
        )
        # [END howto_operator_bash]

        run_this >> run_this_last

        for i in range(3):
            task = BashOperator(
                task_id='runme_' + str(i),
                bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
            )
            task >> run_this

        # [START howto_operator_bash_template]
        also_run_this = BashOperator(
            task_id='also_run_this',
            bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
        )
        # [END howto_operator_bash_template]
        also_run_this >> run_this_last

    # [START howto_operator_bash_skip]
    this_will_skip = BashOperator(
        task_id='this_will_skip',
        bash_command='echo "hello world"; exit 99;',
        dag=dag,
    )
    # [END howto_operator_bash_skip]
    this_will_skip >> run_this_last

    if __name__ == "__main__":
        dag.cli()
---
apiVersion: airflow.stackable.tech/v1alpha1
kind: AirflowCluster
metadata:
  name: airflow
spec:
  image:
    productVersion: 2.9.2
  clusterConfig:
    loadExamples: false
    exposeConfig: false
    credentialsSecret: simple-airflow-credentials
    volumes:
      - name: cm-dag (3)
        configMap:
          name: cm-dag (4)
    volumeMounts:
      - name: cm-dag (5)
        mountPath: /dags/test_airflow_dag.py (6)
        subPath: test_airflow_dag.py (7)
  webservers:
    roleGroups:
      default:
        envOverrides:
          AIRFLOW__CORE__DAGS_FOLDER: "/dags" (8)
        replicas: 1
  celeryExecutors:
    roleGroups:
      default:
        envOverrides:
          AIRFLOW__CORE__DAGS_FOLDER: "/dags" (8)
        replicas: 2
  schedulers:
    roleGroups:
      default:
        envOverrides:
          AIRFLOW__CORE__DAGS_FOLDER: "/dags" (8)
        replicas: 1
1 The name of the ConfigMap
2 The name of the DAG file (this is a renamed copy of example_bash_operator.py from the Airflow examples)
3 The volume backed by the ConfigMap
4 The name of the ConfigMap referenced by the Airflow cluster
5 The name of the mounted volume
6 The path of the mounted resource. Note that this should map to a single DAG file.
7 The resource must be mounted using subPath: this prevents the versioned representation Kubernetes uses for ConfigMap mounts from conflicting with how Airflow propagates DAGs between its components.
8 If the mount path described above is anything other than the standard location (the default is $AIRFLOW_HOME/dags), the location must be declared using the AIRFLOW__CORE__DAGS_FOLDER environment variable, as shown above.
If a DAG mounted via ConfigMap consists of modularized files, then using the standard location is mandatory, as Python uses this folder as the "root" when looking for referenced files.
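The "root" folder behaviour can be sketched in plain Python (the folder and helper module names below are hypothetical): Airflow places the DAGs folder on sys.path, which is what allows a DAG file to import a sibling module with a bare import.

```python
# Sketch (hypothetical names): why DAGs with sibling modules need a predictable
# DAGs folder. Airflow puts the DAGs folder on sys.path, so bare imports of
# sibling files resolve relative to it.
import os
import sys
import tempfile

dags_folder = tempfile.mkdtemp()

# A helper module that a modularized DAG would import.
with open(os.path.join(dags_folder, "my_helpers.py"), "w") as f:
    f.write("GREETING = 'hello from a DAG helper'\n")

# Equivalent to what Airflow does with AIRFLOW__CORE__DAGS_FOLDER.
sys.path.insert(0, dags_folder)

import my_helpers  # a bare import, exactly as a DAG file would write it

print(my_helpers.GREETING)
```

If the DAG is mounted somewhere else, this sys.path entry points at the wrong directory and the bare import fails.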

The advantage of this approach is that a DAG can be provided in-line, within the cluster definition itself. This becomes cumbersome when multiple DAGs are to be made available this way, as each one has to be mapped individually. For multiple DAGs it is usually easier to expose them all via a mounted volume, as shown below.
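To illustrate the per-DAG bookkeeping this involves, the mapping can be sketched with a hypothetical helper (not part of the operator): every additional DAG file needs its own volumeMount entry with a matching subPath.

```python
# Sketch (hypothetical file names): each DAG file mounted from a ConfigMap
# requires its own mountPath/subPath pair, as in the manifest above.
def dag_volume_mounts(volume_name: str, mount_root: str, dag_files: list) -> list:
    """Build one volumeMount entry per DAG file."""
    return [
        {
            "name": volume_name,
            "mountPath": f"{mount_root}/{dag_file}",
            "subPath": dag_file,
        }
        for dag_file in dag_files
    ]

mounts = dag_volume_mounts("cm-dag", "/dags", ["dag_one.py", "dag_two.py", "dag_three.py"])
for mount in mounts:
    print(mount)
```

Three DAGs already mean three mount entries (plus three data keys in the ConfigMap), which is why a volume-based approach scales better.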

via git-sync

Overview

git-sync is a command that pulls a git repository into a local directory and is supplied as a sidecar container for use within Kubernetes. The Stackable implementation wraps this so that the binary and image requirements are included in the Stackable Airflow product images and do not need to be specified or handled in the AirflowCluster custom resource. Internal details such as image names and volume mounts are handled by the operator; only the repository and synchronization details are required. An example of this usage is given in the next section.

Example

---
apiVersion: airflow.stackable.tech/v1alpha1
kind: AirflowCluster
metadata:
  name: airflow
spec:
  image:
    productVersion: "2.9.2"
  clusterConfig:
    loadExamples: false
    exposeConfig: false
    credentialsSecret: test-airflow-credentials (1)
    dagsGitSync: (2)
      - repo: https://github.com/stackabletech/airflow-operator (3)
        branch: "main" (4)
        gitFolder: "tests/templates/kuttl/mount-dags-gitsync/dags" (5)
        depth: 10 (6)
        wait: 20 (7)
        credentialsSecret: git-credentials (8)
        gitSyncConf: (9)
          --rev: HEAD (10)
          # --rev: git-sync-tag # N.B. tag must be covered by "depth" (the number of commits to clone)
          # --rev: 39ee3598bd9946a1d958a448c9f7d3774d7a8043 # N.B. commit must be covered by "depth"
          --git-config: http.sslCAInfo:/tmp/ca-cert/ca.crt (11)
  webservers:
    ...
1 A Secret used for accessing database and admin user details (included here to illustrate where the different credential Secrets are defined)
2 The git-sync configuration block that contains a list of git-sync elements
3 The repository that will be cloned (required)
4 The branch name (defaults to main)
5 The location of the DAG folder, relative to the synced repository root (required)
6 The sync depth, i.e. the number of commits to clone (defaults to 1)
7 The synchronization interval in seconds (defaults to 20 seconds)
8 The name of the Secret used to access the repository if it is not public. It should include two fields: user and password (the latter being either a password, which is not recommended, or a GitHub token, as described here)
9 A map of optional configuration settings that are listed in this configuration section (and the ones that follow on from that link)
10 An example showing how to specify a target revision (the default is HEAD). The revision can also be a tag or a commit hash, though this assumes that the target is contained within the number of commits specified by depth. If a tag or commit hash is specified, git-sync will recognize that and not perform further cloning.
11 git-sync settings can be provided inline, although some of these (--dest, --root) are specified internally in the operator and will be ignored if provided by the user. Git-config settings can also be specified, although a warning will be logged if safe.directory is specified, as this is defined internally and should not be set by the user.
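The git-credentials Secret referenced above is not shown in the example; a minimal sketch of what it could look like (values are illustrative) is:

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: git-credentials
stringData:
  user: my-git-user  # illustrative value
  password: my-token-or-password  # a token is recommended over a plain password
```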
The example above shows a *list* of git-sync definitions with a single element. This is to avoid breaking changes in future releases. Currently, only one such git-sync definition is considered and processed.
git-sync can be used with DAGs that make use of Python modules, as Python will be configured to use the git-sync target folder as the "root" location when looking for referenced files. See the Applying Custom Resources example for more details.