Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions .github/workflows/build-and-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,19 @@ jobs:
steps:
- name: Prepare app token
if: ${{ vars.GH_BUMP_VERSION_APP_ID != '' }}
uses: actions/create-github-app-token@v1
uses: actions/create-github-app-token@v3
id: app-token
with:
app-id: ${{ vars.GH_BUMP_VERSION_APP_ID }}
client-id: ${{ vars.GH_BUMP_VERSION_APP_ID }}
private-key: ${{ secrets.GH_BUMP_VERSION_APP_KEY }}

- name: Checkout
uses: actions/checkout@v4
uses: actions/checkout@v6
with:
token: ${{ steps.app-token.outputs.token || secrets.GITHUB_TOKEN }}

- name: Set up Python
uses: actions/setup-python@v5
uses: actions/setup-python@v6
with:
python-version: '3.11'

Expand Down Expand Up @@ -83,7 +83,7 @@ jobs:
contents: write
pull-requests: write
needs: [ bump-and-publish ]
uses: Netcracker/qubership-workflow-hub/.github/workflows/release-drafter.yml@v1.0.1
uses: Netcracker/qubership-workflow-hub/.github/workflows/release-drafter.yml@v2.2.2
with:
version: ${{ needs.bump-and-publish.outputs.release_version }}
ref: ${{ github.ref_name }}
Expand All @@ -96,11 +96,11 @@ jobs:
packages: write
needs: [ bump-and-publish, create-github-release ]
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v6

# requirements.txt generation
- name: Set up Python
uses: actions/setup-python@v4
uses: actions/setup-python@v6
with:
python-version: '3.11'

Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/build-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ jobs:
contents: read
packages: write
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v6

# requirements.txt generation
- name: Set up Python
uses: actions/setup-python@v4
uses: actions/setup-python@v6
with:
python-version: '3.11'

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/pr-conventional-commits.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
name: Conventional Commits
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
- uses: actions/checkout@v6
with:
persist-credentials: false

Expand Down
12 changes: 6 additions & 6 deletions .github/workflows/reusable-pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ jobs:

steps:
- name: Download previous run data to retry
uses: actions/download-artifact@v4
uses: actions/download-artifact@v7
if: ${{ inputs.retry_run_id != '' }}
with:
name: ${{ env.DEFAULT_PIPELINE_DIR }}
Expand Down Expand Up @@ -93,35 +93,35 @@ jobs:
python -m pipelines_declarative_executor archive --pipeline_dir="./${DEFAULT_PIPELINE_DIR}" --target_path="${DEFAULT_PIPELINE_DIR_ZIP}"

- name: Upload PIPELINE_DIR
uses: actions/upload-artifact@v4
uses: actions/upload-artifact@v7
if: always()
with:
name: PIPELINE_DIR
path: ${{ env.DEFAULT_PIPELINE_DIR_ZIP }}

- name: Upload PIPELINE_OUTPUT
uses: actions/upload-artifact@v4
uses: actions/upload-artifact@v7
if: always()
with:
name: PIPELINE_OUTPUT
path: ${{ env.DEFAULT_PIPELINE_DIR }}/pipeline_output

- name: Upload PIPELINE_REPORT
uses: actions/upload-artifact@v4
uses: actions/upload-artifact@v7
if: always()
with:
name: PIPELINE_REPORT
path: ${{ env.DEFAULT_PIPELINE_DIR }}/pipeline_state/pipeline_report.json

- name: Upload FULL EXECUTION LOG
uses: actions/upload-artifact@v4
uses: actions/upload-artifact@v7
if: always()
with:
name: full_execution.log
path: full_execution.log

- name: Upload DEBUG DATA
uses: actions/upload-artifact@v4
uses: actions/upload-artifact@v7
if: always()
with:
name: X_DEBUG
Expand Down
5 changes: 3 additions & 2 deletions .github/workflows/run-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ name: Run Tests

on:
workflow_dispatch: {}
pull_request: {}

permissions:
contents: read
Expand All @@ -21,11 +22,11 @@ jobs:
PIPELINES_DECLARATIVE_EXECUTOR_EXECUTION_EMAIL: ${{ github.event.sender.id }}+${{ github.actor }}@users.noreply.github.com
steps:
- name: Checkout repository
uses: actions/checkout@v4
uses: actions/checkout@v6

# requirements.txt generation
- name: Set up Python
uses: actions/setup-python@v4
uses: actions/setup-python@v6
with:
python-version: '3.11'

Expand Down
1 change: 1 addition & 0 deletions .sops-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
v3.13.1
8 changes: 6 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@ FROM python:3.13-alpine
LABEL org.opencontainers.image.description="Qubership Pipelines Declarative Executor"
WORKDIR /app

