Skip to content

Commit 5195d85

Browse files
committed
feat: Implement deployPerBundle feature for DAG processor deployments
1 parent 2e5023d commit 5195d85

6 files changed

Lines changed: 668 additions & 88 deletions

File tree

chart/templates/_helpers.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1047,7 +1047,7 @@ backwards compatibility. Otherwise, fall back to createUserJob.enabled.
10471047
{{- end -}}
10481048

10491049
{{/*
1050-
Convert dagBundleConfigList YAML list to JSON string for dag_bundle_config_list.
1050+
Convert `dagBundleConfigList` YAML list to JSON string for `dag_bundle_config_list`.
10511051
This helper function converts the structured YAML format to the JSON string
10521052
format required by Airflow's dag_bundle_config_list configuration.
10531053

chart/templates/dag-processor/dag-processor-deployment.yaml

Lines changed: 143 additions & 76 deletions
Large diffs are not rendered by default.

chart/templates/dag-processor/dag-processor-poddisruptionbudget.yaml

Lines changed: 74 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,32 +17,95 @@
1717
under the License.
1818
*/}}
1919

20-
################################
21-
## Airflow Dag Processor PodDisruptionBudget
22-
#################################
23-
{{- $enabled := .Values.dagProcessor.enabled }}
24-
{{- if eq $enabled nil}}
25-
{{ $enabled = ternary true false (semverCompare ">=3.0.0" .Values.airflowVersion) }}
20+
{{/*
21+
Helper template for dag processor PodDisruptionBudget
22+
Expects context with:
23+
- .Values.dagProcessor: dag processor configuration (may be merged with bundle overrides)
24+
- .bundleName: bundle name (empty string if not per-bundle mode)
25+
- .deployPerBundle: boolean indicating if per-bundle mode is enabled
26+
*/}}
27+
{{- define "dag-processor.poddisruptionbudget" }}
28+
{{- $bundleName := .bundleName | default "" }}
29+
{{- $deployPerBundle := .deployPerBundle | default false }}
30+
{{- $mergedConfig := .Values.dagProcessor }}
31+
{{- $bundleOverrides := .bundleOverrides | default dict }}
32+
{{- if $mergedConfig.podDisruptionBudget.enabled }}
33+
{{- /* Determine PDB name suffix */}}
34+
{{- $nameSuffix := "" }}
35+
{{- if $deployPerBundle }}
36+
{{- $nameSuffix = printf "-%s-pdb" $bundleName }}
37+
{{- else }}
38+
{{- $nameSuffix = "-pdb" }}
2639
{{- end }}
27-
{{- if and $enabled .Values.dagProcessor.podDisruptionBudget.enabled }}
2840
apiVersion: policy/v1
2941
kind: PodDisruptionBudget
3042
metadata:
31-
name: {{ include "airflow.fullname" . }}-dag-processor-pdb
43+
name: {{ include "airflow.fullname" . }}-dag-processor{{ $nameSuffix }}
3244
labels:
3345
tier: airflow
3446
component: dag-processor
3547
release: {{ .Release.Name }}
3648
chart: "{{ .Chart.Name }}-{{ .Chart.Version }}"
3749
heritage: {{ .Release.Service }}
38-
{{- if or (.Values.labels) (.Values.dagProcessor.labels) }}
39-
{{- mustMerge .Values.dagProcessor.labels .Values.labels | toYaml | nindent 4 }}
50+
{{- if $deployPerBundle }}
51+
bundle: {{ $bundleName }}
52+
{{- end }}
53+
{{- if or (.Values.labels) ($mergedConfig.labels) }}
54+
{{- mustMerge $mergedConfig.labels .Values.labels | toYaml | nindent 4 }}
4055
{{- end }}
4156
spec:
4257
selector:
4358
matchLabels:
4459
tier: airflow
4560
component: dag-processor
4661
release: {{ .Release.Name }}
47-
{{- toYaml .Values.dagProcessor.podDisruptionBudget.config | nindent 2 }}
62+
{{- if $deployPerBundle }}
63+
bundle: {{ $bundleName }}
64+
{{- end }}
65+
{{- $pdbConfig := $mergedConfig.podDisruptionBudget.config }}
66+
{{- if and $bundleOverrides.podDisruptionBudget $bundleOverrides.podDisruptionBudget.config }}
67+
{{- $pdbConfig = $bundleOverrides.podDisruptionBudget.config }}
68+
{{- end }}
69+
{{- if and (hasKey $pdbConfig "minAvailable") (hasKey $pdbConfig "maxUnavailable") }}
70+
{{- if ne (index $pdbConfig "minAvailable" | toString) "<nil>" }}
71+
{{- $pdbConfig = omit $pdbConfig "maxUnavailable" }}
72+
{{- end }}
73+
{{- end }}
74+
{{- toYaml $pdbConfig | nindent 2 }}
75+
{{- end }}
76+
{{- end }}
77+
78+
################################
79+
## Airflow Dag Processor PodDisruptionBudget
80+
#################################
81+
{{- $enabled := .Values.dagProcessor.enabled }}
82+
{{- if eq $enabled nil}}
83+
{{ $enabled = ternary true false (semverCompare ">=3.0.0" .Values.airflowVersion) }}
84+
{{- end }}
85+
{{- $deployPerBundle := .Values.dagProcessor.deployPerBundle.enabled | default false }}
86+
{{- if and $enabled .Values.dagProcessor.podDisruptionBudget.enabled }}
87+
{{- if not $deployPerBundle }}
88+
{{- /* Single PDB mode: use base dagProcessor config */}}
89+
{{- $context := dict "Values" (merge (dict "dagProcessor" .Values.dagProcessor) .Values) "Release" .Release "Chart" .Chart "Template" .Template "Files" .Files "bundleName" "" "deployPerBundle" false }}
90+
{{- include "dag-processor.poddisruptionbudget" $context }}
91+
{{- else }}
92+
{{- /* Per-bundle PDB mode: create one PDB per bundle */}}
93+
{{- $firstPDB := true }}
94+
{{- range $bundle := .Values.dagProcessor.dagBundleConfigList }}
95+
{{- $bundleName := $bundle.name }}
96+
{{- $bundleOverrides := index $.Values.dagProcessor.deployPerBundle.bundleOverrides $bundleName | default dict | deepCopy }}
97+
{{- $baseConfig := $.Values.dagProcessor | deepCopy }}
98+
{{- $mergedConfig := mergeOverwrite $baseConfig $bundleOverrides }}
99+
{{- $_ := set $mergedConfig "dagBundleConfigList" $.Values.dagProcessor.dagBundleConfigList }}
100+
{{- if $mergedConfig.podDisruptionBudget.enabled }}
101+
{{- if not $firstPDB }}
102+
---
103+
{{- end }}
104+
{{- $firstPDB = false }}
105+
{{- $bundleValues := merge (dict "dagProcessor" $mergedConfig) $.Values }}
106+
{{- $context := dict "Values" $bundleValues "Release" $.Release "Chart" $.Chart "Template" $.Template "Files" $.Files "bundleName" $bundleName "deployPerBundle" true "bundleOverrides" $bundleOverrides }}
107+
{{- include "dag-processor.poddisruptionbudget" $context }}
108+
{{- end }}
109+
{{- end }}
110+
{{- end }}
48111
{{- end }}

