diff --git a/build_stream/api/router.py b/build_stream/api/router.py index d7f2ef47d0..c1d25fa4ab 100644 --- a/build_stream/api/router.py +++ b/build_stream/api/router.py @@ -23,6 +23,7 @@ from api.generate_input_files.routes import router as generate_input_files_router from api.local_repo.routes import router as local_repo_router from api.build_image.routes import router as build_image_router +from api.validate.routes import router as validate_router api_router = APIRouter(prefix="/api/v1") @@ -33,3 +34,4 @@ api_router.include_router(generate_input_files_router) api_router.include_router(local_repo_router) api_router.include_router(build_image_router) +api_router.include_router(validate_router) diff --git a/build_stream/api/validate/__init__.py b/build_stream/api/validate/__init__.py new file mode 100644 index 0000000000..bd3868ecb6 --- /dev/null +++ b/build_stream/api/validate/__init__.py @@ -0,0 +1,19 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""ValidateImageOnTest API module.""" + +from api.validate.routes import router + +__all__ = ["router"] diff --git a/build_stream/api/validate/dependencies.py b/build_stream/api/validate/dependencies.py new file mode 100644 index 0000000000..e10935109f --- /dev/null +++ b/build_stream/api/validate/dependencies.py @@ -0,0 +1,77 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""FastAPI dependency providers for ValidateImageOnTest API.""" + +from typing import Optional + +from fastapi import Header, HTTPException, status + +from core.jobs.value_objects import ClientId, CorrelationId + + +def _get_container(): + """Lazy import of container to avoid circular imports.""" + from container import container # pylint: disable=import-outside-toplevel + return container + + +def get_validate_image_on_test_use_case(): + """Provide validate-image-on-test use case.""" + return _get_container().validate_image_on_test_use_case() + + +def get_validate_client_id( + authorization: str = Header(..., description="Bearer token for authentication"), +) -> ClientId: + """Extract ClientId from Bearer token header.""" + if not authorization.startswith("Bearer "): + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid authorization header format", + ) + + token = authorization[7:].lstrip() + if not token: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Missing authentication token", + ) + + try: + return ClientId(token[:128] if len(token) > 128 else token) + except ValueError as exc: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid client credentials", + ) from exc + + +def get_validate_correlation_id( + x_correlation_id: Optional[str] = Header( + default=None, + alias="X-Correlation-Id", + description="Request tracing ID", + ), +) -> CorrelationId: + """Return provided correlation ID or generate one.""" + generator = _get_container().uuid_generator() + if x_correlation_id: + try: + return CorrelationId(x_correlation_id) + except ValueError: + pass + + generated_id = generator.generate() + return CorrelationId(str(generated_id)) diff --git a/build_stream/api/validate/routes.py b/build_stream/api/validate/routes.py new file mode 100644 index 0000000000..4700750fb6 --- /dev/null +++ b/build_stream/api/validate/routes.py @@ -0,0 +1,208 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""FastAPI routes for validate-image-on-test stage operations.""" + +import logging +from datetime import datetime, timezone + +from fastapi import APIRouter, Depends, HTTPException, status + +from api.validate.dependencies import ( + get_validate_image_on_test_use_case, + get_validate_client_id, + get_validate_correlation_id, +) +from api.dependencies import verify_token, require_job_write +from api.validate.schemas import ( + ValidateImageOnTestResponse, + ValidateImageOnTestErrorResponse, +) +from api.logging_utils import log_secure_info +from core.jobs.exceptions import ( + InvalidStateTransitionError, + JobNotFoundError, +) +from core.jobs.value_objects import ClientId, CorrelationId, JobId +from core.validate.exceptions import ( + StageGuardViolationError, + ValidateDomainError, + ValidationExecutionError, +) +from orchestrator.validate.commands import ValidateImageOnTestCommand +from orchestrator.validate.use_cases import ValidateImageOnTestUseCase + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/jobs", tags=["Validate Image On Test"]) + + +def _build_error_response( + error_code: str, + message: str, + correlation_id: str, +) -> ValidateImageOnTestErrorResponse: + return ValidateImageOnTestErrorResponse( + error=error_code, + message=message, + correlation_id=correlation_id, + timestamp=datetime.now(timezone.utc).isoformat() + "Z", + ) + + +@router.post( + "/{job_id}/stages/validate-image-on-test", + response_model=ValidateImageOnTestResponse, + status_code=status.HTTP_202_ACCEPTED, + summary="Validate image on test environment", + description="Trigger the validate-image-on-test stage for a job", + responses={ + 202: {"description": "Stage accepted", "model": ValidateImageOnTestResponse}, + 400: {"description": "Invalid request", "model": ValidateImageOnTestErrorResponse}, + 401: {"description": "Unauthorized", "model": ValidateImageOnTestErrorResponse}, + 404: {"description": "Job not found", "model": ValidateImageOnTestErrorResponse}, + 409: {"description": "Stage conflict", "model": ValidateImageOnTestErrorResponse}, + 412: {"description": "Stage guard violation", "model": ValidateImageOnTestErrorResponse}, + 500: {"description": "Internal error", "model": ValidateImageOnTestErrorResponse}, + }, +) +def create_validate_image_on_test( + job_id: str, + token_data: dict = Depends(verify_token), + use_case: ValidateImageOnTestUseCase = Depends(get_validate_image_on_test_use_case), + client_id: ClientId = Depends(get_validate_client_id), + correlation_id: CorrelationId = Depends(get_validate_correlation_id), + _: None = Depends(require_job_write), +) -> ValidateImageOnTestResponse: + """Trigger the validate-image-on-test stage for a job. + + Accepts the request synchronously and returns 202 Accepted. + The playbook execution is handled by the NFS queue watcher service. + """ + logger.info( + "Validate image on test request: job_id=%s, client_id=%s, correlation_id=%s", + job_id, + client_id.value, + correlation_id.value, + ) + + try: + validated_job_id = JobId(job_id) + except ValueError as exc: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=_build_error_response( + "INVALID_JOB_ID", + f"Invalid job_id format: {job_id}", + correlation_id.value, + ).model_dump(), + ) from exc + + try: + command = ValidateImageOnTestCommand( + job_id=validated_job_id, + client_id=client_id, + correlation_id=correlation_id, + ) + result = use_case.execute(command) + + return ValidateImageOnTestResponse( + job_id=result.job_id, + stage=result.stage_name, + status=result.status, + submitted_at=result.submitted_at, + correlation_id=result.correlation_id, + ) + + except JobNotFoundError as exc: + logger.warning("Job not found: %s", job_id) + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=_build_error_response( + "JOB_NOT_FOUND", + exc.message, + correlation_id.value, + ).model_dump(), + ) from exc + + except InvalidStateTransitionError as exc: + log_secure_info( + "warning", + f"Invalid state transition for job {job_id}", + str(correlation_id.value), + ) + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail=_build_error_response( + "INVALID_STATE_TRANSITION", + exc.message, + correlation_id.value, + ).model_dump(), + ) from exc + + except StageGuardViolationError as exc: + log_secure_info( + "warning", + f"Stage guard violation for job {job_id}", + str(correlation_id.value), + ) + raise HTTPException( + status_code=status.HTTP_412_PRECONDITION_FAILED, + detail=_build_error_response( + "STAGE_GUARD_VIOLATION", + exc.message, + correlation_id.value, + ).model_dump(), + ) from exc + + except ValidationExecutionError as exc: + log_secure_info( + "error", + f"Validation execution error for job {job_id}", + str(correlation_id.value), + ) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=_build_error_response( + "VALIDATION_EXECUTION_ERROR", + exc.message, + correlation_id.value, + ).model_dump(), + ) from exc + + except ValidateDomainError as exc: + log_secure_info( + "error", + f"Validate domain error for job {job_id}", + str(correlation_id.value), + ) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=_build_error_response( + "VALIDATE_ERROR", + exc.message, + correlation_id.value, + ).model_dump(), + ) from exc + + except Exception as exc: + logger.exception("Unexpected error creating validate-image-on-test stage") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=_build_error_response( + "INTERNAL_ERROR", + "An unexpected error occurred", + correlation_id.value, + ).model_dump(), + ) from exc diff --git a/build_stream/api/validate/schemas.py b/build_stream/api/validate/schemas.py new file mode 100644 index 0000000000..141a295fb6 --- /dev/null +++ b/build_stream/api/validate/schemas.py @@ -0,0 +1,36 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Pydantic schemas for ValidateImageOnTest API requests and responses.""" + +from pydantic import BaseModel, Field + + +class ValidateImageOnTestResponse(BaseModel): + """Response model for validate-image-on-test stage acceptance (202 Accepted).""" + + job_id: str = Field(..., description="Job identifier") + stage: str = Field(..., description="Stage identifier") + status: str = Field(..., description="Acceptance status") + submitted_at: str = Field(..., description="Submission timestamp (ISO 8601)") + correlation_id: str = Field(..., description="Correlation identifier") + + +class ValidateImageOnTestErrorResponse(BaseModel): + """Standard error response body for validate-image-on-test operations.""" + + error: str = Field(..., description="Error code") + message: str = Field(..., description="Error message") + correlation_id: str = Field(..., description="Request correlation ID") + timestamp: str = Field(..., description="Error timestamp (ISO 8601)") diff --git a/build_stream/container.py b/build_stream/container.py index 7e9674cd33..5cede75fdd 100644 --- a/build_stream/container.py +++ b/build_stream/container.py @@ -47,8 +47,9 @@ from orchestrator.catalog.use_cases.parse_catalog import ParseCatalogUseCase from orchestrator.jobs.use_cases import CreateJobUseCase from orchestrator.local_repo.use_cases import CreateLocalRepoUseCase -from orchestrator.local_repo.result_poller import LocalRepoResultPoller +from orchestrator.common.result_poller import ResultPoller from orchestrator.build_image.use_cases import CreateBuildImageUseCase +from orchestrator.validate.use_cases import ValidateImageOnTestUseCase from core.localrepo.services import ( InputFileService, @@ -58,6 +59,7 @@ from core.build_image.services import ( BuildImageConfigService, ) +from core.validate.services import ValidateQueueService from core.catalog.adapter_policy import _DEFAULT_POLICY_PATH, _DEFAULT_SCHEMA_PATH from core.artifacts.value_objects import SafePath from common.config import load_config @@ -120,6 +122,8 @@ class DevContainer(containers.DeclarativeContainer): # pylint: disable=R0903 "api.local_repo.dependencies", "api.build_image.routes", "api.build_image.dependencies", + "api.validate.routes", + "api.validate.dependencies", "api.parse_catalog.routes", "api.parse_catalog.dependencies", ] @@ -181,9 +185,15 @@ class DevContainer(containers.DeclarativeContainer): # pylint: disable=R0903 result_repo=playbook_queue_result_repository, ) + # --- Validate services --- + validate_queue_service = providers.Factory( + ValidateQueueService, + queue_repo=playbook_queue_request_repository, + ) + # --- Result poller --- result_poller = providers.Singleton( - LocalRepoResultPoller, + ResultPoller, result_service=playbook_queue_result_service, stage_repo=stage_repository, audit_repo=audit_repository, @@ -239,7 +249,7 @@ class DevContainer(containers.DeclarativeContainer): # pylint: disable=R0903 default_policy_path=default_policy_path, policy_schema_path=policy_schema_path, ) - + create_build_image_use_case = providers.Factory( CreateBuildImageUseCase, job_repo=job_repository, @@ -251,6 +261,15 @@ class DevContainer(containers.DeclarativeContainer): # pylint: disable=R0903 uuid_generator=uuid_generator, ) + validate_image_on_test_use_case = providers.Factory( + ValidateImageOnTestUseCase, + job_repo=job_repository, + stage_repo=stage_repository, + audit_repo=audit_repository, + queue_service=validate_queue_service, + uuid_generator=uuid_generator, + ) + class ProdContainer(containers.DeclarativeContainer): # pylint: disable=R0903 """Production profile container. @@ -269,6 +288,8 @@ class ProdContainer(containers.DeclarativeContainer): # pylint: disable=R0903 "api.local_repo.dependencies", "api.build_image.routes", "api.build_image.dependencies", + "api.validate.routes", + "api.validate.dependencies", "api.parse_catalog.routes", "api.parse_catalog.dependencies", ] @@ -335,10 +356,15 @@ class ProdContainer(containers.DeclarativeContainer): # pylint: disable=R0903 config_repo=input_repository, ) + # --- Validate services --- + validate_queue_service = providers.Factory( + ValidateQueueService, + queue_repo=playbook_queue_request_repository, + ) # --- Result poller --- result_poller = providers.Singleton( - LocalRepoResultPoller, + ResultPoller, result_service=playbook_queue_result_service, stage_repo=stage_repository, audit_repo=audit_repository, @@ -394,6 +420,14 @@ class ProdContainer(containers.DeclarativeContainer): # pylint: disable=R0903 uuid_generator=uuid_generator, ) + validate_image_on_test_use_case = providers.Factory( + ValidateImageOnTestUseCase, + job_repo=job_repository, + stage_repo=stage_repository, + audit_repo=audit_repository, + queue_service=validate_queue_service, + uuid_generator=uuid_generator, + ) generate_input_files_use_case = providers.Factory( GenerateInputFilesUseCase, diff --git a/build_stream/core/validate/__init__.py b/build_stream/core/validate/__init__.py new file mode 100644 index 0000000000..161fe85b15 --- /dev/null +++ b/build_stream/core/validate/__init__.py @@ -0,0 +1,32 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""ValidateImageOnTest domain module. + +This module contains domain logic for validate-image-on-test operations. +""" + +from core.validate.entities import ValidateImageOnTestRequest +from core.validate.exceptions import ( + ValidateDomainError, + EnvironmentUnavailableError, + ValidationExecutionError, +) + +__all__ = [ + "ValidateImageOnTestRequest", + "ValidateDomainError", + "EnvironmentUnavailableError", + "ValidationExecutionError", +] diff --git a/build_stream/core/validate/entities.py b/build_stream/core/validate/entities.py new file mode 100644 index 0000000000..72dfee0493 --- /dev/null +++ b/build_stream/core/validate/entities.py @@ -0,0 +1,71 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Domain entities for ValidateImageOnTest module.""" + +from dataclasses import dataclass +from datetime import datetime, timezone +from typing import Any, Dict + +from core.localrepo.value_objects import ExecutionTimeout, ExtraVars, PlaybookPath + + +@dataclass(frozen=True) +class ValidateImageOnTestRequest: + """Immutable entity representing a validate-image-on-test request. + + Written to the NFS queue for OIM Core consumption. + Compatible with PlaybookRequest interface for reuse of existing repository. + + Attributes: + job_id: Parent job identifier. + stage_name: Stage identifier (validate-image-on-test). + playbook_path: Validated path to the discovery playbook. + extra_vars: Ansible extra variables (includes job_id). + correlation_id: Request tracing identifier. + timeout: Execution timeout configuration. + submitted_at: Request submission timestamp. + request_id: Unique request identifier. + """ + + job_id: str + stage_name: str + playbook_path: PlaybookPath + extra_vars: ExtraVars + correlation_id: str + timeout: ExecutionTimeout + submitted_at: str + request_id: str + + def to_dict(self) -> Dict[str, Any]: + """Serialize request to dictionary for JSON file writing.""" + return { + "job_id": self.job_id, + "stage_name": self.stage_name, + "playbook_path": str(self.playbook_path), + "extra_vars": self.extra_vars.to_dict(), + "correlation_id": self.correlation_id, + "timeout_minutes": self.timeout.minutes, + "submitted_at": self.submitted_at, + "request_id": self.request_id, + } + + def generate_filename(self) -> str: + """Generate request file name following naming convention. + + Returns: + Filename: {job_id}_{stage_name}_{timestamp}.json + """ + timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S") + return f"{self.job_id}_{self.stage_name}_{timestamp}.json" diff --git a/build_stream/core/validate/exceptions.py b/build_stream/core/validate/exceptions.py new file mode 100644 index 0000000000..06a0879783 --- /dev/null +++ b/build_stream/core/validate/exceptions.py @@ -0,0 +1,42 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""ValidateImageOnTest domain exceptions.""" + + +class ValidateDomainError(Exception): + """Base exception for validate-image-on-test domain errors.""" + + def __init__(self, message: str, correlation_id: str = ""): + """Initialize domain error. + + Args: + message: Error message. + correlation_id: Request correlation ID for tracing. + """ + super().__init__(message) + self.message = message + self.correlation_id = correlation_id + + +class EnvironmentUnavailableError(ValidateDomainError): + """Raised when test environment is not available for validation.""" + + +class ValidationExecutionError(ValidateDomainError): + """Raised when validation playbook execution fails.""" + + +class StageGuardViolationError(ValidateDomainError): + """Raised when required upstream stage has not completed.""" diff --git a/build_stream/core/validate/services.py b/build_stream/core/validate/services.py new file mode 100644 index 0000000000..e1cd85573b --- /dev/null +++ b/build_stream/core/validate/services.py @@ -0,0 +1,63 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Domain services for ValidateImageOnTest module.""" + +import logging + +from core.jobs.value_objects import CorrelationId +from core.validate.entities import ValidateImageOnTestRequest + +logger = logging.getLogger(__name__) + + +class ValidateQueueService: + """Service for validate-image-on-test queue operations.""" + + def __init__(self, queue_repo) -> None: + """Initialize service with PlaybookQueueRequestRepository. + + Args: + queue_repo: Playbook queue request repository implementation. + """ + self._queue_repo = queue_repo + + def submit_request( + self, + request: ValidateImageOnTestRequest, + correlation_id: CorrelationId, + ) -> None: + """Submit validate-image-on-test request to queue. + + Args: + request: ValidateImageOnTestRequest to submit. + correlation_id: Correlation ID for tracing. + + Raises: + QueueUnavailableError: If queue is not accessible. + """ + logger.info( + "Submitting validate-image-on-test request to queue: " + "job_id=%s, correlation_id=%s", + request.job_id, + correlation_id, + ) + self._queue_repo.write_request(request) + logger.info( + "Validate-image-on-test request submitted successfully: " + "job_id=%s, request_id=%s, correlation_id=%s", + request.job_id, + request.request_id, + correlation_id, + ) diff --git a/build_stream/main.py b/build_stream/main.py index a90acda848..5b2e5bac1c 100644 --- a/build_stream/main.py +++ b/build_stream/main.py @@ -44,6 +44,8 @@ "api.jobs.dependencies", "api.local_repo.routes", "api.local_repo.dependencies", + "api.validate.routes", + "api.validate.dependencies", ]) logger.info("Using container: %s", container.__class__.__name__) @@ -58,9 +60,9 @@ async def lifespan(app: FastAPI): result_poller = container.result_poller() await result_poller.start() logger.info("Application startup complete") - + yield - + # Shutdown: Stop the result poller await result_poller.stop() logger.info("Application shutdown complete") diff --git a/build_stream/orchestrator/common/__init__.py b/build_stream/orchestrator/common/__init__.py new file mode 100644 index 0000000000..2fe7d88f4e --- /dev/null +++ b/build_stream/orchestrator/common/__init__.py @@ -0,0 +1,19 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Common orchestrator components shared across stages.""" + +from orchestrator.common.result_poller import ResultPoller + +__all__ = ["ResultPoller"] diff --git a/build_stream/orchestrator/common/result_poller.py b/build_stream/orchestrator/common/result_poller.py new file mode 100644 index 0000000000..fae7870790 --- /dev/null +++ b/build_stream/orchestrator/common/result_poller.py @@ -0,0 +1,190 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Common result poller for processing playbook execution results from NFS queue. + +This module provides a shared ResultPoller that can be used by all stage APIs +(local_repo, build_image, validate_image_on_test, etc.) to poll the NFS result +queue and update stage states accordingly. +""" + +import asyncio +import logging +from datetime import datetime, timezone + +from api.logging_utils import log_secure_info + +from core.jobs.entities import AuditEvent +from core.jobs.repositories import ( + AuditEventRepository, + StageRepository, + UUIDGenerator, +) +from core.jobs.value_objects import StageName +from core.localrepo.entities import PlaybookResult +from core.localrepo.services import PlaybookQueueResultService + +logger = logging.getLogger(__name__) + + +class ResultPoller: + """Common poller for processing playbook execution results. + + This poller monitors the NFS result queue and processes results + by updating stage states and emitting audit events. It handles + results from all stage types (local_repo, build_image, + validate_image_on_test, etc.). + + Attributes: + result_service: Service for polling NFS result queue. + stage_repo: Stage repository for updating stage states. + audit_repo: Audit event repository for emitting events. + uuid_generator: UUID generator for event IDs. + poll_interval: Interval in seconds between polls. + running: Flag indicating if poller is running. + """ + + def __init__( + self, + result_service: PlaybookQueueResultService, + stage_repo: StageRepository, + audit_repo: AuditEventRepository, + uuid_generator: UUIDGenerator, + poll_interval: int = 5, + ) -> None: # pylint: disable=too-many-arguments,too-many-positional-arguments + """Initialize result poller. + + Args: + result_service: Service for polling NFS result queue. + stage_repo: Stage repository implementation. + audit_repo: Audit event repository implementation. + uuid_generator: UUID generator for identifiers. + poll_interval: Interval in seconds between polls (default: 5). + """ + self._result_service = result_service + self._stage_repo = stage_repo + self._audit_repo = audit_repo + self._uuid_generator = uuid_generator + self._poll_interval = poll_interval + self._running = False + self._task = None + + async def start(self) -> None: + """Start the result poller.""" + if self._running: + logger.warning("Result poller is already running") + return + + self._running = True + self._task = asyncio.create_task(self._poll_loop()) + logger.info("Result poller started with interval=%ds", self._poll_interval) + + async def stop(self) -> None: + """Stop the result poller.""" + if not self._running: + return + + self._running = False + if self._task: + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + logger.info("Result poller stopped") + + async def _poll_loop(self) -> None: + """Main polling loop.""" + while self._running: + try: + processed_count = self._result_service.poll_results( + callback=self._on_result_received + ) + if processed_count > 0: + logger.info("Processed %d playbook results", processed_count) + except Exception as exc: # pylint: disable=broad-except + logger.exception("Error polling results: %s", exc) + + await asyncio.sleep(self._poll_interval) + + def _on_result_received(self, result: PlaybookResult) -> None: + """Handle received playbook result. + + Args: + result: Playbook execution result from NFS queue. + """ + try: + # Find stage + stage_name = StageName(result.stage_name) + stage = self._stage_repo.find_by_job_and_name(result.job_id, stage_name) + + if stage is None: + logger.error( + "Stage not found for result: job_id=%s, stage=%s", + result.job_id, + result.stage_name, + ) + return + + # Update stage based on result + if result.status == "success": + stage.complete() + logger.info( + "Stage completed successfully: job_id=%s, stage=%s", + result.job_id, + result.stage_name, + ) + else: + error_code = result.error_code or "PLAYBOOK_FAILED" + error_summary = result.error_summary or "Playbook execution failed" + stage.fail(error_code=error_code, error_summary=error_summary) + logger.warning( + "Stage failed: job_id=%s, stage=%s, error=%s", + result.job_id, + result.stage_name, + error_code, + ) + + # Save updated stage + self._stage_repo.save(stage) + + # Emit audit event + event = AuditEvent( + event_id=str(self._uuid_generator.generate()), + job_id=result.job_id, + event_type="STAGE_COMPLETED" if result.status == "success" else "STAGE_FAILED", + correlation_id=result.request_id, + client_id=result.job_id, # Using job_id as client_id placeholder + timestamp=datetime.now(timezone.utc), + details={ + "stage_name": result.stage_name, + "status": result.status, + "duration_seconds": result.duration_seconds, + "exit_code": result.exit_code, + }, + ) + self._audit_repo.save(event) + + log_secure_info( + "info", + f"Result processed for job {result.job_id}, stage {result.stage_name}", + result.request_id, + ) + + except Exception as exc: # pylint: disable=broad-except + logger.exception( + "Error handling result: job_id=%s, error=%s", + result.job_id, + exc, + ) diff --git a/build_stream/orchestrator/local_repo/result_poller.py b/build_stream/orchestrator/local_repo/result_poller.py index 003d468b89..cf78a5be11 100644 --- a/build_stream/orchestrator/local_repo/result_poller.py +++ b/build_stream/orchestrator/local_repo/result_poller.py @@ -12,194 +12,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Result poller for processing playbook execution results from NFS queue.""" +"""Backward-compatible alias for the common ResultPoller. -import asyncio -import logging -from datetime import datetime, timezone +The result poller has been promoted to orchestrator.common.result_poller +so that all stage APIs (local_repo, build_image, validate_image_on_test) +share a single poller instance. This module re-exports the class under +its original name for backward compatibility. +""" -from api.logging_utils import log_secure_info +from orchestrator.common.result_poller import ResultPoller -from core.jobs.entities import AuditEvent -from core.jobs.repositories import ( - AuditEventRepository, - StageRepository, - UUIDGenerator, -) -from core.jobs.value_objects import StageName -from core.localrepo.entities import PlaybookResult -from core.localrepo.services import PlaybookQueueResultService +# Backward-compatible alias +LocalRepoResultPoller = ResultPoller -logger = logging.getLogger(__name__) - - -class LocalRepoResultPoller: - """Poller for processing playbook execution results. - - This poller monitors the NFS result queue and processes results - by updating stage states and emitting audit events. - - Attributes: - result_service: Service for polling NFS result queue. - stage_repo: Stage repository for updating stage states. - audit_repo: Audit event repository for emitting events. - uuid_generator: UUID generator for event IDs. - poll_interval: Interval in seconds between polls. - running: Flag indicating if poller is running. - """ - - def __init__( - self, - result_service: PlaybookQueueResultService, - stage_repo: StageRepository, - audit_repo: AuditEventRepository, - uuid_generator: UUIDGenerator, - poll_interval: int = 5, - ) -> None: # pylint: disable=too-many-arguments,too-many-positional-arguments - """Initialize result poller. - - Args: - result_service: Service for polling NFS result queue. - stage_repo: Stage repository implementation. - audit_repo: Audit event repository implementation. - uuid_generator: UUID generator for identifiers. - poll_interval: Interval in seconds between polls (default: 5). - """ - self._result_service = result_service - self._stage_repo = stage_repo - self._audit_repo = audit_repo - self._uuid_generator = uuid_generator - self._poll_interval = poll_interval - self._running = False - self._task = None - - async def start(self) -> None: - """Start the result poller.""" - if self._running: - logger.warning("Result poller is already running") - return - - self._running = True - self._task = asyncio.create_task(self._poll_loop()) - logger.info("Result poller started with interval=%ds", self._poll_interval) - - async def stop(self) -> None: - """Stop the result poller.""" - if not self._running: - return - - self._running = False - if self._task: - self._task.cancel() - try: - await self._task - except asyncio.CancelledError: - pass - logger.info("Result poller stopped") - - async def _poll_loop(self) -> None: - """Main polling loop.""" - while self._running: - try: - processed_count = self._result_service.poll_results( - callback=self._on_result_received - ) - if processed_count > 0: - logger.info("Processed %d playbook results", processed_count) - except Exception as exc: # pylint: disable=broad-except - logger.exception("Error polling results: %s", exc) - - await asyncio.sleep(self._poll_interval) - - def _on_result_received(self, result: PlaybookResult) -> None: - """Handle received playbook result. - - Args: - result: Playbook execution result from NFS queue. - """ - # Import here to avoid circular imports - from infra.db.session import get_db_session - from infra.db.repositories import SqlStageRepository, SqlAuditEventRepository - - try: - # Use a fresh session for this operation to ensure proper transaction management - with get_db_session() as session: - # Create repositories with the same session - stage_repo = SqlStageRepository(session=session) - audit_repo = SqlAuditEventRepository(session=session) - - # Find stage - stage_name = StageName(result.stage_name) - stage = stage_repo.find_by_job_and_name(result.job_id, stage_name) - - if stage is None: - logger.error( - "Stage not found for result: job_id=%s, stage=%s", - result.job_id, - result.stage_name, - ) - return - - # Update stage based on result - if result.status == "success": - stage.complete() - logger.info( - "Stage completed successfully: job_id=%s, stage=%s", - result.job_id, - result.stage_name, - ) - else: - error_code = result.error_code or "PLAYBOOK_FAILED" - error_summary = result.error_summary or "Playbook execution failed" - stage.fail(error_code=error_code, error_summary=error_summary) - logger.warning( - "Stage failed: job_id=%s, stage=%s, error=%s", - result.job_id, - result.stage_name, - error_code, - ) - - # Save updated stage (will be committed when context exits) - stage_repo.save(stage) - logger.info( - "Stage state saved to database: job_id=%s, stage=%s, new_state=%s", - result.job_id, - result.stage_name, - stage.stage_state.value, - ) - - # Emit audit event - event = AuditEvent( - event_id=str(self._uuid_generator.generate()), - job_id=result.job_id, - event_type="STAGE_COMPLETED" if result.status == "success" else "STAGE_FAILED", - correlation_id=result.request_id, - client_id=result.job_id, # Using job_id as client_id placeholder - timestamp=datetime.now(timezone.utc), - details={ - "stage_name": result.stage_name, - "status": result.status, - "duration_seconds": result.duration_seconds, - "exit_code": result.exit_code, - }, - ) - audit_repo.save(event) - - # Session will be automatically committed when exiting the context - logger.info("Database transaction committed for stage update") - - log_secure_info( - "info", - f"Result processed for job {result.job_id}, stage {result.stage_name}", - result.request_id, - ) - - except Exception as exc: # pylint: disable=broad-except - logger.exception( - "Error handling result: job_id=%s, stage=%s, error=%s", - result.job_id, - result.stage_name, - exc, - ) - # Don't re-raise the exception to avoid rolling back the transaction - # The stage state has already been saved successfully +__all__ = ["LocalRepoResultPoller"] diff --git a/build_stream/orchestrator/validate/__init__.py b/build_stream/orchestrator/validate/__init__.py new file mode 100644 index 0000000000..a400f93deb --- /dev/null +++ b/build_stream/orchestrator/validate/__init__.py @@ -0,0 +1,25 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""ValidateImageOnTest orchestration module.""" + +from orchestrator.validate.commands import ValidateImageOnTestCommand +from orchestrator.validate.dtos import ValidateImageOnTestResponse +from orchestrator.validate.use_cases import ValidateImageOnTestUseCase + +__all__ = [ + "ValidateImageOnTestCommand", + "ValidateImageOnTestResponse", + "ValidateImageOnTestUseCase", +] diff --git a/build_stream/orchestrator/validate/commands/__init__.py b/build_stream/orchestrator/validate/commands/__init__.py new file mode 100644 index 0000000000..43ea4f61b9 --- /dev/null +++ b/build_stream/orchestrator/validate/commands/__init__.py @@ -0,0 +1,19 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""ValidateImageOnTest command DTOs.""" + +from orchestrator.validate.commands.validate_image_on_test import ValidateImageOnTestCommand + +__all__ = ["ValidateImageOnTestCommand"] diff --git a/build_stream/orchestrator/validate/commands/validate_image_on_test.py b/build_stream/orchestrator/validate/commands/validate_image_on_test.py new file mode 100644 index 0000000000..0042684ce5 --- /dev/null +++ b/build_stream/orchestrator/validate/commands/validate_image_on_test.py @@ -0,0 +1,37 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""ValidateImageOnTest command DTO.""" + +from dataclasses import dataclass + +from core.jobs.value_objects import ClientId, CorrelationId, JobId + + +@dataclass(frozen=True) +class ValidateImageOnTestCommand: + """Command to trigger validate-image-on-test stage. + + Immutable command object representing the intent to execute + the validate-image-on-test stage for a given job. + + Attributes: + job_id: Job identifier from URL path. + client_id: Client who owns this job (from auth). + correlation_id: Request correlation identifier for tracing. + """ + + job_id: JobId + client_id: ClientId + correlation_id: CorrelationId diff --git a/build_stream/orchestrator/validate/dtos/__init__.py b/build_stream/orchestrator/validate/dtos/__init__.py new file mode 100644 index 0000000000..f1a8076cf8 --- /dev/null +++ b/build_stream/orchestrator/validate/dtos/__init__.py @@ -0,0 +1,19 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""ValidateImageOnTest response DTOs.""" + +from orchestrator.validate.dtos.validate_image_on_test_response import ValidateImageOnTestResponse + +__all__ = ["ValidateImageOnTestResponse"] diff --git a/build_stream/orchestrator/validate/dtos/validate_image_on_test_response.py b/build_stream/orchestrator/validate/dtos/validate_image_on_test_response.py new file mode 100644 index 0000000000..fd1a1deea1 --- /dev/null +++ b/build_stream/orchestrator/validate/dtos/validate_image_on_test_response.py @@ -0,0 +1,36 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""ValidateImageOnTest response DTO.""" + +from dataclasses import dataclass + + +@dataclass(frozen=True) +class ValidateImageOnTestResponse: + """Response DTO for validate-image-on-test stage acceptance. + + Attributes: + job_id: Job identifier. + stage_name: Stage identifier. + status: Acceptance status. + submitted_at: Submission timestamp (ISO 8601). + correlation_id: Correlation identifier. + """ + + job_id: str + stage_name: str + status: str + submitted_at: str + correlation_id: str diff --git a/build_stream/orchestrator/validate/use_cases/__init__.py b/build_stream/orchestrator/validate/use_cases/__init__.py new file mode 100644 index 0000000000..d9ba2a4300 --- /dev/null +++ b/build_stream/orchestrator/validate/use_cases/__init__.py @@ -0,0 +1,19 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""ValidateImageOnTest use cases.""" + +from orchestrator.validate.use_cases.validate_image_on_test import ValidateImageOnTestUseCase + +__all__ = ["ValidateImageOnTestUseCase"] diff --git a/build_stream/orchestrator/validate/use_cases/validate_image_on_test.py b/build_stream/orchestrator/validate/use_cases/validate_image_on_test.py new file mode 100644 index 0000000000..e11b1ee14a --- /dev/null +++ b/build_stream/orchestrator/validate/use_cases/validate_image_on_test.py @@ -0,0 +1,274 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""ValidateImageOnTest use case implementation.""" + +import logging +from datetime import datetime, timezone + +from api.logging_utils import log_secure_info + +from core.jobs.entities import AuditEvent, Stage +from core.jobs.exceptions import JobNotFoundError +from core.jobs.repositories import ( + AuditEventRepository, + JobRepository, + StageRepository, + UUIDGenerator, +) +from core.jobs.value_objects import ( + StageName, + StageState, + StageType, +) +from core.localrepo.value_objects import ( + ExecutionTimeout, + ExtraVars, + PlaybookPath, +) +from core.validate.entities import ValidateImageOnTestRequest +from core.validate.exceptions import ( + StageGuardViolationError, + ValidationExecutionError, +) +from core.validate.services import ValidateQueueService + +from orchestrator.validate.commands import ValidateImageOnTestCommand +from orchestrator.validate.dtos import ValidateImageOnTestResponse + +logger = logging.getLogger(__name__) + +DISCOVERY_PLAYBOOK_NAME = "discovery.yml" +DEFAULT_TIMEOUT_MINUTES = 60 + + +class ValidateImageOnTestUseCase: + """Use case for triggering the validate-image-on-test stage. + + This use case orchestrates stage execution with the following guarantees: + - Stage guard enforcement: BuildImage stage(s) must be completed + - Job ownership verification: Client must own the job + - Audit trail: Emits STAGE_STARTED event + - NFS queue submission: Submits playbook request to NFS queue for watcher service + + Attributes: + job_repo: Job repository port. + stage_repo: Stage repository port. + audit_repo: Audit event repository port. + queue_service: Validate queue service. + uuid_generator: UUID generator for events and request IDs. + """ + + def __init__( + self, + job_repo: JobRepository, + stage_repo: StageRepository, + audit_repo: AuditEventRepository, + queue_service: ValidateQueueService, + uuid_generator: UUIDGenerator, + ) -> None: # pylint: disable=too-many-arguments,too-many-positional-arguments + """Initialize use case with repository and service dependencies. + + Args: + job_repo: Job repository implementation. + stage_repo: Stage repository implementation. + audit_repo: Audit event repository implementation. + queue_service: Validate queue service. + uuid_generator: UUID generator for identifiers. + """ + self._job_repo = job_repo + self._stage_repo = stage_repo + self._audit_repo = audit_repo + self._queue_service = queue_service + self._uuid_generator = uuid_generator + + def execute(self, command: ValidateImageOnTestCommand) -> ValidateImageOnTestResponse: + """Execute the validate-image-on-test stage. + + Args: + command: ValidateImageOnTest command with job details. + + Returns: + ValidateImageOnTestResponse DTO with acceptance details. + + Raises: + JobNotFoundError: If job does not exist or client mismatch. + StageGuardViolationError: If upstream build-image stage not completed. + ValidationExecutionError: If queue submission fails. + """ + self._validate_job(command) + stage = self._validate_stage(command) + self._enforce_stage_guard(command) + + request = self._create_request(command) + self._submit_to_queue(command, request, stage) + self._emit_stage_started_event(command) + + return self._to_response(command, request) + + def _validate_job(self, command: ValidateImageOnTestCommand) -> None: + """Validate job exists and belongs to the requesting client.""" + job = self._job_repo.find_by_id(command.job_id) + if job is None or job.tombstoned: + raise JobNotFoundError( + job_id=str(command.job_id), + correlation_id=str(command.correlation_id), + ) + + if job.client_id != command.client_id: + raise JobNotFoundError( + job_id=str(command.job_id), + correlation_id=str(command.correlation_id), + ) + + def _validate_stage(self, command: ValidateImageOnTestCommand) -> Stage: + """Validate stage exists and is in PENDING state.""" + stage_name = StageName(StageType.VALIDATE_IMAGE_ON_TEST.value) + stage = self._stage_repo.find_by_job_and_name(command.job_id, stage_name) + + if stage is None: + raise JobNotFoundError( + job_id=str(command.job_id), + correlation_id=str(command.correlation_id), + ) + + return stage + + def _enforce_stage_guard(self, command: ValidateImageOnTestCommand) -> None: + """Enforce that at least one build-image stage has completed. + + The validate-image-on-test stage requires that at least one of the + build-image stages (x86_64 or aarch64) has completed successfully. + """ + x86_stage_name = StageName(StageType.BUILD_IMAGE_X86_64.value) + aarch64_stage_name = StageName(StageType.BUILD_IMAGE_AARCH64.value) + + x86_stage = self._stage_repo.find_by_job_and_name( + command.job_id, x86_stage_name + ) + aarch64_stage = self._stage_repo.find_by_job_and_name( + command.job_id, aarch64_stage_name + ) + + x86_completed = ( + x86_stage is not None + and x86_stage.stage_state == StageState.COMPLETED + ) + aarch64_completed = ( + aarch64_stage is not None + and aarch64_stage.stage_state == StageState.COMPLETED + ) + + if not x86_completed and not aarch64_completed: + raise StageGuardViolationError( + message=( + "At least one build-image stage (build-image-x86_64 or " + "build-image-aarch64) must be COMPLETED before " + "validate-image-on-test" + ), + correlation_id=str(command.correlation_id), + ) + + def _create_request( + self, + command: ValidateImageOnTestCommand, + ) -> ValidateImageOnTestRequest: + """Create ValidateImageOnTestRequest entity.""" + playbook_path = PlaybookPath(DISCOVERY_PLAYBOOK_NAME) + + extra_vars_dict = { + "job_id": str(command.job_id), + } + extra_vars = ExtraVars(extra_vars_dict) + + return ValidateImageOnTestRequest( + job_id=str(command.job_id), + stage_name=StageType.VALIDATE_IMAGE_ON_TEST.value, + playbook_path=playbook_path, + extra_vars=extra_vars, + correlation_id=str(command.correlation_id), + timeout=ExecutionTimeout(DEFAULT_TIMEOUT_MINUTES), + submitted_at=datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"), + request_id=str(self._uuid_generator.generate()), + ) + + def _submit_to_queue( + self, + command: ValidateImageOnTestCommand, + request: ValidateImageOnTestRequest, + stage: Stage, + ) -> None: + """Submit playbook request to NFS queue for watcher service.""" + stage.start() + self._stage_repo.save(stage) + + try: + self._queue_service.submit_request( + request=request, + correlation_id=command.correlation_id, + ) + except Exception as exc: + stage.fail( + error_code="QUEUE_SUBMISSION_FAILED", + error_summary=str(exc), + ) + self._stage_repo.save(stage) + log_secure_info( + "error", + f"Queue submission failed for job {command.job_id}", + str(command.correlation_id), + ) + raise ValidationExecutionError( + message=f"Failed to submit validation request: {exc}", + correlation_id=str(command.correlation_id), + ) from exc + + logger.info( + "Validate-image-on-test request submitted to queue for job %s, " + "correlation_id=%s", + command.job_id, + command.correlation_id, + ) + + def _emit_stage_started_event( + self, + command: ValidateImageOnTestCommand, + ) -> None: + """Emit an audit event for stage start.""" + event = AuditEvent( + event_id=str(self._uuid_generator.generate()), + job_id=command.job_id, + event_type="STAGE_STARTED", + correlation_id=command.correlation_id, + client_id=command.client_id, + timestamp=datetime.now(timezone.utc), + details={ + "stage_name": StageType.VALIDATE_IMAGE_ON_TEST.value, + }, + ) + self._audit_repo.save(event) + + def _to_response( + self, + command: ValidateImageOnTestCommand, + request: ValidateImageOnTestRequest, + ) -> ValidateImageOnTestResponse: + """Map to response DTO.""" + return ValidateImageOnTestResponse( + job_id=str(command.job_id), + stage_name=StageType.VALIDATE_IMAGE_ON_TEST.value, + status="accepted", + submitted_at=request.submitted_at, + correlation_id=str(command.correlation_id), + ) diff --git a/build_stream/tests/integration/api/validate/__init__.py b/build_stream/tests/integration/api/validate/__init__.py new file mode 100644 index 0000000000..c299535a13 --- /dev/null +++ b/build_stream/tests/integration/api/validate/__init__.py @@ -0,0 +1,15 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Integration tests for ValidateImageOnTest API.""" diff --git a/build_stream/tests/integration/api/validate/conftest.py b/build_stream/tests/integration/api/validate/conftest.py new file mode 100644 index 0000000000..cbae6c7ba5 --- /dev/null +++ b/build_stream/tests/integration/api/validate/conftest.py @@ -0,0 +1,134 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Shared fixtures for ValidateImageOnTest API integration tests.""" + +import os +from pathlib import Path +from typing import Dict + +import pytest +from fastapi.testclient import TestClient +from api.dependencies import verify_token + +from main import app +from infra.id_generator import UUIDv4Generator +from core.jobs.value_objects import StageState + + +@pytest.fixture(scope="function") +def client(): + """Create test client with fresh container for each test.""" + os.environ["ENV"] = "dev" + + def mock_verify_token(): + return { + "sub": "test-client-123", + "client_id": "test-client-123", + "scopes": ["job:write", "job:read"] + } + + app.dependency_overrides[verify_token] = mock_verify_token + + test_client = TestClient(app) + + yield test_client + + # Cleanup + app.dependency_overrides.clear() + + +@pytest.fixture(name="uuid_generator") +def uuid_generator_fixture(): + """UUID generator for test fixtures.""" + return UUIDv4Generator() + + +@pytest.fixture(name="auth_headers") +def auth_headers_fixture(uuid_generator) -> Dict[str, str]: + """Standard authentication headers for testing.""" + return { + "Authorization": "Bearer test-client-123", + "X-Correlation-Id": str(uuid_generator.generate()), + "Idempotency-Key": f"test-key-{uuid_generator.generate()}", + } + + +@pytest.fixture +def unique_correlation_id(uuid_generator) -> str: + """Generate unique correlation ID for each test.""" + return str(uuid_generator.generate()) + + +@pytest.fixture +def created_job(client, auth_headers) -> str: + """Create a job and return its job_id.""" + payload = {"client_id": "test-client-123", "client_name": "test-client"} + response = client.post("/api/v1/jobs", json=payload, headers=auth_headers) + assert response.status_code == 201 + return response.json()["job_id"] + + +@pytest.fixture +def job_with_completed_build_image(client, auth_headers, created_job, monkeypatch) -> str: + """Create a job with a completed build-image stage.""" + from core.jobs.entities import Stage + from core.jobs.value_objects import JobId, StageName, StageType + + # Mock the stage repository to return a completed build-image stage + def mock_find_by_job_and_name(self, job_id, stage_name): + # Handle JobId objects or string job_id + job_id_str = str(job_id) + + if stage_name.value == StageType.BUILD_IMAGE_X86_64.value: + stage = Stage( + job_id=JobId(job_id_str), + stage_name=StageName(StageType.BUILD_IMAGE_X86_64.value), + stage_state=StageState.COMPLETED, + attempt=1 + ) + return stage + elif stage_name.value == StageType.VALIDATE_IMAGE_ON_TEST.value: + stage = Stage( + job_id=JobId(job_id_str), + stage_name=StageName(StageType.VALIDATE_IMAGE_ON_TEST.value), + stage_state=StageState.PENDING, + attempt=1 + ) + return stage + return None + + # Apply the mock + monkeypatch.setattr( + "infra.repositories.in_memory.InMemoryStageRepository.find_by_job_and_name", + mock_find_by_job_and_name + ) + + return created_job + + +@pytest.fixture +def nfs_queue_dir(tmp_path): + """Create temporary NFS queue directory structure.""" + requests_dir = tmp_path / "requests" + results_dir = tmp_path / "results" + archive_dir = tmp_path / "archive" / "results" + processing_dir = tmp_path / "processing" + + requests_dir.mkdir(parents=True) + results_dir.mkdir(parents=True) + archive_dir.mkdir(parents=True) + processing_dir.mkdir(parents=True) + + return tmp_path diff --git a/build_stream/tests/integration/api/validate/test_validate_image_on_test_api.py b/build_stream/tests/integration/api/validate/test_validate_image_on_test_api.py new file mode 100644 index 0000000000..b67f27a69c --- /dev/null +++ b/build_stream/tests/integration/api/validate/test_validate_image_on_test_api.py @@ -0,0 +1,221 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Integration tests for ValidateImageOnTest API.""" + +import json +from pathlib import Path +from unittest.mock import patch + + +class TestValidateImageOnTestSuccess: + """Happy-path validate image on test tests.""" + + def test_returns_202_with_valid_request( + self, client, auth_headers, job_with_completed_build_image, nfs_queue_dir + ): + """Test successful validate image on test request.""" + with patch( + "infra.repositories.nfs_playbook_queue_request_repository" + ".NfsPlaybookQueueRequestRepository.is_available", + return_value=True, + ), patch( + "infra.repositories.nfs_playbook_queue_request_repository" + ".NfsPlaybookQueueRequestRepository.write_request", + return_value=nfs_queue_dir / "requests" / "test.json", + ): + response = client.post( + f"/api/v1/jobs/{job_with_completed_build_image}/stages/validate-image-on-test", + headers=auth_headers, + ) + + assert response.status_code == 202 + data = response.json() + assert data["job_id"] == job_with_completed_build_image + assert data["stage"] == "validate-image-on-test" + assert data["status"] == "accepted" + assert "submitted_at" in data + assert "correlation_id" in data + + def test_returns_correlation_id( + self, client, job_with_completed_build_image, unique_correlation_id, + nfs_queue_dir + ): + """Test correlation ID is returned in response.""" + headers = { + "Authorization": "Bearer test-client-123", + "X-Correlation-Id": unique_correlation_id, + } + + with patch( + "infra.repositories.nfs_playbook_queue_request_repository" + ".NfsPlaybookQueueRequestRepository.is_available", + return_value=True, + ), patch( + "infra.repositories.nfs_playbook_queue_request_repository" + ".NfsPlaybookQueueRequestRepository.write_request", + return_value=nfs_queue_dir / "requests" / "test.json", + ): + response = client.post( + f"/api/v1/jobs/{job_with_completed_build_image}/stages/validate-image-on-test", + headers=headers, + ) + + assert response.status_code == 202 + assert response.json()["correlation_id"] == unique_correlation_id + + def test_queue_submission( + self, client, auth_headers, job_with_completed_build_image, monkeypatch + ): + """Test that validate request is submitted to queue.""" + # Create a mock for the queue service that tracks submissions + mock_submissions = [] + + def mock_write_request(self, request): + mock_submissions.append(request) + return f"/mock/path/{request.job_id}_{request.stage_name}.json" + + # Apply the mock + monkeypatch.setattr( + "infra.repositories.nfs_playbook_queue_request_repository.NfsPlaybookQueueRequestRepository.write_request", + mock_write_request + ) + monkeypatch.setattr( + "infra.repositories.nfs_playbook_queue_request_repository.NfsPlaybookQueueRequestRepository.is_available", + lambda self: True + ) + + # Make the request + response = client.post( + f"/api/v1/jobs/{job_with_completed_build_image}/stages/validate-image-on-test", + headers=auth_headers, + ) + + # Verify response + assert response.status_code == 202 + + # Verify a request was submitted + assert len(mock_submissions) == 1 + submitted_request = mock_submissions[0] + + # Verify request properties + assert submitted_request.job_id == job_with_completed_build_image + assert submitted_request.stage_name == "validate-image-on-test" + assert str(submitted_request.playbook_path) == "discovery.yml" + + +class TestValidateImageOnTestValidation: + """Validation scenarios for validate image on test.""" + + def test_invalid_job_id_returns_400(self, client, auth_headers): + """Test validate image with invalid job ID format.""" + response = client.post( + "/api/v1/jobs/invalid-uuid/stages/validate-image-on-test", + headers=auth_headers, + ) + assert response.status_code == 400 + detail = response.json()["detail"] + assert detail["error"] == "INVALID_JOB_ID" + + def test_nonexistent_job_returns_404(self, client, auth_headers): + """Test validate image with non-existent job ID.""" + fake_job_id = "018f3c4c-6a2e-7b2a-9c2a-3d8d2c4b9a11" + response = client.post( + f"/api/v1/jobs/{fake_job_id}/stages/validate-image-on-test", + headers=auth_headers, + ) + assert response.status_code == 404 + detail = response.json()["detail"] + assert detail["error"] == "JOB_NOT_FOUND" + + def test_stage_guard_violation_returns_412( + self, client, auth_headers, created_job + ): + """Test validate image without completed build-image stage.""" + response = client.post( + f"/api/v1/jobs/{created_job}/stages/validate-image-on-test", + headers=auth_headers, + ) + assert response.status_code == 412 + detail = response.json()["detail"] + assert detail["error"] == "STAGE_GUARD_VIOLATION" + assert "build-image" in detail["message"] + + +class TestValidateImageOnTestAuthentication: + """Authentication header tests.""" + + def test_missing_authorization_returns_422( + self, client, job_with_completed_build_image + ): + """Test validate image without authorization header.""" + headers = { + "X-Correlation-Id": "019bf590-1234-7890-abcd-ef1234567890", + } + response = client.post( + f"/api/v1/jobs/{job_with_completed_build_image}/stages/validate-image-on-test", + headers=headers, + ) + assert response.status_code == 422 + + def test_invalid_authorization_format_returns_401( + self, client, job_with_completed_build_image + ): + """Test validate image with invalid authorization format.""" + headers = { + "Authorization": "InvalidFormat test-token", + "X-Correlation-Id": "019bf590-1234-7890-abcd-ef1234567890", + } + response = client.post( + f"/api/v1/jobs/{job_with_completed_build_image}/stages/validate-image-on-test", + headers=headers, + ) + assert response.status_code == 401 + + def test_empty_bearer_token_returns_401( + self, client, job_with_completed_build_image + ): + """Test validate image with empty bearer token.""" + headers = { + "Authorization": "Bearer ", + "X-Correlation-Id": "019bf590-1234-7890-abcd-ef1234567890", + } + response = client.post( + f"/api/v1/jobs/{job_with_completed_build_image}/stages/validate-image-on-test", + headers=headers, + ) + assert response.status_code == 401 + + +class TestValidateImageOnTestErrorHandling: + """Error handling tests.""" + + def test_queue_unavailable_returns_500( + self, client, auth_headers, job_with_completed_build_image + ): + """Test validate image when queue is unavailable.""" + with patch( + "infra.repositories.nfs_playbook_queue_request_repository" + ".NfsPlaybookQueueRequestRepository.is_available", + return_value=False, + ): + response = client.post( + f"/api/v1/jobs/{job_with_completed_build_image}/stages/validate-image-on-test", + headers=auth_headers, + ) + + assert response.status_code == 500 + detail = response.json()["detail"] + assert detail["error"] == "VALIDATION_EXECUTION_ERROR" + # The actual error message might vary, so we don't assert on it diff --git a/build_stream/tests/performance/test_local_repo_performance.py b/build_stream/tests/performance/test_local_repo_performance.py index 314c45acc1..4d863b60c7 100644 --- a/build_stream/tests/performance/test_local_repo_performance.py +++ b/build_stream/tests/performance/test_local_repo_performance.py @@ -39,15 +39,15 @@ def test_response_time_under_threshold(self, client, auth_headers, created_job, (input_dir_for_job / "test.txt").write_text("test content") with patch( - "build_stream.infra.repositories.nfs_input_directory_repository" - ".NfsInputDirectoryRepository.get_source_input_repository_path", + "infra.repositories.nfs_input_repository" + ".NfsInputRepository.get_source_input_repository_path", return_value=input_dir_for_job, ), patch( - "build_stream.infra.repositories.nfs_input_directory_repository" - ".NfsInputDirectoryRepository.get_destination_input_repository_path", + "infra.repositories.nfs_input_repository" + ".NfsInputRepository.get_destination_input_repository_path", return_value=nfs_queue_dir / "dest_input", ), patch( - "build_stream.infra.repositories.nfs_playbook_queue_request_repository" + "infra.repositories.nfs_playbook_queue_request_repository" ".NfsPlaybookQueueRequestRepository.is_available", return_value=True, ): @@ -76,15 +76,15 @@ def test_concurrent_requests_performance(self, client, auth_headers, created_job (input_dir_for_job / "test.txt").write_text("test content") with patch( - "build_stream.infra.repositories.nfs_input_directory_repository" - ".NfsInputDirectoryRepository.get_source_input_repository_path", + "infra.repositories.nfs_input_repository" + ".NfsInputRepository.get_source_input_repository_path", return_value=input_dir_for_job, ), patch( - "build_stream.infra.repositories.nfs_input_directory_repository" - ".NfsInputDirectoryRepository.get_destination_input_repository_path", + "infra.repositories.nfs_input_repository" + ".NfsInputRepository.get_destination_input_repository_path", return_value=nfs_queue_dir / "dest_input", ), patch( - "build_stream.infra.repositories.nfs_playbook_queue_request_repository" + "infra.repositories.nfs_playbook_queue_request_repository" ".NfsPlaybookQueueRequestRepository.is_available", return_value=True, ): @@ -151,15 +151,15 @@ def test_memory_usage_stable(self, client, auth_headers, created_job, nfs_queue_ (input_dir_for_job / "test.txt").write_text("test content") with patch( - "build_stream.infra.repositories.nfs_input_directory_repository" - ".NfsInputDirectoryRepository.get_source_input_repository_path", + "infra.repositories.nfs_input_repository" + ".NfsInputRepository.get_source_input_repository_path", return_value=input_dir_for_job, ), patch( - "build_stream.infra.repositories.nfs_input_directory_repository" - ".NfsInputDirectoryRepository.get_destination_input_repository_path", + "infra.repositories.nfs_input_repository" + ".NfsInputRepository.get_destination_input_repository_path", return_value=nfs_queue_dir / "dest_input", ), patch( - "build_stream.infra.repositories.nfs_playbook_queue_request_repository" + "infra.repositories.nfs_playbook_queue_request_repository" ".NfsPlaybookQueueRequestRepository.is_available", return_value=True, ): @@ -190,15 +190,15 @@ def test_large_correlation_id_handling(self, client, auth_headers, created_job, large_correlation_id = "x" * 1000 # Reduced from 10000 with patch( - "build_stream.infra.repositories.nfs_input_directory_repository" - ".NfsInputDirectoryRepository.get_source_input_repository_path", + "infra.repositories.nfs_input_repository" + ".NfsInputRepository.get_source_input_repository_path", return_value=input_dir_for_job, ), patch( - "build_stream.infra.repositories.nfs_input_directory_repository" - ".NfsInputDirectoryRepository.get_destination_input_repository_path", + "infra.repositories.nfs_input_repository" + ".NfsInputRepository.get_destination_input_repository_path", return_value=nfs_queue_dir / "dest_input", ), patch( - "build_stream.infra.repositories.nfs_playbook_queue_request_repository" + "infra.repositories.nfs_playbook_queue_request_repository" ".NfsPlaybookQueueRequestRepository.is_available", return_value=True, ): diff --git a/build_stream/tests/unit/api/validate/__init__.py b/build_stream/tests/unit/api/validate/__init__.py new file mode 100644 index 0000000000..d71c4722d1 --- /dev/null +++ b/build_stream/tests/unit/api/validate/__init__.py @@ -0,0 +1,15 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for validate API module.""" diff --git a/build_stream/tests/unit/api/validate/test_routes.py b/build_stream/tests/unit/api/validate/test_routes.py new file mode 100644 index 0000000000..a5dede95e5 --- /dev/null +++ b/build_stream/tests/unit/api/validate/test_routes.py @@ -0,0 +1,229 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for ValidateImageOnTest API routes.""" + +import uuid + +import pytest +from fastapi import HTTPException + +from api.validate.routes import create_validate_image_on_test, _build_error_response +from api.validate.schemas import ( + ValidateImageOnTestErrorResponse, + ValidateImageOnTestResponse, +) +from core.jobs.exceptions import InvalidStateTransitionError, JobNotFoundError +from core.jobs.value_objects import ClientId, CorrelationId +from core.validate.exceptions import ( + StageGuardViolationError, + ValidationExecutionError, +) +from orchestrator.validate.commands import ValidateImageOnTestCommand +from orchestrator.validate.dtos import ValidateImageOnTestResponse as UseCaseResponse + + +def _uuid(): + return str(uuid.uuid4()) + + +class MockValidateUseCase: + """Mock use case for testing.""" + + def __init__(self, error_to_raise=None): + self.error_to_raise = error_to_raise + self.executed_commands = [] + + def execute(self, command): + self.executed_commands.append(command) + if self.error_to_raise: + raise self.error_to_raise + + return UseCaseResponse( + job_id=str(command.job_id), + stage_name="validate-image-on-test", + status="accepted", + submitted_at="2026-02-17T10:30:00Z", + correlation_id=str(command.correlation_id), + ) + + +class TestBuildErrorResponse: + """Tests for _build_error_response helper.""" + + def test_builds_correct_response(self): + response = _build_error_response("TEST_ERROR", "Test message", "corr-123") + assert response.error == "TEST_ERROR" + assert response.message == "Test message" + assert response.correlation_id == "corr-123" + assert "Z" in response.timestamp + + +class TestCreateValidateImageOnTest: + """Tests for create_validate_image_on_test route handler.""" + + def test_success(self): + """Test successful response.""" + job_id = _uuid() + corr_id = _uuid() + use_case = MockValidateUseCase() + use_case.response = UseCaseResponse( + job_id=job_id, + stage_name="validate-image-on-test", + status="accepted", + submitted_at="2026-02-17T10:30:00Z", + correlation_id=corr_id, + ) + + response = create_validate_image_on_test( + job_id=job_id, + token_data={"client_id": "test", "scopes": ["job:write"]}, + use_case=use_case, + client_id=ClientId("test-client"), + correlation_id=CorrelationId(corr_id), + _=None, + ) + + assert response.job_id == job_id + assert response.stage == "validate-image-on-test" + assert response.status == "accepted" + assert response.correlation_id == corr_id + assert "submitted_at" in response.model_dump() + + # Verify command was created correctly + assert len(use_case.executed_commands) == 1 + command = use_case.executed_commands[0] + assert str(command.job_id) == job_id + assert str(command.client_id) == "test-client" + assert str(command.correlation_id) == corr_id + + def test_invalid_job_id(self): + """Invalid job_id should raise 400.""" + use_case = MockValidateUseCase() + corr_id = _uuid() + + with pytest.raises(HTTPException) as exc_info: + create_validate_image_on_test( + job_id="not-a-uuid", + token_data={"client_id": "test", "scopes": ["job:write"]}, + use_case=use_case, + client_id=ClientId("test-client"), + correlation_id=CorrelationId(corr_id), + _=None, + ) + assert exc_info.value.status_code == 400 + assert exc_info.value.detail["error"] == "INVALID_JOB_ID" + + def test_job_not_found(self): + """JobNotFoundError should raise 404.""" + use_case = MockValidateUseCase( + error_to_raise=JobNotFoundError(job_id=_uuid()) + ) + corr_id = _uuid() + + with pytest.raises(HTTPException) as exc_info: + create_validate_image_on_test( + job_id=_uuid(), + token_data={"client_id": "test", "scopes": ["job:write"]}, + use_case=use_case, + client_id=ClientId("test-client"), + correlation_id=CorrelationId(corr_id), + _=None, + ) + assert exc_info.value.status_code == 404 + assert exc_info.value.detail["error"] == "JOB_NOT_FOUND" + + def test_invalid_state_transition(self): + """InvalidStateTransitionError should raise 409.""" + use_case = MockValidateUseCase( + error_to_raise=InvalidStateTransitionError( + entity_type="Stage", + entity_id="test", + from_state="COMPLETED", + to_state="IN_PROGRESS", + ) + ) + corr_id = _uuid() + + with pytest.raises(HTTPException) as exc_info: + create_validate_image_on_test( + job_id=_uuid(), + token_data={"client_id": "test", "scopes": ["job:write"]}, + use_case=use_case, + client_id=ClientId("test-client"), + correlation_id=CorrelationId(corr_id), + _=None, + ) + assert exc_info.value.status_code == 409 + assert exc_info.value.detail["error"] == "INVALID_STATE_TRANSITION" + + def test_stage_guard_violation(self): + """StageGuardViolationError should raise 412.""" + use_case = MockValidateUseCase( + error_to_raise=StageGuardViolationError( + "Build stage not completed", "corr-123" + ) + ) + corr_id = _uuid() + + with pytest.raises(HTTPException) as exc_info: + create_validate_image_on_test( + job_id=_uuid(), + token_data={"client_id": "test", "scopes": ["job:write"]}, + use_case=use_case, + client_id=ClientId("test-client"), + correlation_id=CorrelationId(corr_id), + _=None, + ) + assert exc_info.value.status_code == 412 + assert exc_info.value.detail["error"] == "STAGE_GUARD_VIOLATION" + + def test_validation_execution_error(self): + """ValidationExecutionError should raise 500.""" + use_case = MockValidateUseCase( + error_to_raise=ValidationExecutionError( + "Queue failed", "corr-123" + ) + ) + corr_id = _uuid() + + with pytest.raises(HTTPException) as exc_info: + create_validate_image_on_test( + job_id=_uuid(), + token_data={"client_id": "test", "scopes": ["job:write"]}, + use_case=use_case, + client_id=ClientId("test-client"), + correlation_id=CorrelationId(corr_id), + _=None, + ) + assert exc_info.value.status_code == 500 + assert exc_info.value.detail["error"] == "VALIDATION_EXECUTION_ERROR" + + def test_unexpected_error(self): + """Unexpected errors should raise 500.""" + use_case = MockValidateUseCase( + error_to_raise=RuntimeError("unexpected") + ) + corr_id = _uuid() + + with pytest.raises(HTTPException) as exc_info: + create_validate_image_on_test( + job_id=_uuid(), + token_data={"client_id": "test", "scopes": ["job:write"]}, + use_case=use_case, + client_id=ClientId("test-client"), + correlation_id=CorrelationId(corr_id), + _=None, + ) + assert exc_info.value.status_code == 500 diff --git a/build_stream/tests/unit/core/jobs/test_value_objects.py b/build_stream/tests/unit/core/jobs/test_value_objects.py index 6ef706da40..5dc95b692b 100644 --- a/build_stream/tests/unit/core/jobs/test_value_objects.py +++ b/build_stream/tests/unit/core/jobs/test_value_objects.py @@ -155,8 +155,8 @@ def test_immutability(self): stage.value = "build-image" def test_canonical_stages_count(self): - """Verify we have exactly 9 canonical stages.""" - assert len(StageType) == 9 + """Verify we have exactly 10 canonical stages.""" + assert len(StageType) == 10 class TestIdempotencyKey: diff --git a/build_stream/tests/unit/core/validate/__init__.py b/build_stream/tests/unit/core/validate/__init__.py new file mode 100644 index 0000000000..34c1586db1 --- /dev/null +++ b/build_stream/tests/unit/core/validate/__init__.py @@ -0,0 +1,15 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for validate domain module.""" diff --git a/build_stream/tests/unit/core/validate/test_entities.py b/build_stream/tests/unit/core/validate/test_entities.py new file mode 100644 index 0000000000..efd608ca17 --- /dev/null +++ b/build_stream/tests/unit/core/validate/test_entities.py @@ -0,0 +1,92 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for ValidateImageOnTest domain entities.""" + +import uuid +from unittest.mock import patch + +from core.localrepo.value_objects import ExecutionTimeout, ExtraVars, PlaybookPath +from core.validate.entities import ValidateImageOnTestRequest + + +def _make_request(**overrides): + """Create a ValidateImageOnTestRequest with sensible defaults.""" + defaults = { + "job_id": str(uuid.uuid4()), + "stage_name": "validate-image-on-test", + "playbook_path": PlaybookPath("discovery.yml"), + "extra_vars": ExtraVars({"job_id": str(uuid.uuid4())}), + "correlation_id": str(uuid.uuid4()), + "timeout": ExecutionTimeout(60), + "submitted_at": "2026-02-17T10:30:00Z", + "request_id": str(uuid.uuid4()), + } + defaults.update(overrides) + return ValidateImageOnTestRequest(**defaults) + + +class TestValidateImageOnTestRequest: + """Tests for ValidateImageOnTestRequest entity.""" + + def test_create_valid_request(self): + """Valid request should be created successfully.""" + request = _make_request() + assert request.stage_name == "validate-image-on-test" + assert str(request.playbook_path) == "discovery.yml" + + def test_immutability(self): + """Request should be immutable (frozen dataclass).""" + request = _make_request() + try: + request.job_id = "new-id" + assert False, "Should have raised AttributeError" + except AttributeError: + pass + + def test_to_dict(self): + """to_dict should serialize all fields correctly.""" + job_id = str(uuid.uuid4()) + corr_id = str(uuid.uuid4()) + req_id = str(uuid.uuid4()) + request = _make_request( + job_id=job_id, + correlation_id=corr_id, + request_id=req_id, + ) + result = request.to_dict() + + assert result["job_id"] == job_id + assert result["stage_name"] == "validate-image-on-test" + assert result["playbook_path"] == "discovery.yml" + assert result["correlation_id"] == corr_id + assert result["timeout_minutes"] == 60 + assert result["submitted_at"] == "2026-02-17T10:30:00Z" + assert result["request_id"] == req_id + assert isinstance(result["extra_vars"], dict) + + def test_generate_filename(self): + """generate_filename should follow naming convention.""" + job_id = "test-job-id" + request = _make_request(job_id=job_id) + + with patch("core.validate.entities.datetime") as mock_dt: + mock_dt.now.return_value.strftime.return_value = "20260217_103000" + mock_dt.now.return_value.isoformat.return_value = "2026-02-17T10:30:00+00:00" + from datetime import timezone + mock_dt.timezone = timezone + filename = request.generate_filename() + + assert filename.startswith("test-job-id_validate-image-on-test_") + assert filename.endswith(".json") diff --git a/build_stream/tests/unit/core/validate/test_exceptions.py b/build_stream/tests/unit/core/validate/test_exceptions.py new file mode 100644 index 0000000000..a8726932a2 --- /dev/null +++ b/build_stream/tests/unit/core/validate/test_exceptions.py @@ -0,0 +1,73 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for ValidateImageOnTest domain exceptions.""" + +from core.validate.exceptions import ( + StageGuardViolationError, + EnvironmentUnavailableError, + ValidateDomainError, + ValidationExecutionError, +) + + +class TestValidateDomainError: + """Tests for ValidateDomainError base exception.""" + + def test_message_stored(self): + """Error message should be stored.""" + exc = ValidateDomainError("test error", "corr-123") + assert exc.message == "test error" + assert exc.correlation_id == "corr-123" + + def test_default_correlation_id(self): + """Default correlation_id should be empty string.""" + exc = ValidateDomainError("test error") + assert exc.correlation_id == "" + + def test_str_representation(self): + """String representation should be the message.""" + exc = ValidateDomainError("test error") + assert str(exc) == "test error" + + +class TestEnvironmentUnavailableError: + """Tests for EnvironmentUnavailableError.""" + + def test_inherits_from_base(self): + """Should inherit from ValidateDomainError.""" + exc = EnvironmentUnavailableError("env down", "corr-456") + assert isinstance(exc, ValidateDomainError) + assert exc.message == "env down" + assert exc.correlation_id == "corr-456" + + +class TestValidationExecutionError: + """Tests for ValidationExecutionError.""" + + def test_inherits_from_base(self): + """Should inherit from ValidateDomainError.""" + exc = ValidationExecutionError("exec failed", "corr-789") + assert isinstance(exc, ValidateDomainError) + assert exc.message == "exec failed" + + +class TestStageGuardViolationError: + """Tests for StageGuardViolationError.""" + + def test_inherits_from_base(self): + """Should inherit from ValidateDomainError.""" + exc = StageGuardViolationError("guard failed", "corr-abc") + assert isinstance(exc, ValidateDomainError) + assert exc.message == "guard failed" diff --git a/build_stream/tests/unit/core/validate/test_services.py b/build_stream/tests/unit/core/validate/test_services.py new file mode 100644 index 0000000000..415d52ea7f --- /dev/null +++ b/build_stream/tests/unit/core/validate/test_services.py @@ -0,0 +1,77 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for ValidateImageOnTest domain services.""" + +import uuid + +import pytest + +from core.jobs.value_objects import CorrelationId +from core.localrepo.value_objects import ExecutionTimeout, ExtraVars, PlaybookPath +from core.validate.entities import ValidateImageOnTestRequest +from core.validate.services import ValidateQueueService + + +class MockQueueRepo: + """Mock playbook queue request repository.""" + + def __init__(self, should_fail: bool = False): + self.written_requests = [] + self.should_fail = should_fail + + def write_request(self, request): + if self.should_fail: + raise IOError("Queue unavailable") + self.written_requests.append(request) + + +def _make_request(): + """Create a ValidateImageOnTestRequest with sensible defaults.""" + return ValidateImageOnTestRequest( + job_id=str(uuid.uuid4()), + stage_name="validate-image-on-test", + playbook_path=PlaybookPath("discovery.yml"), + extra_vars=ExtraVars({"job_id": str(uuid.uuid4())}), + correlation_id=str(uuid.uuid4()), + timeout=ExecutionTimeout(60), + submitted_at="2026-02-17T10:30:00Z", + request_id=str(uuid.uuid4()), + ) + + +class TestValidateQueueService: + """Tests for ValidateQueueService.""" + + def test_submit_request_success(self): + """Successful submission should write request to repo.""" + repo = MockQueueRepo() + service = ValidateQueueService(queue_repo=repo) + request = _make_request() + corr_id = CorrelationId(str(uuid.uuid4())) + + service.submit_request(request=request, correlation_id=corr_id) + + assert len(repo.written_requests) == 1 + assert repo.written_requests[0] is request + + def test_submit_request_failure_propagates(self): + """Queue failure should propagate the exception.""" + repo = MockQueueRepo(should_fail=True) + service = ValidateQueueService(queue_repo=repo) + request = _make_request() + corr_id = CorrelationId(str(uuid.uuid4())) + + with pytest.raises(IOError, match="Queue unavailable"): + service.submit_request(request=request, correlation_id=corr_id) diff --git a/build_stream/tests/unit/orchestrator/common/__init__.py b/build_stream/tests/unit/orchestrator/common/__init__.py new file mode 100644 index 0000000000..9dc64534bd --- /dev/null +++ b/build_stream/tests/unit/orchestrator/common/__init__.py @@ -0,0 +1,15 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for common orchestrator module.""" diff --git a/build_stream/tests/unit/orchestrator/common/test_result_poller.py b/build_stream/tests/unit/orchestrator/common/test_result_poller.py new file mode 100644 index 0000000000..8d001197f0 --- /dev/null +++ b/build_stream/tests/unit/orchestrator/common/test_result_poller.py @@ -0,0 +1,227 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for common ResultPoller.""" + +import asyncio +import uuid + +import pytest + +from core.jobs.entities import Stage +from core.jobs.value_objects import ( + JobId, + StageName, + StageState, +) +from core.localrepo.entities import PlaybookResult +from orchestrator.common.result_poller import ResultPoller + + +# --- Mock dependencies --- + +class MockResultService: + def __init__(self): + self.callback = None + self.results_to_deliver = [] + + def poll_results(self, callback): + self.callback = callback + count = 0 + for result in self.results_to_deliver: + callback(result) + count += 1 + self.results_to_deliver = [] + return count + + +class MockStageRepo: + def __init__(self): + self._stages = {} + + def save(self, stage): + key = (str(stage.job_id), stage.stage_name.value) + self._stages[key] = stage + + def find_by_job_and_name(self, job_id, stage_name): + return self._stages.get((str(job_id), stage_name.value)) + + +class MockAuditRepo: + def __init__(self): + self._events = [] + + def save(self, event): + self._events.append(event) + + def find_by_job(self, job_id): + return [e for e in self._events if str(e.job_id) == str(job_id)] + + +class MockUUIDGenerator: + def generate(self): + return uuid.uuid4() + + +# --- Fixtures --- + +@pytest.fixture +def mock_result_service(): + return MockResultService() + + +@pytest.fixture +def mock_stage_repo(): + return MockStageRepo() + + +@pytest.fixture +def mock_audit_repo(): + return MockAuditRepo() + + +@pytest.fixture +def mock_uuid_gen(): + return MockUUIDGenerator() + + +@pytest.fixture +def result_poller(mock_result_service, mock_stage_repo, mock_audit_repo, mock_uuid_gen): + """Create ResultPoller instance with mocked dependencies.""" + return ResultPoller( + result_service=mock_result_service, + stage_repo=mock_stage_repo, + audit_repo=mock_audit_repo, + uuid_generator=mock_uuid_gen, + poll_interval=1, + ) + + +# --- Tests --- + +class TestResultPoller: + """Tests for common ResultPoller.""" + + @pytest.mark.asyncio + async def test_start_starts_polling(self, result_poller, mock_result_service): + """Poller should start and begin polling.""" + await result_poller.start() + assert result_poller._running is True + assert result_poller._task is not None + await result_poller.stop() + + @pytest.mark.asyncio + async def test_stop_stops_polling(self, result_poller): + """Poller should stop cleanly.""" + await result_poller.start() + await result_poller.stop() + assert result_poller._running is False + + @pytest.mark.asyncio + async def test_double_start_is_safe(self, result_poller): + """Starting twice should not create duplicate tasks.""" + await result_poller.start() + await result_poller.start() # Should log warning, not error + assert result_poller._running is True + await result_poller.stop() + + @pytest.mark.asyncio + async def test_stop_without_start_is_safe(self, result_poller): + """Stopping without starting should be a no-op.""" + await result_poller.stop() + assert result_poller._running is False + + def test_on_result_success( + self, result_poller, mock_stage_repo, mock_audit_repo + ): + """Successful result should complete the stage and emit audit event.""" + job_id = JobId(str(uuid.uuid4())) + stage = Stage( + job_id=job_id, + stage_name=StageName("validate-image-on-test"), + stage_state=StageState.IN_PROGRESS, + attempt=1, + ) + mock_stage_repo.save(stage) + + result = PlaybookResult( + job_id=str(job_id), + stage_name="validate-image-on-test", + request_id=str(uuid.uuid4()), + status="success", + exit_code=0, + duration_seconds=120, + ) + + result_poller._on_result_received(result) + + saved = mock_stage_repo.find_by_job_and_name( + str(job_id), StageName("validate-image-on-test") + ) + assert saved.stage_state == StageState.COMPLETED + assert len(mock_audit_repo._events) == 1 + assert mock_audit_repo._events[0].event_type == "STAGE_COMPLETED" + + def test_on_result_failure( + self, result_poller, mock_stage_repo, mock_audit_repo + ): + """Failed result should fail the stage and emit audit event.""" + job_id = JobId(str(uuid.uuid4())) + stage = Stage( + job_id=job_id, + stage_name=StageName("validate-image-on-test"), + stage_state=StageState.IN_PROGRESS, + attempt=1, + ) + mock_stage_repo.save(stage) + + result = PlaybookResult( + job_id=str(job_id), + stage_name="validate-image-on-test", + request_id=str(uuid.uuid4()), + status="failed", + exit_code=1, + error_code="PLAYBOOK_EXECUTION_FAILED", + error_summary="Playbook exited with code 1", + ) + + result_poller._on_result_received(result) + + saved = mock_stage_repo.find_by_job_and_name( + str(job_id), StageName("validate-image-on-test") + ) + assert saved.stage_state == StageState.FAILED + assert len(mock_audit_repo._events) == 1 + assert mock_audit_repo._events[0].event_type == "STAGE_FAILED" + + def test_on_result_stage_not_found( + self, result_poller, mock_stage_repo, mock_audit_repo + ): + """Missing stage should be handled gracefully (no crash).""" + result = PlaybookResult( + job_id=str(uuid.uuid4()), + stage_name="validate-image-on-test", + request_id=str(uuid.uuid4()), + status="success", + exit_code=0, + ) + + # Should not raise + result_poller._on_result_received(result) + assert len(mock_audit_repo._events) == 0 + + def test_backward_compatibility_alias(self): + """LocalRepoResultPoller should be an alias for ResultPoller.""" + from orchestrator.local_repo.result_poller import LocalRepoResultPoller + assert LocalRepoResultPoller is ResultPoller diff --git a/build_stream/tests/unit/orchestrator/jobs/use_cases/test_create_job.py b/build_stream/tests/unit/orchestrator/jobs/use_cases/test_create_job.py index f6866ebe7e..26a50c6a4a 100644 --- a/build_stream/tests/unit/orchestrator/jobs/use_cases/test_create_job.py +++ b/build_stream/tests/unit/orchestrator/jobs/use_cases/test_create_job.py @@ -154,7 +154,7 @@ def test_create_job_creates_all_stages( response = use_case.execute(command) job_id = JobId(response.job_id) stages = stage_repo.find_all_by_job(job_id) - assert len(stages) == 9 + assert len(stages) == 10 stage_names = {stage.stage_name.value for stage in stages} expected_names = {stage_type.value for stage_type in StageType} @@ -263,7 +263,7 @@ def test_idempotent_retry_returns_existing_job( assert first_response.job_id == second_response.job_id assert first_response.version == second_response.version stages = stage_repo.find_all_by_job(JobId(first_response.job_id)) - assert len(stages) == 9 + assert len(stages) == 10 events = audit_repo.find_by_job(JobId(first_response.job_id)) assert len(events) == 1 diff --git a/build_stream/tests/unit/orchestrator/validate/__init__.py b/build_stream/tests/unit/orchestrator/validate/__init__.py new file mode 100644 index 0000000000..301852a4af --- /dev/null +++ b/build_stream/tests/unit/orchestrator/validate/__init__.py @@ -0,0 +1,15 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for validate orchestrator module.""" diff --git a/build_stream/tests/unit/orchestrator/validate/test_validate_image_on_test_use_case.py b/build_stream/tests/unit/orchestrator/validate/test_validate_image_on_test_use_case.py new file mode 100644 index 0000000000..2c5faa9193 --- /dev/null +++ b/build_stream/tests/unit/orchestrator/validate/test_validate_image_on_test_use_case.py @@ -0,0 +1,458 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for ValidateImageOnTestUseCase.""" + +import uuid + +import pytest + +from core.jobs.entities import Job, Stage +from core.jobs.exceptions import JobNotFoundError +from core.jobs.value_objects import ( + ClientId, + CorrelationId, + JobId, + JobState, + StageName, + StageState, + StageType, +) +from core.validate.exceptions import ( + StageGuardViolationError, + ValidationExecutionError, +) +from orchestrator.validate.commands import ValidateImageOnTestCommand +from orchestrator.validate.use_cases import ValidateImageOnTestUseCase + + +# --- Helpers --- + +def _uuid() -> str: + return str(uuid.uuid4()) + + +def _make_job(job_id: JobId, client_id: ClientId) -> Job: + job = Job( + job_id=job_id, + client_id=client_id, + request_client_id="req-client-123", + job_state=JobState.IN_PROGRESS, + ) + return job + + +def _make_stage( + job_id: JobId, + stage_type: StageType, + state: StageState = StageState.PENDING, +) -> Stage: + return Stage( + job_id=job_id, + stage_name=StageName(stage_type.value), + stage_state=state, + attempt=1, + ) + + +def _make_command( + job_id: JobId | None = None, + client_id: ClientId | None = None, +) -> ValidateImageOnTestCommand: + return ValidateImageOnTestCommand( + job_id=job_id or JobId(_uuid()), + client_id=client_id or ClientId("test-client"), + correlation_id=CorrelationId(_uuid()), + ) + + +# --- Mock repositories --- + +class MockJobRepo: + def __init__(self): + self._jobs = {} + + def save(self, job: Job) -> None: + self._jobs[str(job.job_id)] = job + + def find_by_id(self, job_id): + key = str(job_id) if not isinstance(job_id, str) else job_id + return self._jobs.get(key) + + def exists(self, job_id) -> bool: + key = str(job_id) if not isinstance(job_id, str) else job_id + return key in self._jobs + + +class MockStageRepo: + def __init__(self): + self._stages = {} + + def save(self, stage: Stage) -> None: + key = (str(stage.job_id), str(stage.stage_name)) + self._stages[key] = stage + + def save_all(self, stages) -> None: + for s in stages: + self.save(s) + + def find_by_job_and_name(self, job_id, stage_name): + key = (str(job_id), str(stage_name)) + return self._stages.get(key) + + def find_all_by_job(self, job_id): + jid = str(job_id) + return [s for k, s in self._stages.items() if k[0] == jid] + + +class MockAuditRepo: + def __init__(self): + self._events = [] + + def save(self, event) -> None: + self._events.append(event) + + def find_by_job(self, job_id): + jid = str(job_id) + return [e for e in self._events if str(e.job_id) == jid] + + +class MockUUIDGenerator: + def generate(self): + return uuid.uuid4() + + +class MockQueueService: + def __init__(self, should_fail: bool = False): + self.submitted = [] + self.should_fail = should_fail + + def submit_request(self, request, correlation_id): + if self.should_fail: + raise IOError("Queue unavailable") + self.submitted.append(request) + + +# --- Fixtures --- + +@pytest.fixture +def job_repo(): + return MockJobRepo() + + +@pytest.fixture +def stage_repo(): + return MockStageRepo() + + +@pytest.fixture +def audit_repo(): + return MockAuditRepo() + + +@pytest.fixture +def uuid_gen(): + return MockUUIDGenerator() + + +@pytest.fixture +def queue_service(): + return MockQueueService() + + +def _build_use_case(job_repo, stage_repo, audit_repo, queue_service, uuid_gen): + return ValidateImageOnTestUseCase( + job_repo=job_repo, + stage_repo=stage_repo, + audit_repo=audit_repo, + queue_service=queue_service, + uuid_generator=uuid_gen, + ) + + +# --- Tests --- + +class TestValidateImageOnTestUseCase: + """Tests for ValidateImageOnTestUseCase.""" + + def test_execute_success( + self, job_repo, stage_repo, audit_repo, queue_service, uuid_gen + ): + """Successful execution should submit to queue and return response.""" + job_id = JobId(_uuid()) + client_id = ClientId("test-client") + + # Setup: job, validate stage, and a completed build-image stage + job = _make_job(job_id, client_id) + job_repo.save(job) + + validate_stage = _make_stage(job_id, StageType.VALIDATE_IMAGE_ON_TEST) + stage_repo.save(validate_stage) + + build_stage = _make_stage( + job_id, StageType.BUILD_IMAGE_X86_64, StageState.COMPLETED + ) + stage_repo.save(build_stage) + + command = _make_command(job_id=job_id, client_id=client_id) + use_case = _build_use_case( + job_repo, stage_repo, audit_repo, queue_service, uuid_gen + ) + + result = use_case.execute(command) + + assert result.job_id == str(job_id) + assert result.stage_name == "validate-image-on-test" + assert result.status == "accepted" + assert len(queue_service.submitted) == 1 + assert len(audit_repo._events) == 1 + + def test_execute_with_aarch64_completed( + self, job_repo, stage_repo, audit_repo, queue_service, uuid_gen + ): + """Should succeed when aarch64 build stage is completed.""" + job_id = JobId(_uuid()) + client_id = ClientId("test-client") + + job = _make_job(job_id, client_id) + job_repo.save(job) + + validate_stage = _make_stage(job_id, StageType.VALIDATE_IMAGE_ON_TEST) + stage_repo.save(validate_stage) + + build_stage = _make_stage( + job_id, StageType.BUILD_IMAGE_AARCH64, StageState.COMPLETED + ) + stage_repo.save(build_stage) + + command = _make_command(job_id=job_id, client_id=client_id) + use_case = _build_use_case( + job_repo, stage_repo, audit_repo, queue_service, uuid_gen + ) + + result = use_case.execute(command) + assert result.status == "accepted" + + def test_execute_job_not_found( + self, job_repo, stage_repo, audit_repo, queue_service, uuid_gen + ): + """Should raise JobNotFoundError when job does not exist.""" + command = _make_command() + use_case = _build_use_case( + job_repo, stage_repo, audit_repo, queue_service, uuid_gen + ) + + with pytest.raises(JobNotFoundError): + use_case.execute(command) + + def test_execute_client_mismatch( + self, job_repo, stage_repo, audit_repo, queue_service, uuid_gen + ): + """Should raise JobNotFoundError when client doesn't own the job.""" + job_id = JobId(_uuid()) + job = _make_job(job_id, ClientId("owner-client")) + job_repo.save(job) + + command = _make_command(job_id=job_id, client_id=ClientId("other-client")) + use_case = _build_use_case( + job_repo, stage_repo, audit_repo, queue_service, uuid_gen + ) + + with pytest.raises(JobNotFoundError): + use_case.execute(command) + + def test_execute_stage_not_found( + self, job_repo, stage_repo, audit_repo, queue_service, uuid_gen + ): + """Should raise JobNotFoundError when validate stage doesn't exist.""" + job_id = JobId(_uuid()) + client_id = ClientId("test-client") + job = _make_job(job_id, client_id) + job_repo.save(job) + + command = _make_command(job_id=job_id, client_id=client_id) + use_case = _build_use_case( + job_repo, stage_repo, audit_repo, queue_service, uuid_gen + ) + + with pytest.raises(JobNotFoundError): + use_case.execute(command) + + def test_execute_stage_guard_violation_no_build_stages( + self, job_repo, stage_repo, audit_repo, queue_service, uuid_gen + ): + """Should raise StageGuardViolationError when no build stage completed.""" + job_id = JobId(_uuid()) + client_id = ClientId("test-client") + + job = _make_job(job_id, client_id) + job_repo.save(job) + + validate_stage = _make_stage(job_id, StageType.VALIDATE_IMAGE_ON_TEST) + stage_repo.save(validate_stage) + + command = _make_command(job_id=job_id, client_id=client_id) + use_case = _build_use_case( + job_repo, stage_repo, audit_repo, queue_service, uuid_gen + ) + + with pytest.raises(StageGuardViolationError): + use_case.execute(command) + + def test_execute_stage_guard_violation_build_pending( + self, job_repo, stage_repo, audit_repo, queue_service, uuid_gen + ): + """Should raise StageGuardViolationError when build stage is PENDING.""" + job_id = JobId(_uuid()) + client_id = ClientId("test-client") + + job = _make_job(job_id, client_id) + job_repo.save(job) + + validate_stage = _make_stage(job_id, StageType.VALIDATE_IMAGE_ON_TEST) + stage_repo.save(validate_stage) + + build_stage = _make_stage( + job_id, StageType.BUILD_IMAGE_X86_64, StageState.PENDING + ) + stage_repo.save(build_stage) + + command = _make_command(job_id=job_id, client_id=client_id) + use_case = _build_use_case( + job_repo, stage_repo, audit_repo, queue_service, uuid_gen + ) + + with pytest.raises(StageGuardViolationError): + use_case.execute(command) + + def test_execute_queue_failure( + self, job_repo, stage_repo, audit_repo, uuid_gen + ): + """Should raise ValidationExecutionError when queue submission fails.""" + job_id = JobId(_uuid()) + client_id = ClientId("test-client") + + job = _make_job(job_id, client_id) + job_repo.save(job) + + validate_stage = _make_stage(job_id, StageType.VALIDATE_IMAGE_ON_TEST) + stage_repo.save(validate_stage) + + build_stage = _make_stage( + job_id, StageType.BUILD_IMAGE_X86_64, StageState.COMPLETED + ) + stage_repo.save(build_stage) + + failing_queue = MockQueueService(should_fail=True) + command = _make_command(job_id=job_id, client_id=client_id) + use_case = _build_use_case( + job_repo, stage_repo, audit_repo, failing_queue, uuid_gen + ) + + with pytest.raises(ValidationExecutionError): + use_case.execute(command) + + # Stage should be marked as FAILED + saved_stage = stage_repo.find_by_job_and_name( + job_id, StageName(StageType.VALIDATE_IMAGE_ON_TEST.value) + ) + assert saved_stage.stage_state == StageState.FAILED + + def test_execute_emits_audit_event( + self, job_repo, stage_repo, audit_repo, queue_service, uuid_gen + ): + """Should emit STAGE_STARTED audit event.""" + job_id = JobId(_uuid()) + client_id = ClientId("test-client") + + job = _make_job(job_id, client_id) + job_repo.save(job) + + validate_stage = _make_stage(job_id, StageType.VALIDATE_IMAGE_ON_TEST) + stage_repo.save(validate_stage) + + build_stage = _make_stage( + job_id, StageType.BUILD_IMAGE_X86_64, StageState.COMPLETED + ) + stage_repo.save(build_stage) + + command = _make_command(job_id=job_id, client_id=client_id) + use_case = _build_use_case( + job_repo, stage_repo, audit_repo, queue_service, uuid_gen + ) + use_case.execute(command) + + events = audit_repo.find_by_job(job_id) + assert len(events) == 1 + assert events[0].event_type == "STAGE_STARTED" + assert events[0].details["stage_name"] == "validate-image-on-test" + + def test_execute_starts_stage( + self, job_repo, stage_repo, audit_repo, queue_service, uuid_gen + ): + """Stage should transition to IN_PROGRESS after submission.""" + job_id = JobId(_uuid()) + client_id = ClientId("test-client") + + job = _make_job(job_id, client_id) + job_repo.save(job) + + validate_stage = _make_stage(job_id, StageType.VALIDATE_IMAGE_ON_TEST) + stage_repo.save(validate_stage) + + build_stage = _make_stage( + job_id, StageType.BUILD_IMAGE_X86_64, StageState.COMPLETED + ) + stage_repo.save(build_stage) + + command = _make_command(job_id=job_id, client_id=client_id) + use_case = _build_use_case( + job_repo, stage_repo, audit_repo, queue_service, uuid_gen + ) + use_case.execute(command) + + saved_stage = stage_repo.find_by_job_and_name( + job_id, StageName(StageType.VALIDATE_IMAGE_ON_TEST.value) + ) + assert saved_stage.stage_state == StageState.IN_PROGRESS + + def test_execute_submits_correct_request( + self, job_repo, stage_repo, audit_repo, queue_service, uuid_gen + ): + """Submitted request should have correct playbook and stage name.""" + job_id = JobId(_uuid()) + client_id = ClientId("test-client") + + job = _make_job(job_id, client_id) + job_repo.save(job) + + validate_stage = _make_stage(job_id, StageType.VALIDATE_IMAGE_ON_TEST) + stage_repo.save(validate_stage) + + build_stage = _make_stage( + job_id, StageType.BUILD_IMAGE_X86_64, StageState.COMPLETED + ) + stage_repo.save(build_stage) + + command = _make_command(job_id=job_id, client_id=client_id) + use_case = _build_use_case( + job_repo, stage_repo, audit_repo, queue_service, uuid_gen + ) + use_case.execute(command) + + assert len(queue_service.submitted) == 1 + submitted = queue_service.submitted[0] + assert submitted.stage_name == "validate-image-on-test" + assert str(submitted.playbook_path) == "discovery.yml" + assert submitted.job_id == str(job_id)