diff --git a/chart/templates/dag-processor/dag-processor-deployment.yaml b/chart/templates/dag-processor/dag-processor-deployment.yaml
index 2e4974f381752..d87c4b1cee9c2 100644
--- a/chart/templates/dag-processor/dag-processor-deployment.yaml
+++ b/chart/templates/dag-processor/dag-processor-deployment.yaml
@@ -26,6 +26,18 @@
   {{ $enabled = ternary true false (semverCompare ">=3.0.0" .Values.airflowVersion) }}
 {{- end }}
 {{- if $enabled }}
+{{- $processors := list }}
+{{- if .Values.dagProcessor.multipleDagProcessing }}
+  {{- $processors = .Values.dagProcessor.multipleDagProcessing }}
+{{- else }}
+  {{- $processors = list (dict "name" "") }}
+{{- end }}
+{{- $globals := . }}
+{{- range $idx, $proc := $processors }}
+{{- if $idx }}
+---
+{{- end }}
+{{- with $globals }}
 {{- $nodeSelector := or .Values.dagProcessor.nodeSelector .Values.nodeSelector }}
 {{- $affinity := or .Values.dagProcessor.affinity .Values.affinity }}
 {{- $tolerations := or .Values.dagProcessor.tolerations .Values.tolerations }}
@@ -36,16 +48,22 @@
 {{- $containerSecurityContextLogGroomerSidecar := include "containerSecurityContext" (list .Values.dagProcessor.logGroomerSidecar .Values) }}
 {{- $containerSecurityContextWaitForMigrations := include "containerSecurityContext" (list .Values.dagProcessor.waitForMigrations .Values) }}
 {{- $containerLifecycleHooks := or .Values.dagProcessor.containerLifecycleHooks .Values.containerLifecycleHooks }}
+{{- /* Use hasKey rather than `default`: `default` treats 0 as empty and would override an explicit `replicas: 0`. */}}
+{{- $procReplicas := ternary $proc.replicas .Values.dagProcessor.replicas (hasKey $proc "replicas") }}
+{{- $procArgs := $proc.args | default .Values.dagProcessor.args }}
 apiVersion: apps/v1
 kind: Deployment
 metadata:
-  name: {{ include "airflow.fullname" . }}-dag-processor
+  name: {{ include "airflow.fullname" . }}-dag-processor{{- if $proc.name }}-{{ $proc.name }}{{- end }}
   labels:
     tier: airflow
     component: dag-processor
     release: {{ .Release.Name }}
     chart: "{{ .Chart.Name }}-{{ .Chart.Version }}"
     heritage: {{ .Release.Service }}
+    {{- if $proc.name }}
+    dag-processor-name: {{ $proc.name }}
+    {{- end }}
     {{- with .Values.labels }}
       {{- toYaml . | nindent 4 }}
     {{- end }}
@@ -53,7 +71,7 @@ metadata:
   annotations: {{- toYaml .Values.dagProcessor.annotations | nindent 4 }}
   {{- end }}
 spec:
-  replicas: {{ .Values.dagProcessor.replicas }}
+  replicas: {{ $procReplicas }}
   {{- if ne $revisionHistoryLimit "" }}
   revisionHistoryLimit: {{ $revisionHistoryLimit }}
   {{- end }}
@@ -62,6 +80,9 @@ spec:
       tier: airflow
       component: dag-processor
       release: {{ .Release.Name }}
+      {{- if $proc.name }}
+      dag-processor-name: {{ $proc.name }}
+      {{- end }}
   {{- if .Values.dagProcessor.strategy }}
   strategy: {{- toYaml .Values.dagProcessor.strategy | nindent 4 }}
   {{- end }}
@@ -71,6 +92,9 @@ spec:
         tier: airflow
         component: dag-processor
         release: {{ .Release.Name }}
+        {{- if $proc.name }}
+        dag-processor-name: {{ $proc.name }}
+        {{- end }}
         {{- if or .Values.labels .Values.dagProcessor.labels }}
           {{- mustMerge .Values.dagProcessor.labels .Values.labels | toYaml | nindent 8 }}
         {{- end }}
@@ -158,8 +182,8 @@ spec:
           {{- if .Values.dagProcessor.command }}
           command: {{ tpl (toYaml .Values.dagProcessor.command) . | nindent 12 }}
           {{- end }}
