Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 29 additions & 4 deletions chart/templates/dag-processor/dag-processor-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,18 @@
{{ $enabled = ternary true false (semverCompare ">=3.0.0" .Values.airflowVersion) }}
{{- end }}
{{- if $enabled }}
{{- $processors := list }}
{{- if .Values.dagProcessor.multipleDagProcessing }}
{{- $processors = .Values.dagProcessor.multipleDagProcessing }}
{{- else }}
{{- $processors = list (dict "name" "") }}
{{- end }}
{{- $globals := . }}
{{- range $idx, $proc := $processors }}
{{- if $idx }}
---
{{- end }}
{{- with $globals }}
{{- $nodeSelector := or .Values.dagProcessor.nodeSelector .Values.nodeSelector }}
{{- $affinity := or .Values.dagProcessor.affinity .Values.affinity }}
{{- $tolerations := or .Values.dagProcessor.tolerations .Values.tolerations }}
Expand All @@ -36,24 +48,29 @@
{{- $containerSecurityContextLogGroomerSidecar := include "containerSecurityContext" (list .Values.dagProcessor.logGroomerSidecar .Values) }}
{{- $containerSecurityContextWaitForMigrations := include "containerSecurityContext" (list .Values.dagProcessor.waitForMigrations .Values) }}
{{- $containerLifecycleHooks := or .Values.dagProcessor.containerLifecycleHooks .Values.containerLifecycleHooks }}
{{- $procReplicas := $proc.replicas | default .Values.dagProcessor.replicas }}
{{- $procArgs := $proc.args | default .Values.dagProcessor.args }}
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ include "airflow.fullname" . }}-dag-processor
name: {{ include "airflow.fullname" . }}-dag-processor{{- if $proc.name }}-{{ $proc.name }}{{- end }}
labels:
tier: airflow
component: dag-processor
release: {{ .Release.Name }}
chart: "{{ .Chart.Name }}-{{ .Chart.Version }}"
heritage: {{ .Release.Service }}
{{- if $proc.name }}
dag-processor-name: {{ $proc.name }}
{{- end }}
{{- with .Values.labels }}
{{- toYaml . | nindent 4 }}
{{- end }}
{{- if .Values.dagProcessor.annotations }}
annotations: {{- toYaml .Values.dagProcessor.annotations | nindent 4 }}
{{- end }}
spec:
replicas: {{ .Values.dagProcessor.replicas }}
replicas: {{ $procReplicas }}
{{- if ne $revisionHistoryLimit "" }}
revisionHistoryLimit: {{ $revisionHistoryLimit }}
{{- end }}
Expand All @@ -62,6 +79,9 @@ spec:
tier: airflow
component: dag-processor
release: {{ .Release.Name }}
{{- if $proc.name }}
dag-processor-name: {{ $proc.name }}
{{- end }}
{{- if .Values.dagProcessor.strategy }}
strategy: {{- toYaml .Values.dagProcessor.strategy | nindent 4 }}
{{- end }}
Expand All @@ -71,6 +91,9 @@ spec:
tier: airflow
component: dag-processor
release: {{ .Release.Name }}
{{- if $proc.name }}
dag-processor-name: {{ $proc.name }}
{{- end }}
{{- if or .Values.labels .Values.dagProcessor.labels }}
{{- mustMerge .Values.dagProcessor.labels .Values.labels | toYaml | nindent 8 }}
{{- end }}
Expand Down Expand Up @@ -158,8 +181,8 @@ spec:
{{- if .Values.dagProcessor.command }}
command: {{ tpl (toYaml .Values.dagProcessor.command) . | nindent 12 }}
{{- end }}
{{- if .Values.dagProcessor.args }}
args: {{ tpl (toYaml .Values.dagProcessor.args) . | nindent 12 }}
{{- if $procArgs }}
args: {{ tpl (toYaml $procArgs) . | nindent 12 }}
{{- end }}
resources: {{- toYaml .Values.dagProcessor.resources | nindent 12 }}
volumeMounts:
Expand Down Expand Up @@ -287,3 +310,5 @@ spec:
{{- end }}
{{- end }}
{{- end }}
{{- end }}
{{- end }}
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,31 @@
{{ $enabled = ternary true false (semverCompare ">=3.0.0" .Values.airflowVersion) }}
{{- end }}
{{- if and $enabled .Values.dagProcessor.podDisruptionBudget.enabled }}
{{- $processors := list }}
{{- if .Values.dagProcessor.multipleDagProcessing }}
{{- $processors = .Values.dagProcessor.multipleDagProcessing }}
{{- else }}
{{- $processors = list (dict "name" "") }}
{{- end }}
{{- $globals := . }}
{{- range $idx, $proc := $processors }}
{{- if $idx }}
---
{{- end }}
{{- with $globals }}
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
name: {{ include "airflow.fullname" . }}-dag-processor-pdb
name: {{ include "airflow.fullname" . }}-dag-processor{{- if $proc.name }}-{{ $proc.name }}{{- end }}-pdb
labels:
tier: airflow
component: dag-processor
release: {{ .Release.Name }}
chart: "{{ .Chart.Name }}-{{ .Chart.Version }}"
heritage: {{ .Release.Service }}
{{- if $proc.name }}
dag-processor-name: {{ $proc.name }}
{{- end }}
{{- if or (.Values.labels) (.Values.dagProcessor.labels) }}
{{- mustMerge .Values.dagProcessor.labels .Values.labels | toYaml | nindent 4 }}
{{- end }}
Expand All @@ -45,6 +60,11 @@ spec:
tier: airflow
component: dag-processor
release: {{ .Release.Name }}
{{- if $proc.name }}
dag-processor-name: {{ $proc.name }}
{{- end }}
{{- toYaml .Values.dagProcessor.podDisruptionBudget.config | nindent 2 }}
{{- end }}
{{- end }}
{{- end }}
{{- end }}
32 changes: 32 additions & 0 deletions chart/values.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -4944,6 +4944,38 @@
],
"additionalProperties": false
}
},
"multipleDagProcessing": {
"description": "Multiple Dag Processing configuration. When non-empty, creates a separate Deployment per entry instead of a single dag-processor Deployment. Each entry can specify its own replicas and args, falling back to ``dagProcessor.replicas`` and ``dagProcessor.args`` when not set.",
"type": "array",
"default": [],
"items": {
"type": "object",
"additionalProperties": false,
"required": [
"name"
],
"properties": {
"name": {
"description": "Unique name for this dag processor instance. Used as a suffix in the Deployment name.",
"type": "string"
},
"replicas": {
"description": "Number of replicas for this dag processor instance. Falls back to ``dagProcessor.replicas`` if not set.",
"type": "integer"
},
"args": {
"description": "Args to use when running this dag processor instance (templated). Falls back to ``dagProcessor.args`` if not set.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For me it does not make much sense to define multiple Dag processors and if no args given then falling back to the base. Then you would just have multiple replicas. If I decide to have multiple deployments then I'd assume there is a reason for different args == falback does not make sense in my view.

"type": [
"array",
"null"
],
"items": {
"type": "string"
}
}
}
}
}
}
},
Expand Down
15 changes: 15 additions & 0 deletions chart/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2473,6 +2473,21 @@ dagProcessor:
# Environment variables to add to dag processor container
env: []

# Multiple Dag Processing Configuration
# When defined (non-empty), creates a separate Deployment for each entry instead
# of a single dag-processor Deployment. Each entry requires a unique 'name' and
# can specify its own 'replicas' and 'args'. If 'replicas' or 'args' are not set
# for an entry, they fall back to dagProcessor.replicas and dagProcessor.args.
multipleDagProcessing: []
# Example:
# multipleDagProcessing:
# - name: git-dags
# replicas: 2
# args: ["bash", "-c", "exec airflow dag-processor --subdir /opt/airflow/dags/git"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For me it does not make much sense if the user must specify the full CLI here. It should rather present "additional args" and not the base shell command. Or would there be a reason to all another shell with another CLI for the use case?

# - name: local-dags
# replicas: 1
# args: ["bash", "-c", "exec airflow dag-processor --subdir /opt/airflow/dags/local"]

# Flower settings
flower:
# Enable flower.
Expand Down
Loading