ARG SOPS_VERSION
COPY .sops-version /tmp/.sops-version

RUN apk add --no-cache p7zip curl procps git

# Install SOPS
RUN curl -LO https://github.com/getsops/sops/releases/download/v3.12.2/sops-v3.12.2.linux.amd64 && \
mv sops-v3.12.2.linux.amd64 /usr/local/bin/sops && chmod +x /usr/local/bin/sops
RUN SOPS_VERSION=${SOPS_VERSION:-$(cat /tmp/.sops-version)} && \
curl -LO https://github.com/getsops/sops/releases/download/${SOPS_VERSION}/sops-${SOPS_VERSION}.linux.amd64 && \
mv sops-${SOPS_VERSION}.linux.amd64 /usr/local/bin/sops && chmod +x /usr/local/bin/sops

# Install Executor
COPY . .
Expand Down
8 changes: 6 additions & 2 deletions debian.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@ FROM python:3.11-slim
LABEL org.opencontainers.image.description="Qubership Pipelines Declarative Executor"
WORKDIR /app

ARG SOPS_VERSION
COPY .sops-version /tmp/.sops-version

RUN apt-get update && apt-get install -y --no-install-recommends p7zip-full curl procps git && rm -rf /var/lib/apt/lists/*

# Install SOPS
RUN curl -LO https://github.com/getsops/sops/releases/download/v3.12.2/sops-v3.12.2.linux.amd64 && \
mv sops-v3.12.2.linux.amd64 /usr/local/bin/sops && chmod +x /usr/local/bin/sops
RUN SOPS_VERSION=${SOPS_VERSION:-$(cat /tmp/.sops-version)} && \
curl -LO https://github.com/getsops/sops/releases/download/${SOPS_VERSION}/sops-${SOPS_VERSION}.linux.amd64 && \
mv sops-${SOPS_VERSION}.linux.amd64 /usr/local/bin/sops && chmod +x /usr/local/bin/sops

# Install Executor
COPY . .
Expand Down
15 changes: 11 additions & 4 deletions docs/atlas_pipeline_syntax.md
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,17 @@ Nested pipelines (aka "Atlas Pipeline Triggers") allow you to reuse pipeline def
- name: Nested Pipeline
type: ATLAS_PIPELINE_TRIGGER
input:
pipeline_data: "path/to/nested_pipeline.yaml"
pipeline_vars: |
VAR1 = value1
VAR2 = value2
params:
params:
PIPELINE_DATA: "path/to/nested_pipeline.yaml"
IS_DRY_RUN: false
PIPELINE_VARS: |
VAR1 = value1
VAR2 = value2
params_secure:
params:
PIPELINE_VARS: |
HIDDEN_SECURE_VAR=WILL_BE_HIDDEN
output:
params:
RESULT_FROM_NESTED: "params.output_variable"
Expand Down
7 changes: 4 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,17 @@ python = "^3.11"
PyYAML = "^6.0"
click = "^8.1"
requests = "^2.32.3"
urllib3 = "^2.6.3"
urllib3 = "^2.7.0"
idna = "^3.15"
aiohttp = "^3.12.15"
aiofiles = "^24.1.0"
miniopy-async = "^1.23.4"
tabulate = "0.9.0"
tabulate = "0.10.0"
psutil = "7.2.1"
typing-extensions = "4.15.0"

[tool.poetry.group.test.dependencies]
pytest = "^6.0.0"
pytest = "^9.0.3"

[tool.poetry.requires-plugins]
poetry-plugin-export = ">=1.8"
Expand Down
33 changes: 27 additions & 6 deletions src/pipelines_declarative_executor/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,23 @@ def cli():
@cli.command("run")
@click.option('--pipeline_data', required=True, type=str, help="Pipeline data (pipeline/config file paths)")
@click.option('--pipeline_vars', required=False, type=str, help="Pipeline vars with high priority")
@click.option('--pipeline_vars_secure', required=False, type=str, help="Secure pipeline vars with high priority that are masked in logs/report")
@click.option('--pipeline_dir', required=False, type=str, help="Path to directory where pipeline will be executed")
@click.option('--is_dry_run', default=False, type=bool, help="Dry run mode (no actual execution)")
@click.option('--log_level', default='INFO', show_default=True,
type=click.Choice(['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], case_sensitive=False),
help="Console logging level")
def __run_pipeline(pipeline_data: str, pipeline_vars: str, pipeline_dir: str, is_dry_run: bool, log_level: str):
def __run_pipeline(pipeline_data: str, pipeline_vars: str, pipeline_vars_secure: str, pipeline_dir: str, is_dry_run: bool, log_level: str):
setup_cli_logging(log_level)
PythonModuleUtils.prepare_python_module()
logging.info(f'command "RUN" with params:\n{format_param("PIPELINE_DATA", pipeline_data)}\n{format_param("PIPELINE_VARS", pipeline_vars)}'
f'\nPIPELINE_DIR="{pipeline_dir}"\nIS_DRY_RUN="{is_dry_run}"\nLOG_LEVEL="{log_level}"')
logging.info(
f'command "RUN" with params:'
f'\n{format_param("PIPELINE_DATA", pipeline_data)}'
f'\n{format_pipeline_vars(pipeline_vars, pipeline_vars_secure)}'
f'\nPIPELINE_DIR="{pipeline_dir}"\nIS_DRY_RUN="{is_dry_run}"\nLOG_LEVEL="{log_level}"'
)
with (ProfilingUtils.time_it(), ProfilingUtils.profile_it(), ProfilingUtils.track_peak_usage()):
asyncio.run(create_and_run_pipeline(pipeline_data, pipeline_vars, pipeline_dir, is_dry_run))
asyncio.run(create_and_run_pipeline(pipeline_data, pipeline_vars, pipeline_vars_secure, pipeline_dir, is_dry_run))


@cli.command("retry")
Expand Down Expand Up @@ -81,15 +86,31 @@ def format_param(name: str, value: str | None) -> str:
return f"{name}: None"


async def create_and_run_pipeline(pipeline_data: str, pipeline_vars: str, pipeline_dir: str, is_dry_run: bool):
def format_pipeline_vars(pipeline_vars: str | None, pipeline_vars_secure: str | None) -> str:
formatted_vars = []
if pipeline_vars:
for item in StringUtils.trim_lines(pipeline_vars):
formatted_vars.append(f" {item}")
if pipeline_vars_secure:
for item in StringUtils.trim_lines(pipeline_vars_secure):
if '=' in item:
key, value = item.split('=', 1)
formatted_vars.append(f" {key.strip()}={StringUtils.mask_value(value=value)}")
if formatted_vars:
return "PIPELINE_VARS:\n" + "\n".join(formatted_vars)
return "PIPELINE_VARS: None"


async def create_and_run_pipeline(pipeline_data: str, pipeline_vars: str, pipeline_vars_secure: str, pipeline_dir: str, is_dry_run: bool):
from pipelines_declarative_executor.orchestrator.pipeline_orchestrator import PipelineOrchestrator
from pipelines_declarative_executor.executor.pipeline_executor import PipelineExecutor
from pipelines_declarative_executor.report.report_uploader import ReportUploader
from pipelines_declarative_executor.model.stage import ExecutionStatus
try:
pipeline_execution = PipelineOrchestrator.prepare_pipeline_execution(
pipeline_data=pipeline_data,
pipeline_vars=pipeline_vars
pipeline_vars=pipeline_vars,
pipeline_vars_secure=pipeline_vars_secure,
)
except Exception as e:
logging.error(f"Exception during orchestration: {e}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ def input_source(name: str) -> dict:
return {"kind": "INPUT_VAR", "name": name}

@staticmethod
def set_pipeline_vars(vars_storage: PipelineVars, pipeline_vars: dict, source_path: str,
is_secure: bool = False, is_remote: bool = False) -> None:
def set_pipeline_embedded_vars(vars_storage: PipelineVars, pipeline_vars: dict, source_path: str,
is_secure: bool = False, is_remote: bool = False) -> None:
for key, value in pipeline_vars.items():
if key not in vars_storage.vars_config: # to avoid overwriting CONFIG vars with ones from PIPELINE/TEMPLATE
ParamsProcessor._set_var(vars_storage, 'vars_pipeline', key, value,
Expand Down
17 changes: 12 additions & 5 deletions src/pipelines_declarative_executor/executor/stage_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from pipelines_declarative_executor.utils.logging_utils import LoggingUtils
from pipelines_declarative_executor.utils.profiling_utils import ProfilingUtils
from pipelines_declarative_executor.utils.string_utils import StringUtils
from pipelines_declarative_executor.x_modules_ops.dict_utils import UtilsDictionary


class StageProcessor:
Expand Down Expand Up @@ -202,16 +203,13 @@ async def _run_nested_pipeline(execution: PipelineExecution, stage: Stage):
for key, value in execution.vars.vars_retry.items():
ParamsProcessor.set_retry_var(nested_execution.vars, key, value)
else:
nested_execution = PipelineOrchestrator.prepare_pipeline_execution(
input_calculated.get('pipeline_data'),
input_calculated.get('pipeline_vars')
)
nested_execution = PipelineOrchestrator.prepare_pipeline_execution(**StageProcessor._extract_nested_params(input_calculated))

from pipelines_declarative_executor.executor.pipeline_executor import PipelineExecutor
await PipelineExecutor.start(
nested_execution,
execution_folder_path=stage.exec_dir,
is_dry_run=execution.is_dry_run or StringUtils.to_bool(input_calculated.get('is_dry_run')),
is_dry_run=execution.is_dry_run or StringUtils.to_bool(UtilsDictionary.get_by_path(input_calculated, "params.params.IS_DRY_RUN")),
wait_for_finish=True,
is_nested=True,
)
Expand All @@ -220,3 +218,12 @@ async def _run_nested_pipeline(execution: PipelineExecution, stage: Stage):
except asyncio.CancelledError:
execution.logger.warning("Nested pipeline execution cancelled!")
raise

@staticmethod
def _extract_nested_params(params: dict) -> dict:
return {
'pipeline_data': (UtilsDictionary.get_by_path(params, "params.params.PIPELINE_DATA")
or UtilsDictionary.get_by_path(params, "params_secure.params.PIPELINE_DATA")),
'pipeline_vars': UtilsDictionary.get_by_path(params, "params.params.PIPELINE_VARS"),
'pipeline_vars_secure': UtilsDictionary.get_by_path(params, "params_secure.params.PIPELINE_VARS"),
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@

class PipelineOrchestrator:
@staticmethod
def prepare_pipeline_execution(pipeline_data: str, pipeline_vars: str = None) -> PipelineExecution:
pipeline_execution = PipelineExecution(inputs={"pipeline_data": pipeline_data, "pipeline_vars": pipeline_vars})
def prepare_pipeline_execution(pipeline_data: str, pipeline_vars: str = None, pipeline_vars_secure: str = None) -> PipelineExecution:
pipeline_execution = PipelineExecution(inputs={
"pipeline_data": pipeline_data,
"pipeline_vars": pipeline_vars,
"pipeline_vars_secure": pipeline_vars_secure,
})
vars_obj = PipelineVars()
merged_template = PipelineTemplate()
last_pipeline = None
Expand Down Expand Up @@ -46,10 +50,11 @@ def prepare_pipeline_execution(pipeline_data: str, pipeline_vars: str = None) ->
raise Exception("No 'AtlasPipeline' present in 'pipeline_data'!")

if pipeline_embedded_vars := last_pipeline.data.get('pipeline', {}).get('vars'):
ParamsProcessor.set_pipeline_vars(vars_obj, pipeline_embedded_vars, last_pipeline.file_path, last_pipeline.is_secure, last_pipeline.is_remote)

ParamsProcessor.set_pipeline_embedded_vars(vars_obj, pipeline_embedded_vars, last_pipeline.file_path, last_pipeline.is_secure, last_pipeline.is_remote)
if pipeline_vars:
PipelineOrchestrator._process_pipeline_vars(vars_obj, pipeline_vars)
PipelineOrchestrator._process_pipeline_vars(vars_obj, pipeline_vars, is_secure=False)
if pipeline_vars_secure:
PipelineOrchestrator._process_pipeline_vars(vars_obj, pipeline_vars_secure, is_secure=True)

PipelineOrchestrator._process_global_configs(vars_obj)

Expand All @@ -69,19 +74,19 @@ def _process_atlas_config(vars_obj: PipelineVars, config: AtlasMetaFile):
@staticmethod
def _process_pipeline_template(merged_template: PipelineTemplate, vars_obj: PipelineVars, template: AtlasMetaFile):
if template_vars := template.data.get('pipeline', {}).get('vars'):
ParamsProcessor.set_pipeline_vars(vars_obj, template_vars, template.file_path, template.is_secure, template.is_remote)
ParamsProcessor.set_pipeline_embedded_vars(vars_obj, template_vars, template.file_path, template.is_secure, template.is_remote)
if template_config := template.data.get('pipeline', {}).get('configuration'):
merged_template.configuration.update(template_config)
if template_jobs := template.data.get('pipeline', {}).get('jobs'):
merged_template.job_templates.update(template_jobs)

@staticmethod
def _process_pipeline_vars(vars_obj: PipelineVars, pipeline_vars: str):
def _process_pipeline_vars(vars_obj: PipelineVars, pipeline_vars: str, is_secure: bool = False):
vars_list = StringUtils.trim_lines(pipeline_vars)
for var in vars_list:
if '=' in var:
key, value = var.split('=', 1)
ParamsProcessor.set_override_var(vars_obj, key.strip(), value.strip())
ParamsProcessor.set_override_var(vars_obj, key.strip(), value.strip(), is_secure)

@staticmethod
def _process_global_configs(vars_obj: PipelineVars):
Expand Down
Loading
Loading