-          {{- if .Values.dagProcessor.args }}
-          args: {{ tpl (toYaml .Values.dagProcessor.args) . | nindent 12 }}
+          {{- if $procArgs }}
+          args: {{ tpl (toYaml $procArgs) . | nindent 12 }}
           {{- end }}
           resources: {{- toYaml .Values.dagProcessor.resources | nindent 12 }}
           volumeMounts:
@@ -287,3 +311,5 @@ spec:
         {{- end }}
 {{- end }}
 {{- end }}
+{{- end }}
+{{- end }}
diff --git a/chart/templates/dag-processor/dag-processor-poddisruptionbudget.yaml b/chart/templates/dag-processor/dag-processor-poddisruptionbudget.yaml
index e72676b186dc4..f85b737039a83 100644
--- a/chart/templates/dag-processor/dag-processor-poddisruptionbudget.yaml
+++ b/chart/templates/dag-processor/dag-processor-poddisruptionbudget.yaml
@@ -26,16 +26,31 @@
   {{ $enabled = ternary true false (semverCompare ">=3.0.0" .Values.airflowVersion) }}
 {{- end }}
 {{- if and $enabled .Values.dagProcessor.podDisruptionBudget.enabled }}
+{{- $processors := list }}
+{{- if .Values.dagProcessor.multipleDagProcessing }}
+  {{- $processors = .Values.dagProcessor.multipleDagProcessing }}
+{{- else }}
+  {{- $processors = list (dict "name" "") }}
+{{- end }}
+{{- $globals := . }}
+{{- range $idx, $proc := $processors }}
+{{- if $idx }}
+---
+{{- end }}
+{{- with $globals }}
 apiVersion: policy/v1
 kind: PodDisruptionBudget
 metadata:
-  name: {{ include "airflow.fullname" . }}-dag-processor-pdb
+  name: {{ include "airflow.fullname" . }}-dag-processor{{- if $proc.name }}-{{ $proc.name }}{{- end }}-pdb
   labels:
     tier: airflow
     component: dag-processor
     release: {{ .Release.Name }}
     chart: "{{ .Chart.Name }}-{{ .Chart.Version }}"
     heritage: {{ .Release.Service }}
+    {{- if $proc.name }}
+    dag-processor-name: {{ $proc.name }}
+    {{- end }}
     {{- if or (.Values.labels) (.Values.dagProcessor.labels) }}
       {{- mustMerge .Values.dagProcessor.labels .Values.labels | toYaml | nindent 4 }}
     {{- end }}
@@ -45,6 +60,11 @@ spec:
       tier: airflow
       component: dag-processor
       release: {{ .Release.Name }}
+      {{- if $proc.name }}
+      dag-processor-name: {{ $proc.name }}
+      {{- end }}
   {{- toYaml .Values.dagProcessor.podDisruptionBudget.config | nindent 2 }}
 {{- end }}
 {{- end }}
+{{- end }}
+{{- end }}
diff --git a/chart/values.schema.json b/chart/values.schema.json
index 710b2445cf525..b2a8c70bbba4e 100644
--- a/chart/values.schema.json
+++ b/chart/values.schema.json
@@ -4944,6 +4944,38 @@
                         ],
                         "additionalProperties": false
                     }
+                },
+                "multipleDagProcessing": {
+                    "description": "Multiple Dag Processing configuration. When non-empty, creates a separate Deployment per entry instead of a single dag-processor Deployment. Each entry can specify its own replicas and args, falling back to ``dagProcessor.replicas`` and ``dagProcessor.args`` when not set.",
+                    "type": "array",
+                    "default": [],
+                    "items": {
+                        "type": "object",
+                        "additionalProperties": false,
+                        "required": [
+                            "name"
+                        ],
+                        "properties": {
+                            "name": {
+                                "description": "Unique name for this dag processor instance. Used as a suffix in the Deployment name.",
+                                "type": "string"
+                            },
+                            "replicas": {
+                                "description": "Number of replicas for this dag processor instance. Falls back to ``dagProcessor.replicas`` if not set; an explicit ``0`` is honored.",
+                                "type": "integer"
+                            },
+                            "args": {
+                                "description": "Args to use when running this dag processor instance (templated). Falls back to ``dagProcessor.args`` if not set.",
+                                "type": [
+                                    "array",
+                                    "null"
+                                ],
+                                "items": {
+                                    "type": "string"
+                                }
+                            }
+                        }
+                    }
                 }
             }
         },
diff --git a/chart/values.yaml b/chart/values.yaml
index eef0b108599ed..bab16bfa37174 100644
--- a/chart/values.yaml
+++ b/chart/values.yaml
@@ -2473,6 +2473,21 @@ dagProcessor:
   # Environment variables to add to dag processor container
   env: []
 