chart/values.schema.json

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4505,6 +4505,48 @@
45054505
}
45064506
}
45074507
},
4508+
"deployPerBundle": {
4509+
"description": "Per-bundle deployment option. When enabled, creates a separate deployment for each bundle in `dagBundleConfigList`.",
4510+
"type": "object",
4511+
"additionalProperties": false,
4512+
"properties": {
4513+
"enabled": {
4514+
"description": "Enable per-bundle deployments. When true, creates a separate deployment for each bundle.",
4515+
"type": "boolean",
4516+
"default": false
4517+
},
4518+
"args": {
4519+
"description": "Command args template for per-bundle deployments. `{{ bundleName }}` will be replaced with the actual bundle name.",
4520+
"type": "array",
4521+
"default": [
4522+
"bash",
4523+
"-c",
4524+
"exec airflow dag-processor --bundle-name {{ bundleName }}"
4525+
],
4526+
"items": {
4527+
"type": "string"
4528+
}
4529+
},
4530+
"bundleOverrides": {
4531+
"description": "Per-bundle specific overrides. Each key should match a bundle name from `dagBundleConfigList`.",
4532+
"type": "object",
4533+
"additionalProperties": {
4534+
"type": "object",
4535+
"additionalProperties": {}
4536+
},
4537+
"default": {}
4538+
}
4539+
},
4540+
"default": {
4541+
"enabled": false,
4542+
"args": [
4543+
"bash",
4544+
"-c",
4545+
"exec airflow dag-processor --bundle-name {{ bundleName }}"
4546+
],
4547+
"bundleOverrides": {}
4548+
}
4549+
},
45084550
"livenessProbe": {
45094551
"description": "Liveness probe configuration for dag processor.",
45104552
"type": "object",

chart/values.yaml

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2309,6 +2309,39 @@ dagProcessor:
23092309
# classpath: "airflow.dag_processing.bundles.local.LocalDagBundle"
23102310
# kwargs: {}
23112311

2312+
# Per-bundle deployment option
2313+
# When enabled, creates a separate deployment for each bundle in `dagBundleConfigList`
2314+
deployPerBundle:
2315+
enabled: false
2316+
# Command args template for per-bundle deployments
2317+
# `{{ bundleName }}` will be replaced with the actual bundle name
2318+
args: ["bash", "-c", "exec airflow dag-processor --bundle-name {{ bundleName }}"]
2319+
2320+
# Per-bundle specific overrides (optional)
2321+
# Each key should match a bundle name from `dagBundleConfigList`
2322+
# These settings override the base dagProcessor configuration for each bundle
2323+
bundleOverrides: {}
2324+
# Example: Override settings for a specific bundle
2325+
# bundleOverrides:
2326+
# bundle1:
2327+
# replicas: 3
2328+
# resources:
2329+
# requests:
2330+
# memory: "2Gi"
2331+
# cpu: "1000m"
2332+
# limits:
2333+
# memory: "4Gi"
2334+
# cpu: "2000m"
2335+
# nodeSelector:
2336+
# workload-type: production
2337+
# env:
2338+
# - name: SOME_ENV_VAR
2339+
# value: "20"
2340+
# podDisruptionBudget:
2341+
# enabled: true
2342+
# config:
2343+
# minAvailable: 1
2344+
23122345
# Number of airflow dag processors in the deployment
23132346
replicas: 1
23142347
# Max number of old replicasets to retain

0 commit comments

Comments
 (0)