From be1f8f2d128e181a3a50ef4118e50062bc16c26a Mon Sep 17 00:00:00 2001 From: Seonghwna Lee Date: Sun, 25 Jan 2026 23:24:56 +0900 Subject: [PATCH 1/2] feat: Implement deployPerBundle feature for DAG processor deployments --- chart/templates/_helpers.yaml | 2 +- .../dag-processor-deployment.yaml | 219 ++++++---- .../dag-processor-poddisruptionbudget.yaml | 85 +++- chart/values.schema.json | 42 ++ chart/values.yaml | 33 ++ .../test_dag_processor_per_bundle.py | 375 ++++++++++++++++++ 6 files changed, 668 insertions(+), 88 deletions(-) create mode 100644 helm-tests/tests/helm_tests/airflow_core/test_dag_processor_per_bundle.py diff --git a/chart/templates/_helpers.yaml b/chart/templates/_helpers.yaml index ceb0041a7b72d..02bec647cf3c6 100644 --- a/chart/templates/_helpers.yaml +++ b/chart/templates/_helpers.yaml @@ -1053,7 +1053,7 @@ backwards compatibility. Otherwise, fall back to createUserJob.enabled. {{- end -}} {{/* -Convert dagBundleConfigList YAML list to JSON string for dag_bundle_config_list. +Convert `dagBundleConfigList` YAML list to JSON string for `dag_bundle_config_list`. This helper function converts the structured YAML format to the JSON string format required by Airflow's dag_bundle_config_list configuration. diff --git a/chart/templates/dag-processor/dag-processor-deployment.yaml b/chart/templates/dag-processor/dag-processor-deployment.yaml index 448682224a003..93dd08f822a83 100644 --- a/chart/templates/dag-processor/dag-processor-deployment.yaml +++ b/chart/templates/dag-processor/dag-processor-deployment.yaml @@ -17,42 +17,62 @@ under the License. */}} -################################ -## Airflow Dag Processor Deployment -################################# -{{- $enabled := .Values.dagProcessor.enabled }} -{{- if eq $enabled nil}} - {{ $enabled = ternary true false (semverCompare ">=3.0.0" .Values.airflowVersion) }} +{{/* +Helper template for dag processor deployment +Expects context with: + - .Values.dagProcessor: dag processor configuration (may be merged with bundle overrides) + - .bundleName: bundle name (empty string if not per-bundle mode) + - .deployPerBundle: boolean indicating if per-bundle mode is enabled +*/}} +{{- define "dag-processor.deployment" }} +{{- $bundleName := .bundleName | default "" }} +{{- $deployPerBundle := .deployPerBundle | default false }} +{{- $mergedConfig := .Values.dagProcessor }} +{{- $nodeSelector := or $mergedConfig.nodeSelector .Values.nodeSelector }} +{{- $affinity := or $mergedConfig.affinity .Values.affinity }} +{{- $tolerations := or $mergedConfig.tolerations .Values.tolerations }} +{{- $topologySpreadConstraints := or $mergedConfig.topologySpreadConstraints .Values.topologySpreadConstraints }} +{{- $revisionHistoryLimit := include "airflow.revisionHistoryLimit" (list $mergedConfig.revisionHistoryLimit .Values.revisionHistoryLimit) }} +{{- $securityContext := include "airflowPodSecurityContext" (list $mergedConfig .Values) }} +{{- $containerSecurityContext := include "containerSecurityContext" (list $mergedConfig .Values) }} +{{- $containerSecurityContextLogGroomerSidecar := include "containerSecurityContext" (list $mergedConfig.logGroomerSidecar .Values) }} +{{- $containerSecurityContextWaitForMigrations := include "containerSecurityContext" (list $mergedConfig.waitForMigrations .Values) }} +{{- $containerLifecycleHooks := or $mergedConfig.containerLifecycleHooks .Values.containerLifecycleHooks }} +{{- /* Prepare args: use per-bundle args if enabled, otherwise use default args */}} +{{- $args := $mergedConfig.args }} +{{- if and $deployPerBundle .Values.dagProcessor.deployPerBundle.args }} + {{- $args = list }} + {{- range .Values.dagProcessor.deployPerBundle.args }} + {{- $arg := . | toString | replace "{{ bundleName }}" $bundleName }} + {{- $args = append $args $arg }} + {{- end }} +{{- end }} +{{- /* Determine deployment name suffix */}} +{{- $nameSuffix := "" }} +{{- if $deployPerBundle }} + {{- $nameSuffix = printf "-%s" $bundleName }} {{- end }} -{{- if $enabled }} -{{- $nodeSelector := or .Values.dagProcessor.nodeSelector .Values.nodeSelector }} -{{- $affinity := or .Values.dagProcessor.affinity .Values.affinity }} -{{- $tolerations := or .Values.dagProcessor.tolerations .Values.tolerations }} -{{- $topologySpreadConstraints := or .Values.dagProcessor.topologySpreadConstraints .Values.topologySpreadConstraints }} -{{- $revisionHistoryLimit := include "airflow.revisionHistoryLimit" (list .Values.dagProcessor.revisionHistoryLimit .Values.revisionHistoryLimit) }} -{{- $securityContext := include "airflowPodSecurityContext" (list .Values.dagProcessor .Values) }} -{{- $containerSecurityContext := include "containerSecurityContext" (list .Values.dagProcessor .Values) }} -{{- $containerSecurityContextLogGroomerSidecar := include "containerSecurityContext" (list .Values.dagProcessor.logGroomerSidecar .Values) }} -{{- $containerSecurityContextWaitForMigrations := include "containerSecurityContext" (list .Values.dagProcessor.waitForMigrations .Values) }} -{{- $containerLifecycleHooks := or .Values.dagProcessor.containerLifecycleHooks .Values.containerLifecycleHooks }} apiVersion: apps/v1 kind: Deployment metadata: - name: {{ include "airflow.fullname" . }}-dag-processor + name: {{ include "airflow.fullname" . }}-dag-processor{{ $nameSuffix }} labels: tier: airflow component: dag-processor release: {{ .Release.Name }} chart: "{{ .Chart.Name }}-{{ .Chart.Version }}" heritage: {{ .Release.Service }} + {{- if $deployPerBundle }} + bundle: {{ $bundleName }} + {{- end }} {{- with .Values.labels }} {{- toYaml . | nindent 4 }} {{- end }} - {{- if .Values.dagProcessor.annotations }} - annotations: {{- toYaml .Values.dagProcessor.annotations | nindent 4 }} + {{- if $mergedConfig.annotations }} + annotations: {{- toYaml $mergedConfig.annotations | nindent 4 }} {{- end }} spec: - replicas: {{ .Values.dagProcessor.replicas }} + replicas: {{ $mergedConfig.replicas }} {{- if ne $revisionHistoryLimit "" }} revisionHistoryLimit: {{ $revisionHistoryLimit }} {{- end }} @@ -61,8 +81,11 @@ spec: tier: airflow component: dag-processor release: {{ .Release.Name }} - {{- if .Values.dagProcessor.strategy }} - strategy: {{- toYaml .Values.dagProcessor.strategy | nindent 4 }} + {{- if $deployPerBundle }} + bundle: {{ $bundleName }} + {{- end }} + {{- if $mergedConfig.strategy }} + strategy: {{- toYaml $mergedConfig.strategy | nindent 4 }} {{- end }} template: metadata: @@ -70,27 +93,30 @@ spec: tier: airflow component: dag-processor release: {{ .Release.Name }} - {{- if or .Values.labels .Values.dagProcessor.labels }} - {{- mustMerge .Values.dagProcessor.labels .Values.labels | toYaml | nindent 8 }} + {{- if $deployPerBundle }} + bundle: {{ $bundleName }} + {{- end }} + {{- if or .Values.labels $mergedConfig.labels }} + {{- mustMerge $mergedConfig.labels .Values.labels | toYaml | nindent 8 }} {{- end }} annotations: - checksum/metadata-secret: {{ include (print $.Template.BasePath "/secrets/metadata-connection-secret.yaml") . | sha256sum }} - checksum/pgbouncer-config-secret: {{ include (print $.Template.BasePath "/secrets/pgbouncer-config-secret.yaml") . | sha256sum }} - checksum/airflow-config: {{ include (print $.Template.BasePath "/configmaps/configmap.yaml") . | sha256sum }} - checksum/extra-configmaps: {{ include (print $.Template.BasePath "/configmaps/extra-configmaps.yaml") . | sha256sum }} - checksum/extra-secrets: {{ include (print $.Template.BasePath "/secrets/extra-secrets.yaml") . | sha256sum }} - {{- if .Values.dagProcessor.safeToEvict }} + checksum/metadata-secret: {{ include (print .Template.BasePath "/secrets/metadata-connection-secret.yaml") . | sha256sum }} + checksum/pgbouncer-config-secret: {{ include (print .Template.BasePath "/secrets/pgbouncer-config-secret.yaml") . | sha256sum }} + checksum/airflow-config: {{ include (print .Template.BasePath "/configmaps/configmap.yaml") . | sha256sum }} + checksum/extra-configmaps: {{ include (print .Template.BasePath "/configmaps/extra-configmaps.yaml") . | sha256sum }} + checksum/extra-secrets: {{ include (print .Template.BasePath "/secrets/extra-secrets.yaml") . | sha256sum }} + {{- if $mergedConfig.safeToEvict }} cluster-autoscaler.kubernetes.io/safe-to-evict: "true" {{- end }} {{- if .Values.airflowPodAnnotations }} {{- toYaml .Values.airflowPodAnnotations | nindent 8 }} {{- end }} - {{- if .Values.dagProcessor.podAnnotations }} - {{- toYaml .Values.dagProcessor.podAnnotations | nindent 8 }} + {{- if $mergedConfig.podAnnotations }} + {{- toYaml $mergedConfig.podAnnotations | nindent 8 }} {{- end }} spec: - {{- if .Values.dagProcessor.priorityClassName }} - priorityClassName: {{ .Values.dagProcessor.priorityClassName }} + {{- if $mergedConfig.priorityClassName }} + priorityClassName: {{ $mergedConfig.priorityClassName }} {{- end }} nodeSelector: {{- toYaml $nodeSelector | nindent 8 }} affinity: @@ -103,6 +129,9 @@ spec: labelSelector: matchLabels: component: dag-processor + {{- if $deployPerBundle }} + bundle: {{ $bundleName }} + {{- end }} topologyKey: kubernetes.io/hostname weight: 100 {{- end }} @@ -111,15 +140,15 @@ spec: {{- end }} tolerations: {{- toYaml $tolerations | nindent 8 }} topologySpreadConstraints: {{- toYaml $topologySpreadConstraints | nindent 8 }} - terminationGracePeriodSeconds: {{ .Values.dagProcessor.terminationGracePeriodSeconds }} + terminationGracePeriodSeconds: {{ $mergedConfig.terminationGracePeriodSeconds }} restartPolicy: Always serviceAccountName: {{ include "dagProcessor.serviceAccountName" . }} securityContext: {{ $securityContext | nindent 8 }} imagePullSecrets: {{ include "image_pull_secrets" . | nindent 8 }} initContainers: - {{- if .Values.dagProcessor.waitForMigrations.enabled }} + {{- if $mergedConfig.waitForMigrations.enabled }} - name: wait-for-airflow-migrations - resources: {{- toYaml .Values.dagProcessor.resources | nindent 12 }} + resources: {{- toYaml $mergedConfig.resources | nindent 12 }} image: {{ template "airflow_image_for_migrations" . }} imagePullPolicy: {{ .Values.images.airflow.pullPolicy }} securityContext: {{ $containerSecurityContextWaitForMigrations | nindent 12 }} @@ -127,8 +156,8 @@ spec: {{- if .Values.volumeMounts }} {{- toYaml .Values.volumeMounts | nindent 12 }} {{- end }} - {{- if .Values.dagProcessor.extraVolumeMounts }} - {{- tpl (toYaml .Values.dagProcessor.extraVolumeMounts) . | nindent 12 }} + {{- if $mergedConfig.extraVolumeMounts }} + {{- tpl (toYaml $mergedConfig.extraVolumeMounts) . | nindent 12 }} {{- end }} {{- include "airflow_config_mount" . | nindent 12 }} args: {{- include "wait-for-migrations-command" . | indent 10 }} @@ -136,15 +165,15 @@ spec: env: {{- include "custom_airflow_environment" . | indent 10 }} {{- include "standard_airflow_environment" . | indent 10 }} - {{- if .Values.dagProcessor.waitForMigrations.env }} - {{- tpl (toYaml .Values.dagProcessor.waitForMigrations.env) $ | nindent 12 }} + {{- if $mergedConfig.waitForMigrations.env }} + {{- tpl (toYaml $mergedConfig.waitForMigrations.env) . | nindent 12 }} {{- end }} {{- end }} {{- if and (.Values.dags.gitSync.enabled) (not .Values.dags.persistence.enabled) }} {{- include "git_sync_container" (dict "Values" .Values "is_init" "true" "Template" .Template) | nindent 8 }} {{- end }} - {{- if .Values.dagProcessor.extraInitContainers }} - {{- tpl (toYaml .Values.dagProcessor.extraInitContainers) . | nindent 8 }} + {{- if $mergedConfig.extraInitContainers }} + {{- tpl (toYaml $mergedConfig.extraInitContainers) . | nindent 8 }} {{- end }} containers: - name: dag-processor @@ -154,19 +183,19 @@ spec: {{- if $containerLifecycleHooks }} lifecycle: {{- tpl (toYaml $containerLifecycleHooks) . | nindent 12 }} {{- end }} - {{- if .Values.dagProcessor.command }} - command: {{ tpl (toYaml .Values.dagProcessor.command) . | nindent 12 }} + {{- if $mergedConfig.command }} + command: {{ tpl (toYaml $mergedConfig.command) . | nindent 12 }} {{- end }} - {{- if .Values.dagProcessor.args }} - args: {{ tpl (toYaml .Values.dagProcessor.args) . | nindent 12 }} + {{- if $args }} + args: {{ tpl ($args | toYaml) . | nindent 12 }} {{- end }} - resources: {{- toYaml .Values.dagProcessor.resources | nindent 12 }} + resources: {{- toYaml $mergedConfig.resources | nindent 12 }} volumeMounts: {{- if .Values.volumeMounts }} {{- toYaml .Values.volumeMounts | nindent 12 }} {{- end }} - {{- if .Values.dagProcessor.extraVolumeMounts }} - {{- tpl (toYaml .Values.dagProcessor.extraVolumeMounts) . | nindent 12 }} + {{- if $mergedConfig.extraVolumeMounts }} + {{- tpl (toYaml $mergedConfig.extraVolumeMounts) . | nindent 12 }} {{- end }} - name: logs mountPath: {{ template "airflow_logs" . }} @@ -181,42 +210,42 @@ spec: env: {{- include "custom_airflow_environment" . | indent 10 }} {{- include "standard_airflow_environment" . | indent 10 }} - {{- include "container_extra_envs" (list . .Values.dagProcessor.env) | indent 10 }} + {{- include "container_extra_envs" (list . $mergedConfig.env) | indent 10 }} livenessProbe: - initialDelaySeconds: {{ .Values.dagProcessor.livenessProbe.initialDelaySeconds }} - timeoutSeconds: {{ .Values.dagProcessor.livenessProbe.timeoutSeconds }} - failureThreshold: {{ .Values.dagProcessor.livenessProbe.failureThreshold }} - periodSeconds: {{ .Values.dagProcessor.livenessProbe.periodSeconds }} + initialDelaySeconds: {{ $mergedConfig.livenessProbe.initialDelaySeconds }} + timeoutSeconds: {{ $mergedConfig.livenessProbe.timeoutSeconds }} + failureThreshold: {{ $mergedConfig.livenessProbe.failureThreshold }} + periodSeconds: {{ $mergedConfig.livenessProbe.periodSeconds }} exec: command: - {{- if .Values.dagProcessor.livenessProbe.command }} - {{- toYaml .Values.dagProcessor.livenessProbe.command | nindent 16 }} + {{- if $mergedConfig.livenessProbe.command }} + {{- toYaml $mergedConfig.livenessProbe.command | nindent 16 }} {{- else }} {{- include "dag_processor_liveness_check_command" . | indent 14 }} {{- end }} {{- if .Values.dags.gitSync.enabled }} {{- include "git_sync_container" . | indent 8 }} {{- end }} - {{- if .Values.dagProcessor.logGroomerSidecar.enabled }} + {{- if $mergedConfig.logGroomerSidecar.enabled }} - name: dag-processor-log-groomer - resources: {{- toYaml .Values.dagProcessor.logGroomerSidecar.resources | nindent 12 }} + resources: {{- toYaml $mergedConfig.logGroomerSidecar.resources | nindent 12 }} image: {{ template "airflow_image" . }} imagePullPolicy: {{ .Values.images.airflow.pullPolicy }} securityContext: {{ $containerSecurityContextLogGroomerSidecar | nindent 12 }} - {{- if .Values.dagProcessor.logGroomerSidecar.command }} - command: {{ tpl (toYaml .Values.dagProcessor.logGroomerSidecar.command) . | nindent 12 }} + {{- if $mergedConfig.logGroomerSidecar.command }} + command: {{ tpl (toYaml $mergedConfig.logGroomerSidecar.command) . | nindent 12 }} {{- end }} - {{- if .Values.dagProcessor.logGroomerSidecar.args }} - args: {{- tpl (toYaml .Values.dagProcessor.logGroomerSidecar.args) . | nindent 12 }} + {{- if $mergedConfig.logGroomerSidecar.args }} + args: {{- tpl (toYaml $mergedConfig.logGroomerSidecar.args) . | nindent 12 }} {{- end }} env: - {{- if .Values.dagProcessor.logGroomerSidecar.retentionDays }} + {{- if $mergedConfig.logGroomerSidecar.retentionDays }} - name: AIRFLOW__LOG_RETENTION_DAYS - value: "{{ .Values.dagProcessor.logGroomerSidecar.retentionDays }}" + value: "{{ $mergedConfig.logGroomerSidecar.retentionDays }}" {{- end }} - {{- if .Values.dagProcessor.logGroomerSidecar.frequencyMinutes }} + {{- if $mergedConfig.logGroomerSidecar.frequencyMinutes }} - name: AIRFLOW__LOG_CLEANUP_FREQUENCY_MINUTES - value: "{{ .Values.dagProcessor.logGroomerSidecar.frequencyMinutes }}" + value: "{{ $mergedConfig.logGroomerSidecar.frequencyMinutes }}" {{- end }} {{- if .Values.dagProcessor.logGroomerSidecar.maxSizeBytes }} - name: AIRFLOW__LOG_MAX_SIZE_BYTES @@ -228,8 +257,8 @@ spec: {{- end }} - name: AIRFLOW_HOME value: "{{ .Values.airflowHome }}" - {{- if .Values.dagProcessor.logGroomerSidecar.env }} - {{- tpl (toYaml .Values.dagProcessor.logGroomerSidecar.env) $ | nindent 12 }} + {{- if $mergedConfig.logGroomerSidecar.env }} + {{- tpl (toYaml $mergedConfig.logGroomerSidecar.env) . | nindent 12 }} {{- end }} volumeMounts: - name: logs @@ -240,15 +269,15 @@ spec: {{- if .Values.volumeMounts }} {{- toYaml .Values.volumeMounts | nindent 12 }} {{- end }} - {{- if .Values.dagProcessor.extraVolumeMounts }} - {{- tpl (toYaml .Values.dagProcessor.extraVolumeMounts) . | nindent 12 }} + {{- if $mergedConfig.extraVolumeMounts }} + {{- tpl (toYaml $mergedConfig.extraVolumeMounts) . | nindent 12 }} {{- end }} {{- if or .Values.webserver.webserverConfig .Values.webserver.webserverConfigConfigMapName }} {{- include "airflow_webserver_config_mount" . | nindent 12 }} {{- end }} {{- end }} - {{- if .Values.dagProcessor.extraContainers }} - {{- tpl (toYaml .Values.dagProcessor.extraContainers) . | nindent 8 }} + {{- if $mergedConfig.extraContainers }} + {{- tpl (toYaml $mergedConfig.extraContainers) . | nindent 8 }} {{- end }} volumes: - name: config @@ -273,8 +302,8 @@ spec: {{- if .Values.volumes }} {{- toYaml .Values.volumes | nindent 8 }} {{- end }} - {{- if .Values.dagProcessor.extraVolumes }} - {{- tpl (toYaml .Values.dagProcessor.extraVolumes) . | nindent 8 }} + {{- if $mergedConfig.extraVolumes }} + {{- tpl (toYaml $mergedConfig.extraVolumes) . | nindent 8 }} {{- end }} {{- if .Values.logs.persistence.enabled }} - name: logs @@ -283,5 +312,43 @@ spec: {{- else }} - name: logs emptyDir: {{- toYaml (default (dict) .Values.logs.emptyDirConfig) | nindent 12 }} + {{- end }} +{{- end }} + +################################ +## Airflow Dag Processor Deployment +################################# +{{- if semverCompare ">=2.3.0" .Values.airflowVersion }} +{{- $enabled := .Values.dagProcessor.enabled }} +{{- if eq $enabled nil}} + {{ $enabled = ternary true false (semverCompare ">=3.0.0" .Values.airflowVersion) }} +{{- end }} +{{- $deployPerBundle := .Values.dagProcessor.deployPerBundle.enabled | default false }} +{{- if $enabled }} + {{- if not $deployPerBundle }} + {{- /* Single deployment mode: use base dagProcessor config */}} + {{- $context := dict "Values" (merge (dict "dagProcessor" .Values.dagProcessor) .Values) "Release" .Release "Chart" .Chart "Template" .Template "Files" .Files "bundleName" "" "deployPerBundle" false }} + {{- include "dag-processor.deployment" $context }} + {{- else }} + {{- /* Per-bundle deployment mode: create one deployment per bundle */}} + {{- $firstBundle := true }} + {{- range $bundle := .Values.dagProcessor.dagBundleConfigList }} + {{- $bundleName := $bundle.name }} + {{- $bundleOverrides := index $.Values.dagProcessor.deployPerBundle.bundleOverrides $bundleName | default dict | deepCopy }} + {{- $baseConfig := $.Values.dagProcessor | deepCopy }} + {{- $mergedConfig := mergeOverwrite $baseConfig $bundleOverrides }} + {{- $_ := set $mergedConfig "dagBundleConfigList" $.Values.dagProcessor.dagBundleConfigList }} + {{- $bundleEnabled := $mergedConfig.enabled | default true }} + {{- if $bundleEnabled }} + {{- if not $firstBundle }} +--- + {{- end }} + {{- $firstBundle = false }} + {{- $bundleValues := merge (dict "dagProcessor" $mergedConfig) $.Values }} + {{- $context := dict "Values" $bundleValues "Release" $.Release "Chart" $.Chart "Template" $.Template "Files" $.Files "bundleName" $bundleName "deployPerBundle" true }} + {{- include "dag-processor.deployment" $context }} + {{- end }} + {{- end }} + {{- end }} {{- end }} {{- end }} diff --git a/chart/templates/dag-processor/dag-processor-poddisruptionbudget.yaml b/chart/templates/dag-processor/dag-processor-poddisruptionbudget.yaml index 27c39a8e050ce..a71551d9e4402 100644 --- a/chart/templates/dag-processor/dag-processor-poddisruptionbudget.yaml +++ b/chart/templates/dag-processor/dag-processor-poddisruptionbudget.yaml @@ -17,26 +17,41 @@ under the License. */}} -################################ -## Airflow Dag Processor PodDisruptionBudget -################################# -{{- $enabled := .Values.dagProcessor.enabled }} -{{- if eq $enabled nil}} - {{ $enabled = ternary true false (semverCompare ">=3.0.0" .Values.airflowVersion) }} +{{/* +Helper template for dag processor PodDisruptionBudget +Expects context with: + - .Values.dagProcessor: dag processor configuration (may be merged with bundle overrides) + - .bundleName: bundle name (empty string if not per-bundle mode) + - .deployPerBundle: boolean indicating if per-bundle mode is enabled +*/}} +{{- define "dag-processor.poddisruptionbudget" }} +{{- $bundleName := .bundleName | default "" }} +{{- $deployPerBundle := .deployPerBundle | default false }} +{{- $mergedConfig := .Values.dagProcessor }} +{{- $bundleOverrides := .bundleOverrides | default dict }} +{{- if $mergedConfig.podDisruptionBudget.enabled }} +{{- /* Determine PDB name suffix */}} +{{- $nameSuffix := "" }} +{{- if $deployPerBundle }} + {{- $nameSuffix = printf "-%s-pdb" $bundleName }} +{{- else }} + {{- $nameSuffix = "-pdb" }} {{- end }} -{{- if and $enabled .Values.dagProcessor.podDisruptionBudget.enabled }} apiVersion: policy/v1 kind: PodDisruptionBudget metadata: - name: {{ include "airflow.fullname" . }}-dag-processor-pdb + name: {{ include "airflow.fullname" . }}-dag-processor{{ $nameSuffix }} labels: tier: airflow component: dag-processor release: {{ .Release.Name }} chart: "{{ .Chart.Name }}-{{ .Chart.Version }}" heritage: {{ .Release.Service }} - {{- if or (.Values.labels) (.Values.dagProcessor.labels) }} - {{- mustMerge .Values.dagProcessor.labels .Values.labels | toYaml | nindent 4 }} + {{- if $deployPerBundle }} + bundle: {{ $bundleName }} + {{- end }} + {{- if or (.Values.labels) ($mergedConfig.labels) }} + {{- mustMerge $mergedConfig.labels .Values.labels | toYaml | nindent 4 }} {{- end }} spec: selector: @@ -44,5 +59,53 @@ spec: tier: airflow component: dag-processor release: {{ .Release.Name }} - {{- toYaml .Values.dagProcessor.podDisruptionBudget.config | nindent 2 }} + {{- if $deployPerBundle }} + bundle: {{ $bundleName }} + {{- end }} + {{- $pdbConfig := $mergedConfig.podDisruptionBudget.config }} + {{- if and $bundleOverrides.podDisruptionBudget $bundleOverrides.podDisruptionBudget.config }} + {{- $pdbConfig = $bundleOverrides.podDisruptionBudget.config }} + {{- end }} + {{- if and (hasKey $pdbConfig "minAvailable") (hasKey $pdbConfig "maxUnavailable") }} + {{- if ne (index $pdbConfig "minAvailable" | toString) "" }} + {{- $pdbConfig = omit $pdbConfig "maxUnavailable" }} + {{- end }} + {{- end }} + {{- toYaml $pdbConfig | nindent 2 }} +{{- end }} +{{- end }} + +################################ +## Airflow Dag Processor PodDisruptionBudget +################################# +{{- $enabled := .Values.dagProcessor.enabled }} +{{- if eq $enabled nil}} + {{ $enabled = ternary true false (semverCompare ">=3.0.0" .Values.airflowVersion) }} +{{- end }} +{{- $deployPerBundle := .Values.dagProcessor.deployPerBundle.enabled | default false }} +{{- if and $enabled .Values.dagProcessor.podDisruptionBudget.enabled }} + {{- if not $deployPerBundle }} + {{- /* Single PDB mode: use base dagProcessor config */}} + {{- $context := dict "Values" (merge (dict "dagProcessor" .Values.dagProcessor) .Values) "Release" .Release "Chart" .Chart "Template" .Template "Files" .Files "bundleName" "" "deployPerBundle" false }} + {{- include "dag-processor.poddisruptionbudget" $context }} + {{- else }} + {{- /* Per-bundle PDB mode: create one PDB per bundle */}} + {{- $firstPDB := true }} + {{- range $bundle := .Values.dagProcessor.dagBundleConfigList }} + {{- $bundleName := $bundle.name }} + {{- $bundleOverrides := index $.Values.dagProcessor.deployPerBundle.bundleOverrides $bundleName | default dict | deepCopy }} + {{- $baseConfig := $.Values.dagProcessor | deepCopy }} + {{- $mergedConfig := mergeOverwrite $baseConfig $bundleOverrides }} + {{- $_ := set $mergedConfig "dagBundleConfigList" $.Values.dagProcessor.dagBundleConfigList }} + {{- if $mergedConfig.podDisruptionBudget.enabled }} + {{- if not $firstPDB }} +--- + {{- end }} + {{- $firstPDB = false }} + {{- $bundleValues := merge (dict "dagProcessor" $mergedConfig) $.Values }} + {{- $context := dict "Values" $bundleValues "Release" $.Release "Chart" $.Chart "Template" $.Template "Files" $.Files "bundleName" $bundleName "deployPerBundle" true "bundleOverrides" $bundleOverrides }} + {{- include "dag-processor.poddisruptionbudget" $context }} + {{- end }} + {{- end }} + {{- end }} {{- end }} diff --git a/chart/values.schema.json b/chart/values.schema.json index caac8a828f4d7..c0934c0dc6215 100644 --- a/chart/values.schema.json +++ b/chart/values.schema.json @@ -4637,6 +4637,48 @@ } } }, + "deployPerBundle": { + "description": "Per-bundle deployment option. When enabled, creates a separate deployment for each bundle in `dagBundleConfigList`.", + "type": "object", + "additionalProperties": false, + "properties": { + "enabled": { + "description": "Enable per-bundle deployments. When true, creates a separate deployment for each bundle.", + "type": "boolean", + "default": false + }, + "args": { + "description": "Command args template for per-bundle deployments. `{{ bundleName }}` will be replaced with the actual bundle name.", + "type": "array", + "default": [ + "bash", + "-c", + "exec airflow dag-processor --bundle-name {{ bundleName }}" + ], + "items": { + "type": "string" + } + }, + "bundleOverrides": { + "description": "Per-bundle specific overrides. Each key should match a bundle name from `dagBundleConfigList`.", + "type": "object", + "additionalProperties": { + "type": "object", + "additionalProperties": {} + }, + "default": {} + } + }, + "default": { + "enabled": false, + "args": [ + "bash", + "-c", + "exec airflow dag-processor --bundle-name {{ bundleName }}" + ], + "bundleOverrides": {} + } + }, "livenessProbe": { "description": "Liveness probe configuration for dag processor.", "type": "object", diff --git a/chart/values.yaml b/chart/values.yaml index 17b1e14f5137d..d8cc5c0e69f0e 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -2372,6 +2372,39 @@ dagProcessor: # classpath: "airflow.dag_processing.bundles.local.LocalDagBundle" # kwargs: {} + # Per-bundle deployment option + # When enabled, creates a separate deployment for each bundle in `dagBundleConfigList` + deployPerBundle: + enabled: false + # Command args template for per-bundle deployments + # `{{ bundleName }}` will be replaced with the actual bundle name + args: ["bash", "-c", "exec airflow dag-processor --bundle-name {{ bundleName }}"] + + # Per-bundle specific overrides (optional) + # Each key should match a bundle name from `dagBundleConfigList` + # These settings override the base dagProcessor configuration for each bundle + bundleOverrides: {} + # Example: Override settings for a specific bundle + # bundleOverrides: + # bundle1: + # replicas: 3 + # resources: + # requests: + # memory: "2Gi" + # cpu: "1000m" + # limits: + # memory: "4Gi" + # cpu: "2000m" + # nodeSelector: + # workload-type: production + # env: + # - name: SOME_ENV_VAR + # value: "20" + # podDisruptionBudget: + # enabled: true + # config: + # minAvailable: 1 + # Number of airflow dag processors in the deployment replicas: 1 # Max number of old replicasets to retain diff --git a/helm-tests/tests/helm_tests/airflow_core/test_dag_processor_per_bundle.py b/helm-tests/tests/helm_tests/airflow_core/test_dag_processor_per_bundle.py new file mode 100644 index 0000000000000..c2ac1966c7f2c --- /dev/null +++ b/helm-tests/tests/helm_tests/airflow_core/test_dag_processor_per_bundle.py @@ -0,0 +1,375 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import jmespath +from chart_utils.helm_template_generator import render_chart + + +class TestDagProcessorPerBundle: + """Tests DAG processor per-bundle deployment feature.""" + + def test_deploy_per_bundle_disabled_creates_single_deployment(self): + """Test that when deployPerBundle.enabled is false, single deployment is created.""" + docs = render_chart( + values={ + "dagProcessor": { + "enabled": True, + "deployPerBundle": {"enabled": False}, + "dagBundleConfigList": [ + { + "name": "bundle1", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {}, + }, + { + "name": "bundle2", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {}, + }, + ], + } + }, + show_only=["templates/dag-processor/dag-processor-deployment.yaml"], + ) + + assert len(docs) == 1 + assert jmespath.search("metadata.name", docs[0]) == "release-name-dag-processor" + assert "bundle" not in jmespath.search("metadata.labels", docs[0]) + + def test_deploy_per_bundle_enabled_creates_multiple_deployments(self): + """Test that when deployPerBundle.enabled is true, one deployment per bundle is created.""" + docs = render_chart( + values={ + "dagProcessor": { + "enabled": True, + "deployPerBundle": {"enabled": True}, + "dagBundleConfigList": [ + { + "name": "bundle1", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {}, + }, + { + "name": "bundle2", + "classpath": "airflow.providers.git.bundles.git.GitDagBundle", + "kwargs": { + "git_conn_id": "GITHUB__repo2", + "subdir": "dags", + "tracking_ref": "main", + "refresh_interval": 60, + }, + }, + ], + } + }, + show_only=["templates/dag-processor/dag-processor-deployment.yaml"], + ) + + assert len(docs) == 2 + deployment_names = [jmespath.search("metadata.name", doc) for doc in docs] + assert "release-name-dag-processor-bundle1" in deployment_names + assert "release-name-dag-processor-bundle2" in deployment_names + + def test_per_bundle_deployment_has_bundle_label(self): + """Test that per-bundle deployments have bundle label.""" + docs = render_chart( + values={ + "dagProcessor": { + "enabled": True, + "deployPerBundle": {"enabled": True}, + "dagBundleConfigList": [ + { + "name": "bundle1", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {}, + }, + ], + } + }, + show_only=["templates/dag-processor/dag-processor-deployment.yaml"], + ) + + assert len(docs) == 1 + assert jmespath.search("metadata.labels.bundle", docs[0]) == "bundle1" + assert jmespath.search("spec.selector.matchLabels.bundle", docs[0]) == "bundle1" + assert jmespath.search("spec.template.metadata.labels.bundle", docs[0]) == "bundle1" + + def test_per_bundle_args_contains_bundle_name(self): + """Test that per-bundle args contain the bundle name.""" + docs = render_chart( + values={ + "dagProcessor": { + "enabled": True, + "deployPerBundle": { + "enabled": True, + "args": ["bash", "-c", "exec airflow dag-processor --bundle-name {{ bundleName }}"], + }, + "dagBundleConfigList": [ + { + "name": "test-bundle", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {}, + }, + ], + } + }, + show_only=["templates/dag-processor/dag-processor-deployment.yaml"], + ) + + assert len(docs) == 1 + args = jmespath.search("spec.template.spec.containers[0].args", docs[0]) + assert args is not None + assert any("--bundle-name test-bundle" in arg for arg in args) + + def test_bundle_overrides_apply_to_deployment(self): + """Test that bundleOverrides are correctly applied to per-bundle deployments.""" + docs = render_chart( + values={ + "dagProcessor": { + "enabled": True, + "replicas": 1, + "dagBundleConfigList": [ + { + "name": "bundle1", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {}, + }, + { + "name": "bundle2", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {}, + }, + ], + "deployPerBundle": { + "enabled": True, + "bundleOverrides": { + "bundle1": { + "replicas": 3, + "resources": { + "requests": {"memory": "2Gi", "cpu": "1000m"}, + "limits": {"memory": "4Gi", "cpu": "2000m"}, + }, + }, + }, + }, + } + }, + show_only=["templates/dag-processor/dag-processor-deployment.yaml"], + ) + + assert len(docs) == 2 + # Find bundle1 deployment + bundle1_doc = next(doc for doc in docs if "bundle1" in jmespath.search("metadata.name", doc)) + bundle2_doc = next(doc for doc in docs if "bundle2" in jmespath.search("metadata.name", doc)) + + # bundle1 should have overridden replicas + assert jmespath.search("spec.replicas", bundle1_doc) == 3 + assert ( + jmespath.search("spec.template.spec.containers[0].resources.requests.memory", bundle1_doc) + == "2Gi" + ) + assert ( + jmespath.search("spec.template.spec.containers[0].resources.requests.cpu", bundle1_doc) == "1000m" + ) + + # bundle2 should have default replicas + assert jmespath.search("spec.replicas", bundle2_doc) == 1 + + def test_bundle_overrides_env_variables(self): + """Test that bundleOverrides can override environment variables.""" + docs = render_chart( + values={ + "dagProcessor": { + "enabled": True, + "deployPerBundle": { + "enabled": True, + "bundleOverrides": { + "bundle1": { + "env": [ + {"name": "CUSTOM_VAR", "value": "custom_value"}, + ], + }, + }, + }, + "dagBundleConfigList": [ + { + "name": "bundle1", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {}, + }, + ], + } + }, + show_only=["templates/dag-processor/dag-processor-deployment.yaml"], + ) + + assert len(docs) == 1 + env_vars = jmespath.search("spec.template.spec.containers[0].env", docs[0]) + env_dict = {env["name"]: env["value"] for env in env_vars if "value" in env} + assert env_dict.get("CUSTOM_VAR") == "custom_value" + + def test_per_bundle_pdb_created(self): + """Test that PodDisruptionBudget is created for each bundle when deployPerBundle is enabled.""" + docs = render_chart( + values={ + "dagProcessor": { + "enabled": True, + "podDisruptionBudget": {"enabled": True}, + "deployPerBundle": {"enabled": True}, + "dagBundleConfigList": [ + { + "name": "bundle1", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {}, + }, + { + "name": "bundle2", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {}, + }, + ], + } + }, + show_only=["templates/dag-processor/dag-processor-poddisruptionbudget.yaml"], + ) + + assert len(docs) == 2 + pdb_names = [jmespath.search("metadata.name", doc) for doc in docs] + assert "release-name-dag-processor-bundle1-pdb" in pdb_names + assert "release-name-dag-processor-bundle2-pdb" in pdb_names + + def test_per_bundle_pdb_has_bundle_label(self): + """Test that per-bundle PDBs have bundle label.""" + docs = render_chart( + values={ + "dagProcessor": { + "enabled": True, + "podDisruptionBudget": {"enabled": True}, + "deployPerBundle": {"enabled": True}, + "dagBundleConfigList": [ + { + "name": "bundle1", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {}, + }, + ], + } + }, + show_only=["templates/dag-processor/dag-processor-poddisruptionbudget.yaml"], + ) + + assert len(docs) == 1 + assert jmespath.search("metadata.labels.bundle", docs[0]) == "bundle1" + assert jmespath.search("spec.selector.matchLabels.bundle", docs[0]) == "bundle1" + + def test_per_bundle_pdb_bundle_overrides(self): + """Test that bundleOverrides can override PodDisruptionBudget settings.""" + docs = render_chart( + values={ + "dagProcessor": { + "enabled": True, + "podDisruptionBudget": { + "enabled": True, + "config": {"maxUnavailable": 1}, + }, + "deployPerBundle": { + "enabled": True, + "bundleOverrides": { + "bundle1": { + "podDisruptionBudget": { + "enabled": True, + "config": {"minAvailable": 1}, + }, + }, + }, + }, + "dagBundleConfigList": [ + { + "name": "bundle1", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {}, + }, + { + "name": "bundle2", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {}, + }, + ], + } + }, + show_only=["templates/dag-processor/dag-processor-poddisruptionbudget.yaml"], + ) + + assert len(docs) == 2 + bundle1_pdb = next(doc for doc in docs if "bundle1" in jmespath.search("metadata.name", doc)) + bundle2_pdb = next(doc for doc in docs if "bundle2" in jmespath.search("metadata.name", doc)) + + # bundle1 should have overridden config + assert "minAvailable" in jmespath.search("spec", bundle1_pdb) + assert jmespath.search("spec.minAvailable", bundle1_pdb) == 1 + + # bundle2 should have default config + assert "maxUnavailable" in jmespath.search("spec", bundle2_pdb) + assert jmespath.search("spec.maxUnavailable", bundle2_pdb) == 1 + + def test_per_bundle_affinity_includes_bundle_label(self): + """Test that podAntiAffinity includes bundle label when deployPerBundle is enabled.""" + docs = render_chart( + values={ + "dagProcessor": { + "enabled": True, + "deployPerBundle": {"enabled": True}, + "dagBundleConfigList": [ + { + "name": "bundle1", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {}, + }, + ], + } + }, + show_only=["templates/dag-processor/dag-processor-deployment.yaml"], + ) + + assert len(docs) == 1 + affinity = jmespath.search( + "spec.template.spec.affinity.podAntiAffinity.preferredDuringSchedulingIgnoredDuringExecution[0].podAffinityTerm.labelSelector.matchLabels", + docs[0], + ) + assert affinity.get("bundle") == "bundle1" + assert affinity.get("component") == "dag-processor" + + def test_single_deployment_mode_affinity_no_bundle_label(self): + """Test that single deployment mode affinity does not include bundle label.""" + docs = render_chart( + values={ + "dagProcessor": { + "enabled": True, + "deployPerBundle": {"enabled": False}, + } + }, + show_only=["templates/dag-processor/dag-processor-deployment.yaml"], + ) + + assert len(docs) == 1 + affinity = jmespath.search( + "spec.template.spec.affinity.podAntiAffinity.preferredDuringSchedulingIgnoredDuringExecution[0].podAffinityTerm.labelSelector.matchLabels", + docs[0], + ) + assert "bundle" not in affinity + assert affinity.get("component") == "dag-processor" From e5136d465f7e093c965a9eb4100e7b213ee5beb2 Mon Sep 17 00:00:00 2001 From: Seonghwna Lee Date: Tue, 3 Mar 2026 08:35:16 +0900 Subject: [PATCH 2/2] Apply review of @jscheffi - Add deploymentOverride in dagBundleConfigList --- .../dag-processor-deployment.yaml | 17 +- .../dag-processor-poddisruptionbudget.yaml | 5 +- chart/values.schema.json | 49 +----- chart/values.yaml | 53 +++---- .../test_dag_processor_per_bundle.py | 147 +++++++++++------- 5 files changed, 132 insertions(+), 139 deletions(-) diff --git a/chart/templates/dag-processor/dag-processor-deployment.yaml b/chart/templates/dag-processor/dag-processor-deployment.yaml index 93dd08f822a83..6d9672901898c 100644 --- a/chart/templates/dag-processor/dag-processor-deployment.yaml +++ b/chart/templates/dag-processor/dag-processor-deployment.yaml @@ -38,14 +38,13 @@ Expects context with: {{- $containerSecurityContextLogGroomerSidecar := include "containerSecurityContext" (list $mergedConfig.logGroomerSidecar .Values) }} {{- $containerSecurityContextWaitForMigrations := include "containerSecurityContext" (list $mergedConfig.waitForMigrations .Values) }} {{- $containerLifecycleHooks := or $mergedConfig.containerLifecycleHooks .Values.containerLifecycleHooks }} -{{- /* Prepare args: use per-bundle args if enabled, otherwise use default args */}} +{{- /* Prepare args: append --bundle-name to the base args for per-bundle deployments */}} {{- $args := $mergedConfig.args }} -{{- if and $deployPerBundle .Values.dagProcessor.deployPerBundle.args }} - {{- $args = list }} - {{- range .Values.dagProcessor.deployPerBundle.args }} - {{- $arg := . | toString | replace "{{ bundleName }}" $bundleName }} - {{- $args = append $args $arg }} - {{- end }} +{{- if $deployPerBundle }} + {{- $lastIdx := sub (len $args) 1 }} + {{- $lastArg := printf "%s --bundle-name %s" (index $args (int $lastIdx)) $bundleName }} + {{- $initArgs := mustInitial $args }} + {{- $args = append $initArgs $lastArg }} {{- end }} {{- /* Determine deployment name suffix */}} {{- $nameSuffix := "" }} @@ -323,7 +322,7 @@ spec: {{- if eq $enabled nil}} {{ $enabled = ternary true false (semverCompare ">=3.0.0" .Values.airflowVersion) }} {{- end }} -{{- $deployPerBundle := .Values.dagProcessor.deployPerBundle.enabled | default false }} +{{- $deployPerBundle := .Values.dagProcessor.deployPerBundle | default false }} {{- if $enabled }} {{- if not $deployPerBundle }} {{- /* Single deployment mode: use base dagProcessor config */}} @@ -334,7 +333,7 @@ spec: {{- $firstBundle := true }} {{- range $bundle := .Values.dagProcessor.dagBundleConfigList }} {{- $bundleName := $bundle.name }} - {{- $bundleOverrides := index $.Values.dagProcessor.deployPerBundle.bundleOverrides $bundleName | default dict | deepCopy }} + {{- $bundleOverrides := $bundle.deploymentOverride | default dict | deepCopy }} {{- $baseConfig := $.Values.dagProcessor | deepCopy }} {{- $mergedConfig := mergeOverwrite $baseConfig $bundleOverrides }} {{- $_ := set $mergedConfig "dagBundleConfigList" $.Values.dagProcessor.dagBundleConfigList }} diff --git a/chart/templates/dag-processor/dag-processor-poddisruptionbudget.yaml b/chart/templates/dag-processor/dag-processor-poddisruptionbudget.yaml index a71551d9e4402..38390d6a7d1b2 100644 --- a/chart/templates/dag-processor/dag-processor-poddisruptionbudget.yaml +++ b/chart/templates/dag-processor/dag-processor-poddisruptionbudget.yaml @@ -23,6 +23,7 @@ Expects context with: - .Values.dagProcessor: dag processor configuration (may be merged with bundle overrides) - .bundleName: bundle name (empty string if not per-bundle mode) - .deployPerBundle: boolean indicating if per-bundle mode is enabled + - .bundleOverrides: raw bundle deploymentOverride dict (for PDB config resolution) */}} {{- define "dag-processor.poddisruptionbudget" }} {{- $bundleName := .bundleName | default "" }} @@ -82,7 +83,7 @@ spec: {{- if eq $enabled nil}} {{ $enabled = ternary true false (semverCompare ">=3.0.0" .Values.airflowVersion) }} {{- end }} -{{- $deployPerBundle := .Values.dagProcessor.deployPerBundle.enabled | default false }} +{{- $deployPerBundle := .Values.dagProcessor.deployPerBundle | default false }} {{- if and $enabled .Values.dagProcessor.podDisruptionBudget.enabled }} {{- if not $deployPerBundle }} {{- /* Single PDB mode: use base dagProcessor config */}} @@ -93,7 +94,7 @@ spec: {{- $firstPDB := true }} {{- range $bundle := .Values.dagProcessor.dagBundleConfigList }} {{- $bundleName := $bundle.name }} - {{- $bundleOverrides := index $.Values.dagProcessor.deployPerBundle.bundleOverrides $bundleName | default dict | deepCopy }} + {{- $bundleOverrides := $bundle.deploymentOverride | default dict | deepCopy }} {{- $baseConfig := $.Values.dagProcessor | deepCopy }} {{- $mergedConfig := mergeOverwrite $baseConfig $bundleOverrides }} {{- $_ := set $mergedConfig "dagBundleConfigList" $.Values.dagProcessor.dagBundleConfigList }} diff --git a/chart/values.schema.json b/chart/values.schema.json index c0934c0dc6215..c1bd7b4ba4a1c 100644 --- a/chart/values.schema.json +++ b/chart/values.schema.json @@ -4607,7 +4607,6 @@ ], "items": { "type": "object", - "additionalProperties": false, "required": [ "name", "classpath" @@ -4633,51 +4632,19 @@ "null" ] } + }, + "deploymentOverride": { + "description": "Per-bundle deployment overrides. Only used when deployPerBundle is true. Overrides base dagProcessor settings for this bundle.", + "type": "object", + "additionalProperties": {} } } } }, "deployPerBundle": { - "description": "Per-bundle deployment option. When enabled, creates a separate deployment for each bundle in `dagBundleConfigList`.", - "type": "object", - "additionalProperties": false, - "properties": { - "enabled": { - "description": "Enable per-bundle deployments. When true, creates a separate deployment for each bundle.", - "type": "boolean", - "default": false - }, - "args": { - "description": "Command args template for per-bundle deployments. `{{ bundleName }}` will be replaced with the actual bundle name.", - "type": "array", - "default": [ - "bash", - "-c", - "exec airflow dag-processor --bundle-name {{ bundleName }}" - ], - "items": { - "type": "string" - } - }, - "bundleOverrides": { - "description": "Per-bundle specific overrides. Each key should match a bundle name from `dagBundleConfigList`.", - "type": "object", - "additionalProperties": { - "type": "object", - "additionalProperties": {} - }, - "default": {} - } - }, - "default": { - "enabled": false, - "args": [ - "bash", - "-c", - "exec airflow dag-processor --bundle-name {{ bundleName }}" - ], - "bundleOverrides": {} - } + "description": "When true, creates a separate deployment for each bundle in ``dagBundleConfigList``.", + "type": "boolean", + "default": false }, "livenessProbe": { "description": "Liveness probe configuration for dag processor.", diff --git a/chart/values.yaml b/chart/values.yaml index d8cc5c0e69f0e..67618b5181624 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -2361,6 +2361,25 @@ dagProcessor: # subdir: "dags" # tracking_ref: "main" # refresh_interval: 60 + # # Per-bundle deployment overrides (only used when deployPerBundle is true) + # deploymentOverride: + # replicas: 3 + # resources: + # requests: + # memory: "2Gi" + # cpu: "1000m" + # limits: + # memory: "4Gi" + # cpu: "2000m" + # nodeSelector: + # workload-type: production + # env: + # - name: SOME_ENV_VAR + # value: "20" + # podDisruptionBudget: + # enabled: true + # config: + # minAvailable: 1 # - name: bundle2 # classpath: "airflow.providers.git.bundles.git.GitDagBundle" # kwargs: @@ -2373,37 +2392,9 @@ dagProcessor: # kwargs: {} # Per-bundle deployment option - # When enabled, creates a separate deployment for each bundle in `dagBundleConfigList` - deployPerBundle: - enabled: false - # Command args template for per-bundle deployments - # `{{ bundleName }}` will be replaced with the actual bundle name - args: ["bash", "-c", "exec airflow dag-processor --bundle-name {{ bundleName }}"] - - # Per-bundle specific overrides (optional) - # Each key should match a bundle name from `dagBundleConfigList` - # These settings override the base dagProcessor configuration for each bundle - bundleOverrides: {} - # Example: Override settings for a specific bundle - # bundleOverrides: - # bundle1: - # replicas: 3 - # resources: - # requests: - # memory: "2Gi" - # cpu: "1000m" - # limits: - # memory: "4Gi" - # cpu: "2000m" - # nodeSelector: - # workload-type: production - # env: - # - name: SOME_ENV_VAR - # value: "20" - # podDisruptionBudget: - # enabled: true - # config: - # minAvailable: 1 + # When enabled, creates a separate deployment for each bundle in `dagBundleConfigList`. + # Bundle-specific deployment overrides can be set via `deploymentOverride` in each bundle entry. + deployPerBundle: false # Number of airflow dag processors in the deployment replicas: 1 diff --git a/helm-tests/tests/helm_tests/airflow_core/test_dag_processor_per_bundle.py b/helm-tests/tests/helm_tests/airflow_core/test_dag_processor_per_bundle.py index c2ac1966c7f2c..a6b5b3deeb032 100644 --- a/helm-tests/tests/helm_tests/airflow_core/test_dag_processor_per_bundle.py +++ b/helm-tests/tests/helm_tests/airflow_core/test_dag_processor_per_bundle.py @@ -24,12 +24,12 @@ class TestDagProcessorPerBundle: """Tests DAG processor per-bundle deployment feature.""" def test_deploy_per_bundle_disabled_creates_single_deployment(self): - """Test that when deployPerBundle.enabled is false, single deployment is created.""" + """Test that when deployPerBundle is false, single deployment is created.""" docs = render_chart( values={ "dagProcessor": { "enabled": True, - "deployPerBundle": {"enabled": False}, + "deployPerBundle": False, "dagBundleConfigList": [ { "name": "bundle1", @@ -52,12 +52,12 @@ def test_deploy_per_bundle_disabled_creates_single_deployment(self): assert "bundle" not in jmespath.search("metadata.labels", docs[0]) def test_deploy_per_bundle_enabled_creates_multiple_deployments(self): - """Test that when deployPerBundle.enabled is true, one deployment per bundle is created.""" + """Test that when deployPerBundle is true, one deployment per bundle is created.""" docs = render_chart( values={ "dagProcessor": { "enabled": True, - "deployPerBundle": {"enabled": True}, + "deployPerBundle": True, "dagBundleConfigList": [ { "name": "bundle1", @@ -91,7 +91,7 @@ def test_per_bundle_deployment_has_bundle_label(self): values={ "dagProcessor": { "enabled": True, - "deployPerBundle": {"enabled": True}, + "deployPerBundle": True, "dagBundleConfigList": [ { "name": "bundle1", @@ -110,15 +110,12 @@ def test_per_bundle_deployment_has_bundle_label(self): assert jmespath.search("spec.template.metadata.labels.bundle", docs[0]) == "bundle1" def test_per_bundle_args_contains_bundle_name(self): - """Test that per-bundle args contain the bundle name.""" + """Test that per-bundle args append --bundle-name to the base args.""" docs = render_chart( values={ "dagProcessor": { "enabled": True, - "deployPerBundle": { - "enabled": True, - "args": ["bash", "-c", "exec airflow dag-processor --bundle-name {{ bundleName }}"], - }, + "deployPerBundle": True, "dagBundleConfigList": [ { "name": "test-bundle", @@ -135,30 +132,49 @@ def test_per_bundle_args_contains_bundle_name(self): args = jmespath.search("spec.template.spec.containers[0].args", docs[0]) assert args is not None assert any("--bundle-name test-bundle" in arg for arg in args) + assert any("exec airflow dag-processor" in arg for arg in args) - def test_bundle_overrides_apply_to_deployment(self): - """Test that bundleOverrides are correctly applied to per-bundle deployments.""" + def test_per_bundle_args_respects_custom_base_args(self): + """Test that custom dagProcessor.args are respected and --bundle-name is appended.""" docs = render_chart( values={ "dagProcessor": { "enabled": True, - "replicas": 1, + "deployPerBundle": True, + "args": ["bash", "-c", "exec airflow dag-processor --some-flag"], "dagBundleConfigList": [ { - "name": "bundle1", + "name": "my-bundle", "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", "kwargs": {}, }, + ], + } + }, + show_only=["templates/dag-processor/dag-processor-deployment.yaml"], + ) + + assert len(docs) == 1 + args = jmespath.search("spec.template.spec.containers[0].args", docs[0]) + assert args is not None + last_arg = args[-1] + assert "--some-flag" in last_arg + assert "--bundle-name my-bundle" in last_arg + + def test_deployment_override_apply_to_deployment(self): + """Test that deploymentOverride in bundle definition is applied to per-bundle deployments.""" + docs = render_chart( + values={ + "dagProcessor": { + "enabled": True, + "replicas": 1, + "deployPerBundle": True, + "dagBundleConfigList": [ { - "name": "bundle2", + "name": "bundle1", "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", "kwargs": {}, - }, - ], - "deployPerBundle": { - "enabled": True, - "bundleOverrides": { - "bundle1": { + "deploymentOverride": { "replicas": 3, "resources": { "requests": {"memory": "2Gi", "cpu": "1000m"}, @@ -166,18 +182,21 @@ def test_bundle_overrides_apply_to_deployment(self): }, }, }, - }, + { + "name": "bundle2", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {}, + }, + ], } }, show_only=["templates/dag-processor/dag-processor-deployment.yaml"], ) assert len(docs) == 2 - # Find bundle1 deployment bundle1_doc = next(doc for doc in docs if "bundle1" in jmespath.search("metadata.name", doc)) bundle2_doc = next(doc for doc in docs if "bundle2" in jmespath.search("metadata.name", doc)) - # bundle1 should have overridden replicas assert jmespath.search("spec.replicas", bundle1_doc) == 3 assert ( jmespath.search("spec.template.spec.containers[0].resources.requests.memory", bundle1_doc) @@ -187,30 +206,25 @@ def test_bundle_overrides_apply_to_deployment(self): jmespath.search("spec.template.spec.containers[0].resources.requests.cpu", bundle1_doc) == "1000m" ) - # bundle2 should have default replicas assert jmespath.search("spec.replicas", bundle2_doc) == 1 - def test_bundle_overrides_env_variables(self): - """Test that bundleOverrides can override environment variables.""" + def test_deployment_override_env_variables(self): + """Test that deploymentOverride can override environment variables.""" docs = render_chart( values={ "dagProcessor": { "enabled": True, - "deployPerBundle": { - "enabled": True, - "bundleOverrides": { - "bundle1": { - "env": [ - {"name": "CUSTOM_VAR", "value": "custom_value"}, - ], - }, - }, - }, + "deployPerBundle": True, "dagBundleConfigList": [ { "name": "bundle1", "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", "kwargs": {}, + "deploymentOverride": { + "env": [ + {"name": "CUSTOM_VAR", "value": "custom_value"}, + ], + }, }, ], } @@ -230,7 +244,7 @@ def test_per_bundle_pdb_created(self): "dagProcessor": { "enabled": True, "podDisruptionBudget": {"enabled": True}, - "deployPerBundle": {"enabled": True}, + "deployPerBundle": True, "dagBundleConfigList": [ { "name": "bundle1", @@ -260,7 +274,7 @@ def test_per_bundle_pdb_has_bundle_label(self): "dagProcessor": { "enabled": True, "podDisruptionBudget": {"enabled": True}, - "deployPerBundle": {"enabled": True}, + "deployPerBundle": True, "dagBundleConfigList": [ { "name": "bundle1", @@ -277,8 +291,8 @@ def test_per_bundle_pdb_has_bundle_label(self): assert jmespath.search("metadata.labels.bundle", docs[0]) == "bundle1" assert jmespath.search("spec.selector.matchLabels.bundle", docs[0]) == "bundle1" - def test_per_bundle_pdb_bundle_overrides(self): - """Test that bundleOverrides can override PodDisruptionBudget settings.""" + def test_per_bundle_pdb_deployment_override(self): + """Test that deploymentOverride can override PodDisruptionBudget settings per bundle.""" docs = render_chart( values={ "dagProcessor": { @@ -287,22 +301,18 @@ def test_per_bundle_pdb_bundle_overrides(self): "enabled": True, "config": {"maxUnavailable": 1}, }, - "deployPerBundle": { - "enabled": True, - "bundleOverrides": { - "bundle1": { - "podDisruptionBudget": { - "enabled": True, - "config": {"minAvailable": 1}, - }, - }, - }, - }, + "deployPerBundle": True, "dagBundleConfigList": [ { "name": "bundle1", "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", "kwargs": {}, + "deploymentOverride": { + "podDisruptionBudget": { + "enabled": True, + "config": {"minAvailable": 1}, + }, + }, }, { "name": "bundle2", @@ -319,11 +329,9 @@ def test_per_bundle_pdb_bundle_overrides(self): bundle1_pdb = next(doc for doc in docs if "bundle1" in jmespath.search("metadata.name", doc)) bundle2_pdb = next(doc for doc in docs if "bundle2" in jmespath.search("metadata.name", doc)) - # bundle1 should have overridden config assert "minAvailable" in jmespath.search("spec", bundle1_pdb) assert jmespath.search("spec.minAvailable", bundle1_pdb) == 1 - # bundle2 should have default config assert "maxUnavailable" in jmespath.search("spec", bundle2_pdb) assert jmespath.search("spec.maxUnavailable", bundle2_pdb) == 1 @@ -333,7 +341,7 @@ def test_per_bundle_affinity_includes_bundle_label(self): values={ "dagProcessor": { "enabled": True, - "deployPerBundle": {"enabled": True}, + "deployPerBundle": True, "dagBundleConfigList": [ { "name": "bundle1", @@ -360,7 +368,7 @@ def test_single_deployment_mode_affinity_no_bundle_label(self): values={ "dagProcessor": { "enabled": True, - "deployPerBundle": {"enabled": False}, + "deployPerBundle": False, } }, show_only=["templates/dag-processor/dag-processor-deployment.yaml"], @@ -373,3 +381,30 @@ def test_single_deployment_mode_affinity_no_bundle_label(self): ) assert "bundle" not in affinity assert affinity.get("component") == "dag-processor" + + def test_deployment_override_not_included_in_bundle_config_json(self): + """Test that deploymentOverride is stripped from the dag_bundle_config_list JSON.""" + docs = render_chart( + values={ + "dagProcessor": { + "enabled": True, + "deployPerBundle": True, + "dagBundleConfigList": [ + { + "name": "bundle1", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {}, + "deploymentOverride": { + "replicas": 3, + }, + }, + ], + } + }, + show_only=["templates/configmaps/configmap.yaml"], + ) + + assert len(docs) == 1 + config_data = jmespath.search('data."airflow.cfg"', docs[0]) + assert "deploymentOverride" not in config_data + assert "bundle1" in config_data