+  # Multiple Dag Processing Configuration
+  # When defined (non-empty), creates a separate Deployment for each entry instead
+  # of a single dag-processor Deployment. Each entry requires a unique 'name' and
+  # can specify its own 'replicas' and 'args'. If 'replicas' or 'args' are not set
+  # for an entry, they fall back to dagProcessor.replicas and dagProcessor.args.
+  multipleDagProcessing: []
+  # Example:
+  # multipleDagProcessing:
+  #   - name: git-dags
+  #     replicas: 2
+  #     args: ["bash", "-c", "exec airflow dag-processor --subdir /opt/airflow/dags/git"]
+  #   - name: local-dags
+  #     replicas: 1
+  #     args: ["bash", "-c", "exec airflow dag-processor --subdir /opt/airflow/dags/local"]
+
 # Flower settings
 flower:
   # Enable flower.
diff --git a/helm-tests/tests/helm_tests/dagprocessor/test_multiple_dag_processing.py b/helm-tests/tests/helm_tests/dagprocessor/test_multiple_dag_processing.py
new file mode 100644
index 0000000000000..76155c31c546e
--- /dev/null
+++ b/helm-tests/tests/helm_tests/dagprocessor/test_multiple_dag_processing.py
@@ -0,0 +1,332 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import jmespath
+import pytest
+from chart_utils.helm_template_generator import render_chart
+
+
+class TestMultipleDagProcessingDeployment:
+    """Tests for multipleDagProcessing deployment feature."""
+
+    AIRFLOW_VERSION = "3.0.0"
+    DEPLOYMENT_TEMPLATE = "templates/dag-processor/dag-processor-deployment.yaml"
+
+    def test_default_single_deployment_when_multiple_dag_processing_not_set(self):
+        """When multipleDagProcessing is not set, a single deployment is created."""
+        docs = render_chart(
+            name="test",
+            values={"airflowVersion": self.AIRFLOW_VERSION},
+            show_only=[self.DEPLOYMENT_TEMPLATE],
+        )
+
+        assert len(docs) == 1
+        assert jmespath.search("metadata.name", docs[0]) == "test-dag-processor"
+
+    def test_default_single_deployment_when_multiple_dag_processing_empty(self):
+        """When multipleDagProcessing is an empty list, a single deployment is created."""
+        docs = render_chart(
+            name="test",
+            values={
+                "airflowVersion": self.AIRFLOW_VERSION,
+                "dagProcessor": {"multipleDagProcessing": []},
+            },
+            show_only=[self.DEPLOYMENT_TEMPLATE],
+        )
+
+        assert len(docs) == 1
+        assert jmespath.search("metadata.name", docs[0]) == "test-dag-processor"
+
+    def test_creates_multiple_deployments(self):
+        """When multipleDagProcessing has entries, one deployment per entry is created."""
+        docs = render_chart(
+            name="test",
+            values={
+                "airflowVersion": self.AIRFLOW_VERSION,
+                "dagProcessor": {
+                    "multipleDagProcessing": [
+                        {"name": "git-dags"},
+                        {"name": "local-dags"},
+                    ],
+                },
+            },
+            show_only=[self.DEPLOYMENT_TEMPLATE],
+        )
+
+        assert len(docs) == 2
+        assert jmespath.search("[*].metadata.name", docs) == [
+            "test-dag-processor-git-dags",
+            "test-dag-processor-local-dags",
+        ]
+
+    def test_dag_processor_name_label_on_all_levels(self):
+        """Each deployment gets a dag-processor-name label on metadata, selector, and pod template."""
+        docs = render_chart(
+            name="test",
+            values={
+                "airflowVersion": self.AIRFLOW_VERSION,
+                "dagProcessor": {
+                    "multipleDagProcessing": [
+                        {"name": "git-dags"},
+                        {"name": "local-dags"},
+                    ],
+                },
+            },
+            show_only=[self.DEPLOYMENT_TEMPLATE],
+        )
+
+        assert len(docs) == 2
+        for doc, expected_name in zip(docs, ["git-dags", "local-dags"]):
+            assert jmespath.search('metadata.labels."dag-processor-name"', doc) == expected_name
+            assert jmespath.search('spec.selector.matchLabels."dag-processor-name"', doc) == expected_name
+            assert jmespath.search('spec.template.metadata.labels."dag-processor-name"', doc) == expected_name
+
+    def test_no_dag_processor_name_label_when_not_using_multiple(self):
+        """When multipleDagProcessing is not set, no dag-processor-name label is added."""
+        docs = render_chart(
+            name="test",
+            values={"airflowVersion": self.AIRFLOW_VERSION},
+            show_only=[self.DEPLOYMENT_TEMPLATE],
+        )
+
+        assert jmespath.search('metadata.labels."dag-processor-name"', docs[0]) is None
+        assert jmespath.search('spec.selector.matchLabels."dag-processor-name"', docs[0]) is None
+        assert jmespath.search('spec.template.metadata.labels."dag-processor-name"', docs[0]) is None
+
+    def test_replicas_fallback_and_override(self):
+        """Replicas falls back to dagProcessor.replicas when not set, and can be overridden per entry."""
+        docs = render_chart(
+            name="test",
+            values={
+                "airflowVersion": self.AIRFLOW_VERSION,
+                "dagProcessor": {
+                    "replicas": 5,
+                    "multipleDagProcessing": [
+                        {"name": "git-dags"},
+                        {"name": "local-dags", "replicas": 2},
+                    ],
+                },
+            },
+            show_only=[self.DEPLOYMENT_TEMPLATE],
+        )
+
+        assert jmespath.search("spec.replicas", docs[0]) == 5
+        assert jmespath.search("spec.replicas", docs[1]) == 2
+
+    def test_zero_replicas_override_is_respected(self):
+        """An explicit ``replicas: 0`` scales that entry down instead of using the default."""
+        docs = render_chart(
+            name="test",
+            values={
+                "airflowVersion": self.AIRFLOW_VERSION,
+                "dagProcessor": {
+                    "replicas": 5,
+                    "multipleDagProcessing": [
+                        {"name": "paused-dags", "replicas": 0},
+                    ],
+                },
+            },
+            show_only=[self.DEPLOYMENT_TEMPLATE],
+        )
+
+        assert len(docs) == 1
+        assert jmespath.search("spec.replicas", docs[0]) == 0
+
+    def test_args_fallback_and_override(self):
+        """Args falls back to dagProcessor.args when not set, and can be overridden per entry."""
+        docs = render_chart(
+            name="test",
+            values={
+                "airflowVersion": self.AIRFLOW_VERSION,
+                "dagProcessor": {
+                    "args": ["bash", "-c", "exec airflow dag-processor --subdir /opt/airflow/dags/default"],
+                    "multipleDagProcessing": [
+                        {"name": "default-args"},
+                        {
+                            "name": "custom-args",
+                            "args": [
+                                "bash",
+                                "-c",
+                                "exec airflow dag-processor --subdir /opt/airflow/dags/custom",
+                            ],
+                        },
+                    ],
+                },
+            },
+            show_only=[self.DEPLOYMENT_TEMPLATE],
+        )
+
+        assert jmespath.search(
+            "spec.template.spec.containers[?name=='dag-processor'] | [0].args", docs[0]
+        ) == ["bash", "-c", "exec airflow dag-processor --subdir /opt/airflow/dags/default"]
+        assert jmespath.search(
+            "spec.template.spec.containers[?name=='dag-processor'] | [0].args", docs[1]
+        ) == ["bash", "-c", "exec airflow dag-processor --subdir /opt/airflow/dags/custom"]
+
+    def test_global_labels_applied_to_all_deployments(self):
+        """Global labels are applied to all multiple dag processor deployments."""
+        docs = render_chart(
+            name="test",
+            values={
+                "airflowVersion": self.AIRFLOW_VERSION,
+                "labels": {"global_label": "global_value"},
+                "dagProcessor": {
+                    "multipleDagProcessing": [
+                        {"name": "git-dags"},
+                        {"name": "local-dags"},
+                    ],
+                },
+            },
+            show_only=[self.DEPLOYMENT_TEMPLATE],
+        )
+
+        assert len(docs) == 2
+        for doc in docs:
+            pod_labels = jmespath.search("spec.template.metadata.labels", doc)
+            assert pod_labels["global_label"] == "global_value"
+
+    def test_component_labels_applied_to_all_deployments(self):
+        """Component-specific labels are applied to all multiple dag processor deployments."""
+        docs = render_chart(
+            name="test",
+            values={
+                "airflowVersion": self.AIRFLOW_VERSION,
+                "dagProcessor": {
+                    "labels": {"component_label": "component_value"},
+                    "multipleDagProcessing": [
+                        {"name": "git-dags"},
+                        {"name": "local-dags"},
+                    ],
+                },
+            },
+            show_only=[self.DEPLOYMENT_TEMPLATE],
+        )
+
+        assert len(docs) == 2
+        for doc in docs:
+            pod_labels = jmespath.search("spec.template.metadata.labels", doc)
+            assert pod_labels["component_label"] == "component_value"
+
+
+class TestMultipleDagProcessingPodDisruptionBudget:
+    """Tests for multipleDagProcessing PodDisruptionBudget feature."""
+
+    AIRFLOW_VERSION = "3.0.0"
+    PDB_TEMPLATE = "templates/dag-processor/dag-processor-poddisruptionbudget.yaml"
+
+    def test_default_single_pdb_when_multiple_dag_processing_not_set(self):
+        """When multipleDagProcessing is not set, a single PDB is created."""
+        docs = render_chart(
+            name="test",
+            values={
+                "airflowVersion": self.AIRFLOW_VERSION,
+                "dagProcessor": {"podDisruptionBudget": {"enabled": True}},
+            },
+            show_only=[self.PDB_TEMPLATE],
+        )
+
+        assert len(docs) == 1
+        assert jmespath.search("metadata.name", docs[0]) == "test-dag-processor-pdb"
+
+    def test_creates_multiple_pdbs(self):
+        """When multipleDagProcessing has entries, one PDB per entry is created."""
+        docs = render_chart(
+            name="test",
+            values={
+                "airflowVersion": self.AIRFLOW_VERSION,
+                "dagProcessor": {
+                    "podDisruptionBudget": {"enabled": True},
+                    "multipleDagProcessing": [
+                        {"name": "git-dags"},
+                        {"name": "local-dags"},
+                    ],
+                },
+            },
+            show_only=[self.PDB_TEMPLATE],
+        )
+
+        assert len(docs) == 2
+        assert jmespath.search("[*].metadata.name", docs) == [
+            "test-dag-processor-git-dags-pdb",
+            "test-dag-processor-local-dags-pdb",
+        ]
+
+    def test_pdb_dag_processor_name_label_and_selector(self):
+        """Each PDB gets a dag-processor-name label and matching selector."""
+        docs = render_chart(
+            name="test",
+            values={
+                "airflowVersion": self.AIRFLOW_VERSION,
+                "dagProcessor": {
+                    "podDisruptionBudget": {"enabled": True},
+                    "multipleDagProcessing": [
+                        {"name": "git-dags"},
+                        {"name": "local-dags"},
+                    ],
+                },
+            },
+            show_only=[self.PDB_TEMPLATE],
+        )
+
+        assert len(docs) == 2
+        for doc, expected_name in zip(docs, ["git-dags", "local-dags"]):
+            assert jmespath.search('metadata.labels."dag-processor-name"', doc) == expected_name
+            assert jmespath.search('spec.selector.matchLabels."dag-processor-name"', doc) == expected_name
+
+    def test_no_pdb_dag_processor_name_label_when_not_using_multiple(self):
+        """When multipleDagProcessing is not set, no dag-processor-name label on PDB."""
+        docs = render_chart(
+            name="test",
+            values={
+                "airflowVersion": self.AIRFLOW_VERSION,
+                "dagProcessor": {"podDisruptionBudget": {"enabled": True}},
+            },
+            show_only=[self.PDB_TEMPLATE],
+        )
+
+        assert jmespath.search('metadata.labels."dag-processor-name"', docs[0]) is None
+        assert jmespath.search('spec.selector.matchLabels."dag-processor-name"', docs[0]) is None
+
+    @pytest.mark.parametrize(
+        "pdb_config",
+        [
+            {"maxUnavailable": 1},
+            {"minAvailable": 1},
+        ],
+    )
+    def test_pdb_config_applied_to_all_instances(self, pdb_config):
+        """PDB config is applied to all multiple dag processor PDBs."""
+        docs = render_chart(
+            name="test",
+            values={
+                "airflowVersion": self.AIRFLOW_VERSION,
+                "dagProcessor": {
+                    "podDisruptionBudget": {"enabled": True, "config": pdb_config},
+                    "multipleDagProcessing": [
+                        {"name": "git-dags"},
+                        {"name": "local-dags"},
+                    ],
+                },
+            },
+            show_only=[self.PDB_TEMPLATE],
+        )
+
+        assert len(docs) == 2
+        for doc in docs:
+            for key, value in pdb_config.items():
+                assert jmespath.search(f"spec.{key}", doc) == value