diff --git a/chart/templates/_helpers.yaml b/chart/templates/_helpers.yaml index ceb0041a7b72d..02bec647cf3c6 100644 --- a/chart/templates/_helpers.yaml +++ b/chart/templates/_helpers.yaml @@ -1053,7 +1053,7 @@ backwards compatibility. Otherwise, fall back to createUserJob.enabled. {{- end -}} {{/* -Convert dagBundleConfigList YAML list to JSON string for dag_bundle_config_list. +Convert `dagBundleConfigList` YAML list to JSON string for `dag_bundle_config_list`. This helper function converts the structured YAML format to the JSON string format required by Airflow's dag_bundle_config_list configuration. diff --git a/chart/templates/dag-processor/dag-processor-deployment.yaml b/chart/templates/dag-processor/dag-processor-deployment.yaml index 448682224a003..6d9672901898c 100644 --- a/chart/templates/dag-processor/dag-processor-deployment.yaml +++ b/chart/templates/dag-processor/dag-processor-deployment.yaml @@ -17,42 +17,61 @@ under the License. */}} -################################ -## Airflow Dag Processor Deployment -################################# -{{- $enabled := .Values.dagProcessor.enabled }} -{{- if eq $enabled nil}} - {{ $enabled = ternary true false (semverCompare ">=3.0.0" .Values.airflowVersion) }} +{{/* +Helper template for dag processor deployment +Expects context with: + - .Values.dagProcessor: dag processor configuration (may be merged with bundle overrides) + - .bundleName: bundle name (empty string if not per-bundle mode) + - .deployPerBundle: boolean indicating if per-bundle mode is enabled +*/}} +{{- define "dag-processor.deployment" }} +{{- $bundleName := .bundleName | default "" }} +{{- $deployPerBundle := .deployPerBundle | default false }} +{{- $mergedConfig := .Values.dagProcessor }} +{{- $nodeSelector := or $mergedConfig.nodeSelector .Values.nodeSelector }} +{{- $affinity := or $mergedConfig.affinity .Values.affinity }} +{{- $tolerations := or $mergedConfig.tolerations .Values.tolerations }} +{{- $topologySpreadConstraints := or 
$mergedConfig.topologySpreadConstraints .Values.topologySpreadConstraints }} +{{- $revisionHistoryLimit := include "airflow.revisionHistoryLimit" (list $mergedConfig.revisionHistoryLimit .Values.revisionHistoryLimit) }} +{{- $securityContext := include "airflowPodSecurityContext" (list $mergedConfig .Values) }} +{{- $containerSecurityContext := include "containerSecurityContext" (list $mergedConfig .Values) }} +{{- $containerSecurityContextLogGroomerSidecar := include "containerSecurityContext" (list $mergedConfig.logGroomerSidecar .Values) }} +{{- $containerSecurityContextWaitForMigrations := include "containerSecurityContext" (list $mergedConfig.waitForMigrations .Values) }} +{{- $containerLifecycleHooks := or $mergedConfig.containerLifecycleHooks .Values.containerLifecycleHooks }} +{{- /* Prepare args: append --bundle-name to the base args for per-bundle deployments */}} +{{- $args := $mergedConfig.args }} +{{- if $deployPerBundle }} + {{- $lastIdx := sub (len $args) 1 }} + {{- $lastArg := printf "%s --bundle-name %s" (index $args (int $lastIdx)) $bundleName }} + {{- $initArgs := mustInitial $args }} + {{- $args = append $initArgs $lastArg }} +{{- end }} +{{- /* Determine deployment name suffix */}} +{{- $nameSuffix := "" }} +{{- if $deployPerBundle }} + {{- $nameSuffix = printf "-%s" $bundleName }} {{- end }} -{{- if $enabled }} -{{- $nodeSelector := or .Values.dagProcessor.nodeSelector .Values.nodeSelector }} -{{- $affinity := or .Values.dagProcessor.affinity .Values.affinity }} -{{- $tolerations := or .Values.dagProcessor.tolerations .Values.tolerations }} -{{- $topologySpreadConstraints := or .Values.dagProcessor.topologySpreadConstraints .Values.topologySpreadConstraints }} -{{- $revisionHistoryLimit := include "airflow.revisionHistoryLimit" (list .Values.dagProcessor.revisionHistoryLimit .Values.revisionHistoryLimit) }} -{{- $securityContext := include "airflowPodSecurityContext" (list .Values.dagProcessor .Values) }} -{{- $containerSecurityContext := 
include "containerSecurityContext" (list .Values.dagProcessor .Values) }} -{{- $containerSecurityContextLogGroomerSidecar := include "containerSecurityContext" (list .Values.dagProcessor.logGroomerSidecar .Values) }} -{{- $containerSecurityContextWaitForMigrations := include "containerSecurityContext" (list .Values.dagProcessor.waitForMigrations .Values) }} -{{- $containerLifecycleHooks := or .Values.dagProcessor.containerLifecycleHooks .Values.containerLifecycleHooks }} apiVersion: apps/v1 kind: Deployment metadata: - name: {{ include "airflow.fullname" . }}-dag-processor + name: {{ include "airflow.fullname" . }}-dag-processor{{ $nameSuffix }} labels: tier: airflow component: dag-processor release: {{ .Release.Name }} chart: "{{ .Chart.Name }}-{{ .Chart.Version }}" heritage: {{ .Release.Service }} + {{- if $deployPerBundle }} + bundle: {{ $bundleName }} + {{- end }} {{- with .Values.labels }} {{- toYaml . | nindent 4 }} {{- end }} - {{- if .Values.dagProcessor.annotations }} - annotations: {{- toYaml .Values.dagProcessor.annotations | nindent 4 }} + {{- if $mergedConfig.annotations }} + annotations: {{- toYaml $mergedConfig.annotations | nindent 4 }} {{- end }} spec: - replicas: {{ .Values.dagProcessor.replicas }} + replicas: {{ $mergedConfig.replicas }} {{- if ne $revisionHistoryLimit "" }} revisionHistoryLimit: {{ $revisionHistoryLimit }} {{- end }} @@ -61,8 +80,11 @@ spec: tier: airflow component: dag-processor release: {{ .Release.Name }} - {{- if .Values.dagProcessor.strategy }} - strategy: {{- toYaml .Values.dagProcessor.strategy | nindent 4 }} + {{- if $deployPerBundle }} + bundle: {{ $bundleName }} + {{- end }} + {{- if $mergedConfig.strategy }} + strategy: {{- toYaml $mergedConfig.strategy | nindent 4 }} {{- end }} template: metadata: @@ -70,27 +92,30 @@ spec: tier: airflow component: dag-processor release: {{ .Release.Name }} - {{- if or .Values.labels .Values.dagProcessor.labels }} - {{- mustMerge .Values.dagProcessor.labels .Values.labels | toYaml | 
nindent 8 }} + {{- if $deployPerBundle }} + bundle: {{ $bundleName }} + {{- end }} + {{- if or .Values.labels $mergedConfig.labels }} + {{- mustMerge $mergedConfig.labels .Values.labels | toYaml | nindent 8 }} {{- end }} annotations: - checksum/metadata-secret: {{ include (print $.Template.BasePath "/secrets/metadata-connection-secret.yaml") . | sha256sum }} - checksum/pgbouncer-config-secret: {{ include (print $.Template.BasePath "/secrets/pgbouncer-config-secret.yaml") . | sha256sum }} - checksum/airflow-config: {{ include (print $.Template.BasePath "/configmaps/configmap.yaml") . | sha256sum }} - checksum/extra-configmaps: {{ include (print $.Template.BasePath "/configmaps/extra-configmaps.yaml") . | sha256sum }} - checksum/extra-secrets: {{ include (print $.Template.BasePath "/secrets/extra-secrets.yaml") . | sha256sum }} - {{- if .Values.dagProcessor.safeToEvict }} + checksum/metadata-secret: {{ include (print .Template.BasePath "/secrets/metadata-connection-secret.yaml") . | sha256sum }} + checksum/pgbouncer-config-secret: {{ include (print .Template.BasePath "/secrets/pgbouncer-config-secret.yaml") . | sha256sum }} + checksum/airflow-config: {{ include (print .Template.BasePath "/configmaps/configmap.yaml") . | sha256sum }} + checksum/extra-configmaps: {{ include (print .Template.BasePath "/configmaps/extra-configmaps.yaml") . | sha256sum }} + checksum/extra-secrets: {{ include (print .Template.BasePath "/secrets/extra-secrets.yaml") . 
| sha256sum }} + {{- if $mergedConfig.safeToEvict }} cluster-autoscaler.kubernetes.io/safe-to-evict: "true" {{- end }} {{- if .Values.airflowPodAnnotations }} {{- toYaml .Values.airflowPodAnnotations | nindent 8 }} {{- end }} - {{- if .Values.dagProcessor.podAnnotations }} - {{- toYaml .Values.dagProcessor.podAnnotations | nindent 8 }} + {{- if $mergedConfig.podAnnotations }} + {{- toYaml $mergedConfig.podAnnotations | nindent 8 }} {{- end }} spec: - {{- if .Values.dagProcessor.priorityClassName }} - priorityClassName: {{ .Values.dagProcessor.priorityClassName }} + {{- if $mergedConfig.priorityClassName }} + priorityClassName: {{ $mergedConfig.priorityClassName }} {{- end }} nodeSelector: {{- toYaml $nodeSelector | nindent 8 }} affinity: @@ -103,6 +128,9 @@ spec: labelSelector: matchLabels: component: dag-processor + {{- if $deployPerBundle }} + bundle: {{ $bundleName }} + {{- end }} topologyKey: kubernetes.io/hostname weight: 100 {{- end }} @@ -111,15 +139,15 @@ spec: {{- end }} tolerations: {{- toYaml $tolerations | nindent 8 }} topologySpreadConstraints: {{- toYaml $topologySpreadConstraints | nindent 8 }} - terminationGracePeriodSeconds: {{ .Values.dagProcessor.terminationGracePeriodSeconds }} + terminationGracePeriodSeconds: {{ $mergedConfig.terminationGracePeriodSeconds }} restartPolicy: Always serviceAccountName: {{ include "dagProcessor.serviceAccountName" . }} securityContext: {{ $securityContext | nindent 8 }} imagePullSecrets: {{ include "image_pull_secrets" . | nindent 8 }} initContainers: - {{- if .Values.dagProcessor.waitForMigrations.enabled }} + {{- if $mergedConfig.waitForMigrations.enabled }} - name: wait-for-airflow-migrations - resources: {{- toYaml .Values.dagProcessor.resources | nindent 12 }} + resources: {{- toYaml $mergedConfig.resources | nindent 12 }} image: {{ template "airflow_image_for_migrations" . 
}} imagePullPolicy: {{ .Values.images.airflow.pullPolicy }} securityContext: {{ $containerSecurityContextWaitForMigrations | nindent 12 }} @@ -127,8 +155,8 @@ spec: {{- if .Values.volumeMounts }} {{- toYaml .Values.volumeMounts | nindent 12 }} {{- end }} - {{- if .Values.dagProcessor.extraVolumeMounts }} - {{- tpl (toYaml .Values.dagProcessor.extraVolumeMounts) . | nindent 12 }} + {{- if $mergedConfig.extraVolumeMounts }} + {{- tpl (toYaml $mergedConfig.extraVolumeMounts) . | nindent 12 }} {{- end }} {{- include "airflow_config_mount" . | nindent 12 }} args: {{- include "wait-for-migrations-command" . | indent 10 }} @@ -136,15 +164,15 @@ spec: env: {{- include "custom_airflow_environment" . | indent 10 }} {{- include "standard_airflow_environment" . | indent 10 }} - {{- if .Values.dagProcessor.waitForMigrations.env }} - {{- tpl (toYaml .Values.dagProcessor.waitForMigrations.env) $ | nindent 12 }} + {{- if $mergedConfig.waitForMigrations.env }} + {{- tpl (toYaml $mergedConfig.waitForMigrations.env) . | nindent 12 }} {{- end }} {{- end }} {{- if and (.Values.dags.gitSync.enabled) (not .Values.dags.persistence.enabled) }} {{- include "git_sync_container" (dict "Values" .Values "is_init" "true" "Template" .Template) | nindent 8 }} {{- end }} - {{- if .Values.dagProcessor.extraInitContainers }} - {{- tpl (toYaml .Values.dagProcessor.extraInitContainers) . | nindent 8 }} + {{- if $mergedConfig.extraInitContainers }} + {{- tpl (toYaml $mergedConfig.extraInitContainers) . | nindent 8 }} {{- end }} containers: - name: dag-processor @@ -154,19 +182,19 @@ spec: {{- if $containerLifecycleHooks }} lifecycle: {{- tpl (toYaml $containerLifecycleHooks) . | nindent 12 }} {{- end }} - {{- if .Values.dagProcessor.command }} - command: {{ tpl (toYaml .Values.dagProcessor.command) . | nindent 12 }} + {{- if $mergedConfig.command }} + command: {{ tpl (toYaml $mergedConfig.command) . 
| nindent 12 }} {{- end }} - {{- if .Values.dagProcessor.args }} - args: {{ tpl (toYaml .Values.dagProcessor.args) . | nindent 12 }} + {{- if $args }} + args: {{ tpl ($args | toYaml) . | nindent 12 }} {{- end }} - resources: {{- toYaml .Values.dagProcessor.resources | nindent 12 }} + resources: {{- toYaml $mergedConfig.resources | nindent 12 }} volumeMounts: {{- if .Values.volumeMounts }} {{- toYaml .Values.volumeMounts | nindent 12 }} {{- end }} - {{- if .Values.dagProcessor.extraVolumeMounts }} - {{- tpl (toYaml .Values.dagProcessor.extraVolumeMounts) . | nindent 12 }} + {{- if $mergedConfig.extraVolumeMounts }} + {{- tpl (toYaml $mergedConfig.extraVolumeMounts) . | nindent 12 }} {{- end }} - name: logs mountPath: {{ template "airflow_logs" . }} @@ -181,42 +209,42 @@ spec: env: {{- include "custom_airflow_environment" . | indent 10 }} {{- include "standard_airflow_environment" . | indent 10 }} - {{- include "container_extra_envs" (list . .Values.dagProcessor.env) | indent 10 }} + {{- include "container_extra_envs" (list . $mergedConfig.env) | indent 10 }} livenessProbe: - initialDelaySeconds: {{ .Values.dagProcessor.livenessProbe.initialDelaySeconds }} - timeoutSeconds: {{ .Values.dagProcessor.livenessProbe.timeoutSeconds }} - failureThreshold: {{ .Values.dagProcessor.livenessProbe.failureThreshold }} - periodSeconds: {{ .Values.dagProcessor.livenessProbe.periodSeconds }} + initialDelaySeconds: {{ $mergedConfig.livenessProbe.initialDelaySeconds }} + timeoutSeconds: {{ $mergedConfig.livenessProbe.timeoutSeconds }} + failureThreshold: {{ $mergedConfig.livenessProbe.failureThreshold }} + periodSeconds: {{ $mergedConfig.livenessProbe.periodSeconds }} exec: command: - {{- if .Values.dagProcessor.livenessProbe.command }} - {{- toYaml .Values.dagProcessor.livenessProbe.command | nindent 16 }} + {{- if $mergedConfig.livenessProbe.command }} + {{- toYaml $mergedConfig.livenessProbe.command | nindent 16 }} {{- else }} {{- include "dag_processor_liveness_check_command" . 
| indent 14 }} {{- end }} {{- if .Values.dags.gitSync.enabled }} {{- include "git_sync_container" . | indent 8 }} {{- end }} - {{- if .Values.dagProcessor.logGroomerSidecar.enabled }} + {{- if $mergedConfig.logGroomerSidecar.enabled }} - name: dag-processor-log-groomer - resources: {{- toYaml .Values.dagProcessor.logGroomerSidecar.resources | nindent 12 }} + resources: {{- toYaml $mergedConfig.logGroomerSidecar.resources | nindent 12 }} image: {{ template "airflow_image" . }} imagePullPolicy: {{ .Values.images.airflow.pullPolicy }} securityContext: {{ $containerSecurityContextLogGroomerSidecar | nindent 12 }} - {{- if .Values.dagProcessor.logGroomerSidecar.command }} - command: {{ tpl (toYaml .Values.dagProcessor.logGroomerSidecar.command) . | nindent 12 }} + {{- if $mergedConfig.logGroomerSidecar.command }} + command: {{ tpl (toYaml $mergedConfig.logGroomerSidecar.command) . | nindent 12 }} {{- end }} - {{- if .Values.dagProcessor.logGroomerSidecar.args }} - args: {{- tpl (toYaml .Values.dagProcessor.logGroomerSidecar.args) . | nindent 12 }} + {{- if $mergedConfig.logGroomerSidecar.args }} + args: {{- tpl (toYaml $mergedConfig.logGroomerSidecar.args) . 
| nindent 12 }} {{- end }} env: - {{- if .Values.dagProcessor.logGroomerSidecar.retentionDays }} + {{- if $mergedConfig.logGroomerSidecar.retentionDays }} - name: AIRFLOW__LOG_RETENTION_DAYS - value: "{{ .Values.dagProcessor.logGroomerSidecar.retentionDays }}" + value: "{{ $mergedConfig.logGroomerSidecar.retentionDays }}" {{- end }} - {{- if .Values.dagProcessor.logGroomerSidecar.frequencyMinutes }} + {{- if $mergedConfig.logGroomerSidecar.frequencyMinutes }} - name: AIRFLOW__LOG_CLEANUP_FREQUENCY_MINUTES - value: "{{ .Values.dagProcessor.logGroomerSidecar.frequencyMinutes }}" + value: "{{ $mergedConfig.logGroomerSidecar.frequencyMinutes }}" {{- end }} {{- if .Values.dagProcessor.logGroomerSidecar.maxSizeBytes }} - name: AIRFLOW__LOG_MAX_SIZE_BYTES @@ -228,8 +256,8 @@ spec: {{- end }} - name: AIRFLOW_HOME value: "{{ .Values.airflowHome }}" - {{- if .Values.dagProcessor.logGroomerSidecar.env }} - {{- tpl (toYaml .Values.dagProcessor.logGroomerSidecar.env) $ | nindent 12 }} + {{- if $mergedConfig.logGroomerSidecar.env }} + {{- tpl (toYaml $mergedConfig.logGroomerSidecar.env) . | nindent 12 }} {{- end }} volumeMounts: - name: logs @@ -240,15 +268,15 @@ spec: {{- if .Values.volumeMounts }} {{- toYaml .Values.volumeMounts | nindent 12 }} {{- end }} - {{- if .Values.dagProcessor.extraVolumeMounts }} - {{- tpl (toYaml .Values.dagProcessor.extraVolumeMounts) . | nindent 12 }} + {{- if $mergedConfig.extraVolumeMounts }} + {{- tpl (toYaml $mergedConfig.extraVolumeMounts) . | nindent 12 }} {{- end }} {{- if or .Values.webserver.webserverConfig .Values.webserver.webserverConfigConfigMapName }} {{- include "airflow_webserver_config_mount" . | nindent 12 }} {{- end }} {{- end }} - {{- if .Values.dagProcessor.extraContainers }} - {{- tpl (toYaml .Values.dagProcessor.extraContainers) . | nindent 8 }} + {{- if $mergedConfig.extraContainers }} + {{- tpl (toYaml $mergedConfig.extraContainers) . 
| nindent 8 }} {{- end }} volumes: - name: config @@ -273,8 +301,8 @@ spec: {{- if .Values.volumes }} {{- toYaml .Values.volumes | nindent 8 }} {{- end }} - {{- if .Values.dagProcessor.extraVolumes }} - {{- tpl (toYaml .Values.dagProcessor.extraVolumes) . | nindent 8 }} + {{- if $mergedConfig.extraVolumes }} + {{- tpl (toYaml $mergedConfig.extraVolumes) . | nindent 8 }} {{- end }} {{- if .Values.logs.persistence.enabled }} - name: logs @@ -283,5 +311,43 @@ spec: {{- else }} - name: logs emptyDir: {{- toYaml (default (dict) .Values.logs.emptyDirConfig) | nindent 12 }} + {{- end }} +{{- end }} + +################################ +## Airflow Dag Processor Deployment +################################# +{{- if semverCompare ">=2.3.0" .Values.airflowVersion }} +{{- $enabled := .Values.dagProcessor.enabled }} +{{- if eq $enabled nil}} + {{ $enabled = ternary true false (semverCompare ">=3.0.0" .Values.airflowVersion) }} +{{- end }} +{{- $deployPerBundle := .Values.dagProcessor.deployPerBundle | default false }} +{{- if $enabled }} + {{- if not $deployPerBundle }} + {{- /* Single deployment mode: use base dagProcessor config */}} + {{- $context := dict "Values" (merge (dict "dagProcessor" .Values.dagProcessor) .Values) "Release" .Release "Chart" .Chart "Template" .Template "Files" .Files "bundleName" "" "deployPerBundle" false }} + {{- include "dag-processor.deployment" $context }} + {{- else }} + {{- /* Per-bundle deployment mode: create one deployment per bundle. A bundle is skipped only when its merged config sets enabled to an explicit false (through deploymentOverride). The check compares the string form because the sprig default function treats false as empty and would turn an explicit false back into true. */}} + {{- $firstBundle := true }} + {{- range $bundle := .Values.dagProcessor.dagBundleConfigList }} + {{- $bundleName := $bundle.name }} + {{- $bundleOverrides := $bundle.deploymentOverride | default dict | deepCopy }} + {{- $baseConfig := $.Values.dagProcessor | deepCopy }} + {{- $mergedConfig := mergeOverwrite $baseConfig $bundleOverrides }} + {{- $_ := set $mergedConfig "dagBundleConfigList" $.Values.dagProcessor.dagBundleConfigList }} + {{- $bundleEnabled := ne (toString $mergedConfig.enabled) "false" }} + {{- if 
$bundleEnabled }} + {{- if not $firstBundle }} +--- + {{- end }} + {{- $firstBundle = false }} + {{- /* Swap in only the top-level dagProcessor key on a shallow copy of .Values. A deep merge with .Values would let base settings overwrite overrides whose value is false, 0 or empty. */}} {{- $bundleValues := set (omit $.Values "dagProcessor") "dagProcessor" $mergedConfig }} + {{- $context := dict "Values" $bundleValues "Release" $.Release "Chart" $.Chart "Template" $.Template "Files" $.Files "bundleName" $bundleName "deployPerBundle" true }} + {{- include "dag-processor.deployment" $context }} + {{- end }} + {{- end }} + {{- end }} {{- end }} {{- end }} diff --git a/chart/templates/dag-processor/dag-processor-poddisruptionbudget.yaml b/chart/templates/dag-processor/dag-processor-poddisruptionbudget.yaml index 27c39a8e050ce..38390d6a7d1b2 100644 --- a/chart/templates/dag-processor/dag-processor-poddisruptionbudget.yaml +++ b/chart/templates/dag-processor/dag-processor-poddisruptionbudget.yaml @@ -17,26 +17,42 @@ under the License. */}} -################################ -## Airflow Dag Processor PodDisruptionBudget -################################# -{{- $enabled := .Values.dagProcessor.enabled }} -{{- if eq $enabled nil}} - {{ $enabled = ternary true false (semverCompare ">=3.0.0" .Values.airflowVersion) }} +{{/* +Helper template for dag processor PodDisruptionBudget +Expects context with: + - .Values.dagProcessor: dag processor configuration (may be merged with bundle overrides) + - .bundleName: bundle name (empty string if not per-bundle mode) + - .deployPerBundle: boolean indicating if per-bundle mode is enabled + - .bundleOverrides: raw bundle deploymentOverride dict (for PDB config resolution) +*/}} +{{- define "dag-processor.poddisruptionbudget" }} +{{- $bundleName := .bundleName | default "" }} +{{- $deployPerBundle := .deployPerBundle | default false }} +{{- $mergedConfig := .Values.dagProcessor }} +{{- $bundleOverrides := .bundleOverrides | default dict }} +{{- if $mergedConfig.podDisruptionBudget.enabled }} +{{- /* Determine PDB name suffix */}} +{{- $nameSuffix := "" }} +{{- if $deployPerBundle }} + {{- $nameSuffix = printf "-%s-pdb" $bundleName }} +{{- 
else }} + {{- $nameSuffix = "-pdb" }} {{- end }} -{{- if and $enabled .Values.dagProcessor.podDisruptionBudget.enabled }} apiVersion: policy/v1 kind: PodDisruptionBudget metadata: - name: {{ include "airflow.fullname" . }}-dag-processor-pdb + name: {{ include "airflow.fullname" . }}-dag-processor{{ $nameSuffix }} labels: tier: airflow component: dag-processor release: {{ .Release.Name }} chart: "{{ .Chart.Name }}-{{ .Chart.Version }}" heritage: {{ .Release.Service }} - {{- if or (.Values.labels) (.Values.dagProcessor.labels) }} - {{- mustMerge .Values.dagProcessor.labels .Values.labels | toYaml | nindent 4 }} + {{- if $deployPerBundle }} + bundle: {{ $bundleName }} + {{- end }} + {{- if or (.Values.labels) ($mergedConfig.labels) }} + {{- mustMerge $mergedConfig.labels .Values.labels | toYaml | nindent 4 }} {{- end }} spec: selector: @@ -44,5 +60,53 @@ spec: tier: airflow component: dag-processor release: {{ .Release.Name }} - {{- toYaml .Values.dagProcessor.podDisruptionBudget.config | nindent 2 }} + {{- if $deployPerBundle }} + bundle: {{ $bundleName }} + {{- end }} + {{- $pdbConfig := $mergedConfig.podDisruptionBudget.config }} + {{- if and $bundleOverrides.podDisruptionBudget $bundleOverrides.podDisruptionBudget.config }} + {{- $pdbConfig = $bundleOverrides.podDisruptionBudget.config }} + {{- end }} + {{- if and (hasKey $pdbConfig "minAvailable") (hasKey $pdbConfig "maxUnavailable") }} + {{- if ne (index $pdbConfig "minAvailable" | toString) "" }} + {{- $pdbConfig = omit $pdbConfig "maxUnavailable" }} + {{- end }} + {{- end }} + {{- toYaml $pdbConfig | nindent 2 }} +{{- end }} +{{- end }} + +################################ +## Airflow Dag Processor PodDisruptionBudget +################################# +{{- $enabled := .Values.dagProcessor.enabled }} +{{- if eq $enabled nil}} + {{ $enabled = ternary true false (semverCompare ">=3.0.0" .Values.airflowVersion) }} +{{- end }} +{{- $deployPerBundle := .Values.dagProcessor.deployPerBundle | default false }} +{{- if 
and $enabled (or $deployPerBundle .Values.dagProcessor.podDisruptionBudget.enabled) }} + {{- if not $deployPerBundle }} + {{- /* Single PDB mode: use base dagProcessor config */}} + {{- $context := dict "Values" (merge (dict "dagProcessor" .Values.dagProcessor) .Values) "Release" .Release "Chart" .Chart "Template" .Template "Files" .Files "bundleName" "" "deployPerBundle" false }} + {{- include "dag-processor.poddisruptionbudget" $context }} + {{- else }} + {{- /* Per-bundle PDB mode: create one PDB per bundle. The outer gate does not require the base podDisruptionBudget.enabled in this mode, so a bundle can switch its own PDB on through deploymentOverride; the merged per-bundle flag is checked for every bundle below. */}} + {{- $firstPDB := true }} + {{- range $bundle := .Values.dagProcessor.dagBundleConfigList }} + {{- $bundleName := $bundle.name }} + {{- $bundleOverrides := $bundle.deploymentOverride | default dict | deepCopy }} + {{- $baseConfig := $.Values.dagProcessor | deepCopy }} + {{- $mergedConfig := mergeOverwrite $baseConfig $bundleOverrides }} + {{- $_ := set $mergedConfig "dagBundleConfigList" $.Values.dagProcessor.dagBundleConfigList }} + {{- if $mergedConfig.podDisruptionBudget.enabled }} + {{- if not $firstPDB }} +--- + {{- end }} + {{- $firstPDB = false }} + {{- /* Swap in only the top-level dagProcessor key on a shallow copy of .Values. A deep merge with .Values would let base settings overwrite overrides whose value is false, 0 or empty. */}} {{- $bundleValues := set (omit $.Values "dagProcessor") "dagProcessor" $mergedConfig }} + {{- $context := dict "Values" $bundleValues "Release" $.Release "Chart" $.Chart "Template" $.Template "Files" $.Files "bundleName" $bundleName "deployPerBundle" true "bundleOverrides" $bundleOverrides }} + {{- include "dag-processor.poddisruptionbudget" $context }} + {{- end }} + {{- end }} + {{- end }} {{- end }} diff --git a/chart/values.schema.json b/chart/values.schema.json index caac8a828f4d7..c1bd7b4ba4a1c 100644 --- a/chart/values.schema.json +++ b/chart/values.schema.json @@ -4607,7 +4607,6 @@ ], "items": { "type": "object", - "additionalProperties": false, "required": [ "name", "classpath" @@ -4633,10 +4632,20 @@ "null" ] } + }, + "deploymentOverride": { + "description": "Per-bundle deployment overrides. Only used when deployPerBundle is true. 
Overrides base dagProcessor settings for this bundle.", + "type": "object", + "additionalProperties": {} } } } }, + "deployPerBundle": { + "description": "When true, creates a separate deployment for each bundle in ``dagBundleConfigList``.", + "type": "boolean", + "default": false + }, "livenessProbe": { "description": "Liveness probe configuration for dag processor.", "type": "object", diff --git a/chart/values.yaml b/chart/values.yaml index 17b1e14f5137d..67618b5181624 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -2361,6 +2361,25 @@ dagProcessor: # subdir: "dags" # tracking_ref: "main" # refresh_interval: 60 + # # Per-bundle deployment overrides (only used when deployPerBundle is true) + # deploymentOverride: + # replicas: 3 + # resources: + # requests: + # memory: "2Gi" + # cpu: "1000m" + # limits: + # memory: "4Gi" + # cpu: "2000m" + # nodeSelector: + # workload-type: production + # env: + # - name: SOME_ENV_VAR + # value: "20" + # podDisruptionBudget: + # enabled: true + # config: + # minAvailable: 1 # - name: bundle2 # classpath: "airflow.providers.git.bundles.git.GitDagBundle" # kwargs: @@ -2372,6 +2391,11 @@ dagProcessor: # classpath: "airflow.dag_processing.bundles.local.LocalDagBundle" # kwargs: {} + # Per-bundle deployment option + # When enabled, creates a separate deployment for each bundle in `dagBundleConfigList`. + # Bundle-specific deployment overrides can be set via `deploymentOverride` in each bundle entry. 
+ deployPerBundle: false + # Number of airflow dag processors in the deployment replicas: 1 # Max number of old replicasets to retain diff --git a/helm-tests/tests/helm_tests/airflow_core/test_dag_processor_per_bundle.py b/helm-tests/tests/helm_tests/airflow_core/test_dag_processor_per_bundle.py new file mode 100644 index 0000000000000..a6b5b3deeb032 --- /dev/null +++ b/helm-tests/tests/helm_tests/airflow_core/test_dag_processor_per_bundle.py @@ -0,0 +1,410 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +import jmespath +from chart_utils.helm_template_generator import render_chart + + +class TestDagProcessorPerBundle: + """Tests DAG processor per-bundle deployment feature.""" + + def test_deploy_per_bundle_disabled_creates_single_deployment(self): + """Test that when deployPerBundle is false, single deployment is created.""" + docs = render_chart( + values={ + "dagProcessor": { + "enabled": True, + "deployPerBundle": False, + "dagBundleConfigList": [ + { + "name": "bundle1", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {}, + }, + { + "name": "bundle2", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {}, + }, + ], + } + }, + show_only=["templates/dag-processor/dag-processor-deployment.yaml"], + ) + + assert len(docs) == 1 + assert jmespath.search("metadata.name", docs[0]) == "release-name-dag-processor" + assert "bundle" not in jmespath.search("metadata.labels", docs[0]) + + def test_deploy_per_bundle_enabled_creates_multiple_deployments(self): + """Test that when deployPerBundle is true, one deployment per bundle is created.""" + docs = render_chart( + values={ + "dagProcessor": { + "enabled": True, + "deployPerBundle": True, + "dagBundleConfigList": [ + { + "name": "bundle1", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {}, + }, + { + "name": "bundle2", + "classpath": "airflow.providers.git.bundles.git.GitDagBundle", + "kwargs": { + "git_conn_id": "GITHUB__repo2", + "subdir": "dags", + "tracking_ref": "main", + "refresh_interval": 60, + }, + }, + ], + } + }, + show_only=["templates/dag-processor/dag-processor-deployment.yaml"], + ) + + assert len(docs) == 2 + deployment_names = [jmespath.search("metadata.name", doc) for doc in docs] + assert "release-name-dag-processor-bundle1" in deployment_names + assert "release-name-dag-processor-bundle2" in deployment_names + + def test_per_bundle_deployment_has_bundle_label(self): 
+        """Test that per-bundle deployments have bundle label."""
+        docs = render_chart(
+            values={
+                "dagProcessor": {
+                    "enabled": True,
+                    "deployPerBundle": True,
+                    "dagBundleConfigList": [
+                        {
+                            "name": "bundle1",
+                            "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
+                            "kwargs": {},
+                        },
+                    ],
+                }
+            },
+            show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
+        )
+
+        assert len(docs) == 1
+        assert jmespath.search("metadata.labels.bundle", docs[0]) == "bundle1"
+        assert jmespath.search("spec.selector.matchLabels.bundle", docs[0]) == "bundle1"
+        assert jmespath.search("spec.template.metadata.labels.bundle", docs[0]) == "bundle1"
+
+    def test_per_bundle_args_contains_bundle_name(self):
+        """Check the rendered args carry ``--bundle-name`` when ``dagProcessor.args`` is left unset."""
+        bundle = {
+            "name": "test-bundle",
+            "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
+            "kwargs": {},
+        }
+        chart_values = {
+            "dagProcessor": {
+                "enabled": True,
+                "deployPerBundle": True,
+                "dagBundleConfigList": [bundle],
+            }
+        }
+        rendered = render_chart(
+            values=chart_values,
+            show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
+        )
+
+        assert len(rendered) == 1
+        container_args = jmespath.search("spec.template.spec.containers[0].args", rendered[0])
+        assert container_args is not None
+        # Each fragment only has to appear in *some* element of the args list.
+        for fragment in ("--bundle-name test-bundle", "exec airflow dag-processor"):
+            assert any(fragment in item for item in container_args)
+
+    def test_per_bundle_args_respects_custom_base_args(self):
+        """Check custom ``dagProcessor.args`` are kept and ``--bundle-name`` lands in the last arg."""
+        custom_args = ["bash", "-c", "exec airflow dag-processor --some-flag"]
+        chart_values = {
+            "dagProcessor": {
+                "enabled": True,
+                "deployPerBundle": True,
+                "args": custom_args,
+                "dagBundleConfigList": [
+                    {
+                        "name": "my-bundle",
+                        "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
+                        "kwargs": {},
+                    },
+                ],
+            }
+        }
+        rendered = render_chart(
+            values=chart_values,
+            show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
+        )
+
+        assert len(rendered) == 1
+        container_args = jmespath.search("spec.template.spec.containers[0].args", rendered[0])
+        assert container_args is not None
+        # Both the user's flag and the bundle selector must sit in the final element.
+        final_arg = container_args[-1]
+        assert "--some-flag" in final_arg
+        assert "--bundle-name my-bundle" in final_arg
+
+    def test_deployment_override_apply_to_deployment(self):
+        """Check ``deploymentOverride`` reaches its own bundle while the other keeps base replicas."""
+        overridden_bundle = {
+            "name": "bundle1",
+            "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
+            "kwargs": {},
+            "deploymentOverride": {
+                "replicas": 3,
+                "resources": {
+                    "requests": {"memory": "2Gi", "cpu": "1000m"},
+                    "limits": {"memory": "4Gi", "cpu": "2000m"},
+                },
+            },
+        }
+        plain_bundle = {
+            "name": "bundle2",
+            "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
+            "kwargs": {},
+        }
+        rendered = render_chart(
+            values={
+                "dagProcessor": {
+                    "enabled": True,
+                    "replicas": 1,
+                    "deployPerBundle": True,
+                    "dagBundleConfigList": [overridden_bundle, plain_bundle],
+                }
+            },
+            show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
+        )
+
+        assert len(rendered) == 2
+
+        def _doc_named(fragment):
+            # First rendered document whose metadata.name contains ``fragment``.
+            return next(doc for doc in rendered if fragment in jmespath.search("metadata.name", doc))
+
+        with_override = _doc_named("bundle1")
+        without_override = _doc_named("bundle2")
+
+        assert jmespath.search("spec.replicas", with_override) == 3
+        requested_memory = jmespath.search(
+            "spec.template.spec.containers[0].resources.requests.memory", with_override
+        )
+        assert requested_memory == "2Gi"
+        requested_cpu = jmespath.search(
+            "spec.template.spec.containers[0].resources.requests.cpu", with_override
+        )
+        assert requested_cpu == "1000m"
+
+        # The bundle without an override keeps the top-level replica count.
+        assert jmespath.search("spec.replicas", without_override) == 1
+
+    def test_deployment_override_env_variables(self):
+        """Check an ``env`` entry from ``deploymentOverride`` shows up in the container env."""
+        chart_values = {
+            "dagProcessor": {
+                "enabled": True,
+                "deployPerBundle": True,
+                "dagBundleConfigList": [
+                    {
+                        "name": "bundle1",
+                        "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
+                        "kwargs": {},
+                        "deploymentOverride": {
+                            "env": [
+                                {"name": "CUSTOM_VAR", "value": "custom_value"},
+                            ],
+                        },
+                    },
+                ],
+            }
+        }
+        rendered = render_chart(
+            values=chart_values,
+            show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
+        )
+
+        assert len(rendered) == 1
+        container_env = jmespath.search("spec.template.spec.containers[0].env", rendered[0])
+        # Only entries with a literal "value" key are collected; others are skipped.
+        literal_env = {}
+        for entry in container_env:
+            if "value" in entry:
+                literal_env[entry["name"]] = entry["value"]
+        assert literal_env.get("CUSTOM_VAR") == "custom_value"
+
+    def test_per_bundle_pdb_created(self):
+        """Check one PodDisruptionBudget per bundle is rendered, each with the expected name."""
+        bundle_names = ("bundle1", "bundle2")
+        rendered = render_chart(
+            values={
+                "dagProcessor": {
+                    "enabled": True,
+                    "podDisruptionBudget": {"enabled": True},
+                    "deployPerBundle": True,
+                    "dagBundleConfigList": [
+                        {
+                            "name": bundle_name,
+                            "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
+                            "kwargs": {},
+                        }
+                        for bundle_name in bundle_names
+                    ],
+                }
+            },
+            show_only=["templates/dag-processor/dag-processor-poddisruptionbudget.yaml"],
+        )
+
+        assert len(rendered) == 2
+        rendered_names = [jmespath.search("metadata.name", doc) for doc in rendered]
+        expected_names = (
+            "release-name-dag-processor-bundle1-pdb",
+            "release-name-dag-processor-bundle2-pdb",
+        )
+        for expected in expected_names:
+            assert expected in rendered_names
+
+    def test_per_bundle_pdb_has_bundle_label(self):
+        """Check a per-bundle PDB carries the bundle name in its labels and its selector."""
+        chart_values = {
+            "dagProcessor": {
+                "enabled": True,
+                "podDisruptionBudget": {"enabled": True},
+                "deployPerBundle": True,
+                "dagBundleConfigList": [
+                    {
+                        "name": "bundle1",
+                        "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
+                        "kwargs": {},
+                    },
+                ],
+            }
+        }
+        rendered = render_chart(
+            values=chart_values,
+            show_only=["templates/dag-processor/dag-processor-poddisruptionbudget.yaml"],
+        )
+
+        assert len(rendered) == 1
+        pdb = rendered[0]
+        assert jmespath.search("metadata.labels.bundle", pdb) == "bundle1"
+        assert jmespath.search("spec.selector.matchLabels.bundle", pdb) == "bundle1"
+
+    def test_per_bundle_pdb_deployment_override(self):
+        """Check bundle1's PDB uses its overriding ``minAvailable`` and bundle2's the base ``maxUnavailable``."""
+        base_pdb = {
+            "enabled": True,
+            "config": {"maxUnavailable": 1},
+        }
+        bundle_with_pdb_override = {
+            "name": "bundle1",
+            "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
+            "kwargs": {},
+            "deploymentOverride": {
+                "podDisruptionBudget": {
+                    "enabled": True,
+                    "config": {"minAvailable": 1},
+                },
+            },
+        }
+        bundle_without_override = {
+            "name": "bundle2",
+            "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
+            "kwargs": {},
+        }
+        rendered = render_chart(
+            values={
+                "dagProcessor": {
+                    "enabled": True,
+                    "podDisruptionBudget": base_pdb,
+                    "deployPerBundle": True,
+                    "dagBundleConfigList": [bundle_with_pdb_override, bundle_without_override],
+                }
+            },
+            show_only=["templates/dag-processor/dag-processor-poddisruptionbudget.yaml"],
+        )
+
+        assert len(rendered) == 2
+
+        def _pdb_named(fragment):
+            # First rendered PDB whose metadata.name contains ``fragment``.
+            return next(doc for doc in rendered if fragment in jmespath.search("metadata.name", doc))
+
+        overridden_pdb = _pdb_named("bundle1")
+        default_pdb = _pdb_named("bundle2")
+
+        # NOTE(review): Kubernetes allows only one of minAvailable / maxUnavailable per PDB.
+        # Nothing here checks that bundle1's spec omits maxUnavailable -- confirm how the
+        # override is merged with the base config.
+        assert "minAvailable" in jmespath.search("spec", overridden_pdb)
+        assert jmespath.search("spec.minAvailable", overridden_pdb) == 1
+
+        assert "maxUnavailable" in jmespath.search("spec", default_pdb)
+        assert jmespath.search("spec.maxUnavailable", default_pdb) == 1
+
+    def test_per_bundle_affinity_includes_bundle_label(self):
+        """Check the first preferred anti-affinity term selects on the bundle label in per-bundle mode."""
+        chart_values = {
+            "dagProcessor": {
+                "enabled": True,
+                "deployPerBundle": True,
+                "dagBundleConfigList": [
+                    {
+                        "name": "bundle1",
+                        "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
+                        "kwargs": {},
+                    },
+                ],
+            }
+        }
+        rendered = render_chart(
+            values=chart_values,
+            show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
+        )
+
+        assert len(rendered) == 1
+        match_labels = jmespath.search(
+            "spec.template.spec.affinity.podAntiAffinity.preferredDuringSchedulingIgnoredDuringExecution[0].podAffinityTerm.labelSelector.matchLabels",
+            rendered[0],
+        )
+        assert match_labels.get("bundle") == "bundle1"
+        assert match_labels.get("component") == "dag-processor"
+
+    def test_single_deployment_mode_affinity_no_bundle_label(self):
+        """Check the anti-affinity selector has no ``bundle`` key when ``deployPerBundle`` is False."""
+        chart_values = {
+            "dagProcessor": {
+                "enabled": True,
+                "deployPerBundle": False,
+            }
+        }
+        rendered = render_chart(
+            values=chart_values,
+            show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
+        )
+
+        assert len(rendered) == 1
+        match_labels = jmespath.search(
+            "spec.template.spec.affinity.podAntiAffinity.preferredDuringSchedulingIgnoredDuringExecution[0].podAffinityTerm.labelSelector.matchLabels",
+            rendered[0],
+        )
+        assert "bundle" not in match_labels
+        assert match_labels.get("component") == "dag-processor"
+
+    def test_deployment_override_not_included_in_bundle_config_json(self):
+        """Check ``deploymentOverride`` is absent from the rendered ``airflow.cfg`` but the bundle name is present."""
+        bundle = {
+            "name": "bundle1",
+            "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
+            "kwargs": {},
+            "deploymentOverride": {
+                "replicas": 3,
+            },
+        }
+        docs = render_chart(
+            values={
+                "dagProcessor": {
+                    "enabled": True,
+                    "deployPerBundle": True,
+                    "dagBundleConfigList": [bundle],
+                }
+            },
+            show_only=["templates/configmaps/configmap.yaml"],
+        )
+
+        assert len(docs) == 1
+        # The key contains a dot, so it is quoted inside the JMESPath expression.
+        config_data = jmespath.search('data."airflow.cfg"', docs[0])
+        assert "deploymentOverride" not in config_data
+        assert "bundle1" in config_data