S3-sync sidecar to continuously deploy DAGs for Airflow running on Kubernetes

Mai Nguyen
Jul 19, 2023 · 7 min read


Summary

The article explores common ways to deploy Airflow DAGs in Kubernetes. It starts by discussing three approaches: embedding DAGs into the Airflow base container image, using Persistent Volume Claims (PVCs) to share DAG definitions, and utilizing Git-Sync or S3-Sync to synchronize DAG files. The focus then shifts to deploying Airflow DAGs with S3 in Kubernetes using the User-Community Airflow Helm Chart. This article is a guide on how to push DAG files to S3 and set up S3-Sync to synchronize DAG files from S3 to Airflow pods securely. It demonstrates how to define S3-Sync containers in the Helm templates and utilize them in various Airflow components, such as web servers and workers.

Common ways to deploy Airflow DAGs in Kubernetes

According to the official Airflow documentation, Airflow loads DAGs from Python source files, which it looks for inside its configured DAG folder. It takes each file, executes it, and then loads any DAG objects from that file. This means that all Airflow components (including but not limited to the webserver, scheduler, and workers) need to read DAG files from a common filesystem. When running Airflow on Kubernetes, there are a few common ways to load DAGs:

DAGs are baked / embedded into the Airflow base container image

This method requires all Airflow containers to restart after each update to your DAG files. All Airflow Pods run the same image and therefore have the latest DAG definitions. In a production deployment, it is important to never reuse an image tag name: whenever the container image tag changes, all Airflow Pods restart and pick up the same DAGs. One possible shortcoming of this approach is slow deployments, because each new version must wait for the graceful termination of the old Pods to avoid downtime for users.
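For example, a minimal Dockerfile for this approach could look like the following (a sketch; the base image tag and the dags/ path in the build context are illustrative):

## build a custom image with the DAGs baked in, e.g. tagged with the git commit SHA
FROM apache/airflow:2.6.3
COPY dags/ /opt/airflow/dags/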

Use a Persistent Volume Claim to share your DAG definitions across the Airflow Pods

A PersistentVolumeClaim can be used to share the DAG definitions across the Airflow Pods. The PVC must either be filled with DAG files manually, or an external system must be configured to automate this process. For example, a CI/CD pipeline on the DAGs repository may update the volume as commits are made.
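With the User-Community chart, this approach is configured through the dags.persistence values, roughly like the snippet below (a sketch; the claim name is hypothetical, and the exact schema should be checked against the chart's values file):

dags:
  persistence:
    ## mount an existing PVC that a CI/CD pipeline keeps up to date
    enabled: true
    existingClaim: airflow-dags
    accessMode: ReadOnlyMany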

Use Git-Sync to synchronize DAG files

DAG definitions may be stored in a Git repository, and the Kubernetes deployment includes a git-sync sidecar that automatically syncs a local copy of this repository into each Airflow Pod at a regular interval.
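In the User-Community chart, git-sync is enabled through the dags.gitSync values, roughly as follows (a sketch; the repository URL and interval are placeholders, and the exact field names should be checked against the chart's documentation):

dags:
  gitSync:
    enabled: true
    repo: "git@github.com:ORG/airflow-dags.git"
    branch: "main"
    revision: "HEAD"
    ## number of seconds to wait between syncs
    syncWait: 60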

Use S3-Sync to synchronize DAG files

Object storage synchronization is another deployment option: DAG code is pushed to a cloud object store, and Airflow picks it up from there. S3 is a popular choice for AWS users. This article is a guide on how to deploy Airflow DAGs with S3 in Kubernetes using the User-Community Airflow Helm Chart.

Push DAG files to S3

Before deploying Airflow, it is important to push the DAG files to S3 first. A quick and simple way is to integrate the AWS CLI sync command into the CI/CD pipeline:

aws s3 sync /path/airflow/dag/folder s3://path/airflow/dag/folder --delete
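As an illustration, a GitHub Actions workflow could run this command on every push to the DAGs repository (a hedged sketch; the bucket, region, and credential secrets are placeholders):

name: deploy-dags
on:
  push:
    branches: [main]
jobs:
  sync-dags:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: aws-actions/configure-aws-credentials@v2
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: us-east-1
      ## mirror the local dags folder to the bucket, removing files deleted from git
      - run: aws s3 sync ./dags s3://airflow/dags --delete --exclude "*.pyc"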

Sync DAG files from S3 to Airflow pods

This part shows how to set up the s3-sync sidecar that copies or syncs Airflow DAG files from S3 to the Airflow pods.

Add values for s3-sync option in Helm values file

First, let’s add the essential values for the s3-sync option to the Helm values file. The s3-sync sidecar uses the AWS CLI Docker image to interact with S3 and run the copy / sync commands.

dags:
  ## the airflow dags folder
  path: /opt/airflow/dags
  ...
  s3Sync:
    ## if the s3-sync sidecar container is enabled
    enabled: true
    ## AWS CLI Docker image
    image:
      repository: amazon/aws-cli
      tag: latest
      pullPolicy: Always
      ## uid/gid to run the sync container as
      uid: 65533
      gid: 65533
    ## s3 bucket that contains the DAG files
    bucketName: airflow
    ## s3 key path to the DAG files
    keyPath: dags
    ## sync interval in seconds
    interval: 180
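Note that the AWS CLI in the sidecar also needs permission to read the bucket. How credentials are provided is environment-specific; on EKS, one option (a sketch, not part of the values above, with a hypothetical role ARN) is IAM Roles for Service Accounts via the chart's serviceAccount annotations:

serviceAccount:
  create: true
  annotations:
    ## hypothetical IAM role with s3:ListBucket / s3:GetObject on the DAG bucket
    eks.amazonaws.com/role-arn: arn:aws:iam::123456789012:role/airflow-dag-sync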

Define s3 sync containers in the templates/_helpers/*.tpl

In a Helm chart, the templates/_helpers/*.tpl files (or a single templates/_helpers.tpl) contain named sub-templates and helper functions. These files are not rendered into Kubernetes object definitions themselves, but the sub-templates they define can be included anywhere in the other chart templates. The s3-sync containers will be defined here and made available to the other Airflow components later.

In the context of this article, the code blocks for the s3 sync containers are defined in templates/_helpers/pods.tpl.

One-time synchronization container

The first template code block defines a container that performs a one-time synchronization of files from an S3 bucket to a specified destination path in a Pod.

{{/*
Define a container which syncs a s3 path one-time
EXAMPLE USAGE: {{ include "airflow.container.s3_initial_sync" (dict "Release" .Release "Values" .Values) }}
*/}}
{{- define "airflow.container.s3_initial_sync" }}
- name: dags-s3-initial-sync
  image: {{ .Values.dags.s3Sync.image.repository }}:{{ .Values.dags.s3Sync.image.tag }}
  imagePullPolicy: {{ .Values.dags.s3Sync.image.pullPolicy }}
  resources:
    {{- toYaml .Values.dags.s3Sync.resources | nindent 4 }}
  envFrom:
    {{- include "airflow.envFrom" . | indent 4 }}
  env:
    - name: AWS_BUCKET
      value: {{ .Values.dags.s3Sync.bucketName | quote }}
    - name: KEY_PATH
      value: {{ .Values.dags.s3Sync.keyPath | quote }}
    - name: DEST_PATH
      value: {{ .Values.dags.workDir | quote }}
    - name: AIRFLOW_USER_UID
      value: {{ .Values.airflow.image.uid | quote }}
    {{- /* this has user-defined variables, so must be included BELOW (so the ABOVE `env` take precedence) */ -}}
    {{- include "airflow.env" . | indent 4 }}
  command:
    # Copy all files from s3 to the dags folder and grant permissions to the airflow user
    - /bin/sh
    - -c
    - aws s3 cp s3://${AWS_BUCKET}/${KEY_PATH} ${DEST_PATH} --exclude "*.pyc" --recursive; chown -R ${AIRFLOW_USER_UID} ${DEST_PATH}
  volumeMounts:
    - name: dags-data
      mountPath: {{ .Values.dags.workDir }}
{{- end }}

The container executes the aws s3 cp command, which recursively copies the content of the S3 key path to the destination path, and then grants the Airflow user access by changing the ownership of the files and directories in the destination path with chown.
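With the example values above, and assuming dags.workDir resolves to /opt/airflow/dags and the chart's default Airflow UID of 50000, the rendered init container would look roughly like this (resources and the chart's shared env / envFrom entries elided):

- name: dags-s3-initial-sync
  image: amazon/aws-cli:latest
  imagePullPolicy: Always
  env:
    - name: AWS_BUCKET
      value: "airflow"
    - name: KEY_PATH
      value: "dags"
    - name: DEST_PATH
      value: "/opt/airflow/dags"
    - name: AIRFLOW_USER_UID
      value: "50000"
    ...
  command:
    - /bin/sh
    - -c
    - aws s3 cp s3://${AWS_BUCKET}/${KEY_PATH} ${DEST_PATH} --exclude "*.pyc" --recursive; chown -R ${AIRFLOW_USER_UID} ${DEST_PATH}
  volumeMounts:
    - name: dags-data
      mountPath: /opt/airflow/dags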

Regular synchronization container

{{/*
Define a container which regularly syncs a s3 path
EXAMPLE USAGE: {{ include "airflow.container.s3_sync" (dict "Release" .Release "Values" .Values) }}
*/}}
{{- define "airflow.container.s3_sync" }}
- name: dags-s3-sync
  image: {{ .Values.dags.s3Sync.image.repository }}:{{ .Values.dags.s3Sync.image.tag }}
  imagePullPolicy: {{ .Values.dags.s3Sync.image.pullPolicy }}
  resources:
    {{- toYaml .Values.dags.s3Sync.resources | nindent 4 }}
  envFrom:
    {{- include "airflow.envFrom" . | indent 4 }}
  env:
    - name: AWS_BUCKET
      value: {{ .Values.dags.s3Sync.bucketName | quote }}
    - name: KEY_PATH
      value: {{ .Values.dags.s3Sync.keyPath | quote }}
    - name: DEST_PATH
      value: {{ .Values.dags.workDir | quote }}
    - name: INTERVAL
      value: {{ .Values.dags.s3Sync.interval | quote }}
    - name: AIRFLOW_USER_UID
      value: {{ .Values.airflow.image.uid | quote }}
    {{- /* this has user-defined variables, so must be included BELOW (so the ABOVE `env` take precedence) */ -}}
    {{- include "airflow.env" . | indent 4 }}
  command:
    # Sync all files from s3 to the dags folder, grant permissions to the airflow user and sleep
    - /bin/sh
    - -c
    - >
      while true; do
        echo "---------------------------------------------------------------------------"
        echo "Syncing s3://${AWS_BUCKET}/${KEY_PATH} to ${DEST_PATH}"
        echo "aws s3 sync s3://${AWS_BUCKET}/${KEY_PATH} ${DEST_PATH} --exclude \"*.pyc\" --delete --exact-timestamps"
        aws s3 sync s3://${AWS_BUCKET}/${KEY_PATH} ${DEST_PATH} --exclude "*.pyc" --delete --exact-timestamps
        echo "Finished syncing s3://${AWS_BUCKET}/${KEY_PATH} to ${DEST_PATH}"
        echo "Granting read access to ${DEST_PATH} for Airflow user ${AIRFLOW_USER_UID}"
        echo "chown -R ${AIRFLOW_USER_UID} ${DEST_PATH}"
        chown -R ${AIRFLOW_USER_UID} ${DEST_PATH}
        echo "Sleeping for ${INTERVAL} seconds"
        sleep ${INTERVAL};
      done
  volumeMounts:
    - name: dags-data
      mountPath: {{ .Values.dags.workDir }}
{{- end }}

The template code block defines a container that performs regular synchronization of files from an S3 bucket to a specified destination path in a Pod.

The container executes a multi-line script that continuously synchronizes the content of the S3 path to a destination in the container filesystem at a predefined interval. It is important to use the --delete option, which ensures that any files deleted in the S3 bucket are also deleted in the destination path, and the --exact-timestamps option, which re-downloads same-sized files unless their timestamps match exactly.

Check values for s3 sync containers

Additional checks on the values for the s3-sync options are needed. In templates/_helpers/validate-values.tpl, add the following code block:

{{/* Checks for `dags.s3Sync` */}}
{{- if .Values.dags.s3Sync.enabled }}
  {{- if .Values.dags.persistence.enabled }}
  {{ required "If `dags.s3Sync.enabled=true`, then `dags.persistence.enabled` must be disabled!" nil }}
  {{- end }}
  {{- if .Values.dags.gitSync.enabled }}
  {{ required "If `dags.s3Sync.enabled=true`, then `dags.gitSync.enabled` must be disabled!" nil }}
  {{- end }}
  {{- if not .Values.dags.s3Sync.bucketName }}
  {{ required "If `dags.s3Sync.enabled=true`, then `dags.s3Sync.bucketName` must be non-empty!" nil }}
  {{- end }}
  {{- if not .Values.dags.s3Sync.keyPath }}
  {{ required "If `dags.s3Sync.enabled=true`, then `dags.s3Sync.keyPath` must be non-empty!" nil }}
  {{- end }}
  {{- if not .Values.dags.s3Sync.interval }}
  {{ required "If `dags.s3Sync.enabled=true`, then `dags.s3Sync.interval` must be non-empty!" nil }}
  {{- end }}
{{- end }}

The code performs a series of checks on the configuration options related to dags.s3Sync and ensures that certain conditions are met when enabling S3 synchronization for DAGs in an Airflow deployment. In summary, if s3-sync is enabled, the git-sync and persistence options for loading DAGs must be disabled, and the values for the S3 bucket name, key path, and sync interval must be non-empty.
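These guards can be exercised locally before installing by rendering the chart with helm template (a sketch; the local chart path is an assumption). Rendering should fail with the corresponding message when, for example, both git-sync and s3-sync are enabled:

## render the modified chart locally; this should error out because of the conflicting options
helm template airflow ./charts/airflow \
  -f values.yaml \
  --set dags.gitSync.enabled=true \
  --set dags.s3Sync.enabled=true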

Use s3 sync containers in other Airflow components

Let’s use the s3_initial_sync and s3_sync containers defined above in the other Airflow components (Kubernetes Jobs, Deployments, and StatefulSets), including db-migrations, flower, scheduler, sync, triggerer, webserver, and worker.

The general principles are:

  1. Use the s3_initial_sync container as one of the init containers, so that the DAG files (and any modules shipped with them) are present in the Pod before the main container or any init jobs run
  2. Use the s3_sync container as a sidecar container to regularly sync Airflow DAGs from S3

Below is a code snippet for the Airflow worker StatefulSet template:

apiVersion: apps/v1
## StatefulSet gives workers consistent DNS names, allowing webserver access to log files
kind: StatefulSet
metadata:
  name: {{ include "airflow.fullname" . }}-worker
  ...
spec:
  ...
  template:
    metadata:
      ...
    spec:
      ...
      initContainers:
        ...
        {{- if .Values.dags.s3Sync.enabled }}
        {{- include "airflow.container.s3_initial_sync" (dict "Release" .Release "Values" .Values) | indent 8 }}
        {{- end }}
        ...
      containers:
        - name: airflow-worker
          ...
        {{- if .Values.dags.s3Sync.enabled }}
        {{- include "airflow.container.s3_sync" . | indent 8 }}
        {{- end }}
        ...
{{- end }}

The s3_initial_sync container is added to the initContainers based on the value of the dags.s3Sync.enabled parameter. Similarly, the s3_sync sidecar container is included in the Pod spec if s3-sync is enabled.
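Since the chart templates themselves were modified, the chart is deployed from the local checkout rather than the published Helm repository (a sketch; the release name, namespace, and chart path are assumptions):

helm upgrade --install airflow ./charts/airflow \
  --namespace airflow --create-namespace \
  -f values.yaml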

References

https://medium.com/@tomaspm/airflow-dag-deployment-with-s3-2536dc347d2d

https://github.com/airflow-helm/charts/blob/main/charts/airflow/docs/faq/dags/load-dag-definitions.md#option-1---git-sync-sidecar

https://devopscube.com/create-helm-chart/
