diff --git a/.claude/skills/pipeline-test/SKILL.md b/.claude/skills/pipeline-test/SKILL.md index 15211e6b8..8223f8399 100644 --- a/.claude/skills/pipeline-test/SKILL.md +++ b/.claude/skills/pipeline-test/SKILL.md @@ -289,6 +289,15 @@ docker exec maple-airflow-scheduler airflow connections add cleanup \ docker exec maple-airflow-scheduler airflow dags unpause daily_collection_pipeline docker exec maple-airflow-scheduler airflow dags unpause daily_cleanup_pipeline +# Verify #1292 per-phase branch is loaded (only after branch_on_scope merged). +# Without this guard the section silently no-ops and operators think the +# per-phase wiring is broken. Fail-fast on missing branch_on_scope task. +if docker exec maple-airflow-scheduler airflow tasks list daily_collection_pipeline 2>/dev/null | grep -q '^branch_on_scope$'; then + echo "per-phase branch loaded: branch_on_scope present" +else + echo "WARNING: branch_on_scope not in daily_collection_pipeline — issue #1292 branch not deployed" +fi + # Trigger DAG manually docker exec maple-airflow-scheduler airflow dags trigger daily_collection_pipeline @@ -494,6 +503,127 @@ done **사용자가 명시적으로 종료를 요청할 때만 실행.** 파이프라인은 장시간 실행(~2시간)이므로 자동 종료하지 않음. 사용자가 "종료", "stop", "cleanup" 등을 말하면 실행. +### 10a. Per-phase scope verification (issue #1292) + +Smoke verification that the per-phase Airflow branch (added in #1292) drives the ext-api per-phase endpoints end-to-end. Runs **after** the main daily E2E completes successfully. Skipped if `branch_on_scope` task is not present (pre-#1292 deployments). + +**Prereq check:** +```bash +# Detect whether the per-phase branch is deployed in this DAG file. +# Without this guard the section silently no-ops and operators think the +# per-phase wiring is broken. Fail-fast on missing branch_on_scope task. +if ! docker exec maple-airflow-scheduler airflow tasks list daily_collection_pipeline 2>/dev/null | grep -q '^branch_on_scope$'; then + echo "SKIP: branch_on_scope not in daily_collection_pipeline — issue #1292 branch not deployed" + return 0 2>/dev/null || exit 0 +fi +echo "Per-phase branch loaded; running scope verification" +``` + +**10a.1 — Single-phase trigger:** +```bash +# Trigger only ITEM_EQUIPMENT (skips the rest of the daily chain) +docker exec maple-airflow-scheduler airflow dags trigger daily_collection_pipeline \ + -c '{"scope": ["ITEM_EQUIPMENT"]}' + +# Wait up to 60s for ITEM_EQUIPMENT to become ACTIVE +for i in $(seq 1 30); do + phase=$(curl -s http://localhost:8081/api/internal/run-status | jq -r '.current.phase // ""') + if [ "${phase}" = "ITEM_EQUIPMENT" ]; then + echo "ITEM_EQUIPMENT ACTIVE after ${i}*2s"; break + fi + sleep 2 +done +[ "${phase}" = "ITEM_EQUIPMENT" ] || { echo "ITEM_EQUIPMENT scope trigger failed"; exit 6; } +``` + +****10a.2 — Loop start:** +```bash +# Start a continuous loop for ITEM_EQUIPMENT. +# Note: OCID_LOOKUP_LOOP is rejected by ext-api (400 INVALID_PHASE) despite +# earlier #1291 spec mention — only CHARACTER_BASIC and ITEM_EQUIPMENT are +# accepted by PhaseLoopController.loopablePhases (verified 2026-06-18). +docker exec maple-airflow-scheduler airflow dags trigger daily_collection_pipeline \ + -c '{"scope": ["ITEM_EQUIPMENT_LOOP"]}' + +# Wait up to 30s for loopId to appear in /run-status.loopSummaries +loop_id="" +for i in $(seq 1 15); do + loop_id=$(curl -s http://localhost:8081/api/internal/run-status \ + | jq -r '.loopSummaries.ITEM_EQUIPMENT.loopId // ""') + if [ -n "${loop_id}" ] && [ "${loop_id}" != "null" ]; then + echo "Loop started: loopId=${loop_id}"; break + fi + sleep 2 +done +[ -n "${loop_id}" ] && [ "${loop_id}" != "null" ] || { echo "Loop start failed"; exit 6; } +``` + +**10a.3 — Loop iteration progress:** +```bash +# Wait up to 90s for iterationCount > 0 (proves the loop is actually iterating) +iter="" +for i in $(seq 1 45); do + iter=$(curl -s http://localhost:8081/api/internal/run-status \ + | jq -r '.loopSummaries.ITEM_EQUIPMENT.iterationCount // 0') + if [ "${iter}" -gt 0 ] 2>/dev/null; then + echo "Iteration count = ${iter} after ${i}*2s"; break + fi + sleep 2 +done +[ "${iter}" -gt 0 ] 2>/dev/null || { echo "Loop did not iterate within 90s"; exit 6; } +``` + +**10a.4 — Loop stop:** +```bash +# Graceful stop via per-phase scope +docker exec maple-airflow-scheduler airflow dags trigger daily_collection_pipeline \ + -c '{"scope": ["ITEM_EQUIPMENT_STOP"]}' + +# Wait up to 45s for status = STOPPED +status="" +for i in $(seq 1 23); do + status=$(curl -s http://localhost:8081/api/internal/run-status \ + | jq -r '.loopSummaries.ITEM_EQUIPMENT.status // ""') + if [ "${status}" = "STOPPED" ]; then + echo "Loop STOPPED after ${i}*2s"; break + fi + sleep 2 +done +[ "${status}" = "STOPPED" ] || { echo "Loop stop failed; status=${status}"; exit 6; } +``` + +**10a.5 — Invalid scope rejection:** +```bash +# RANKING_FETCH_LOOP must be rejected (not in #1291 loopablePhases) +trigger_output=$(docker exec maple-airflow-scheduler airflow dags trigger daily_collection_pipeline \ + -c '{"scope": ["RANKING_FETCH_LOOP"]}' 2>&1) +# The trigger CLI itself accepts the run; the parse_scope failure surfaces +# as branch_on_scope → AirflowException. Verify via task instance state: +sleep 10 +task_state=$(docker exec maple-airflow-scheduler airflow tasks state-for-ti \ + daily_collection_pipeline "$(date -u +%Y-%m-%dT%H:%M:%S)" branch_on_scope 2>/dev/null \ + | jq -r '.state // "unknown"') +# Expected: "failed" (AirflowException raised by parse_scope) +[ "${task_state}" = "failed" ] || { echo "Invalid scope not rejected; state=${task_state}"; exit 6; } +echo "Invalid scope correctly rejected" +``` + +**10a.6 — Cleanup loop artifacts:** +```bash +# Ensure no orphan loop survives past this section (defensive — step 10a.4 +# should have stopped it, but verify before reporting PASS). +loop_state=$(curl -s http://localhost:8081/api/internal/run-status \ + | jq -r '.loopSummaries.ITEM_EQUIPMENT.status // "NONE"') +case "${loop_state}" in + STOPPED|NONE|"") echo "Loop cleanly stopped";; + *) echo "WARNING: loop in unexpected state ${loop_state} after stop scope";; +esac +``` + +**Exit codes:** +- 0: all 6 sub-steps PASS +- 6: any sub-step FAILED (see error message above the exit) + ```bash # Stop Spring Boot services if [ "${START_MODE:-nohup}" = "nohup" ]; then @@ -527,6 +657,8 @@ docker compose -f docker-compose.yml -f docker-compose.airflow.yml stop airflow- | Airflow can't reach services | Verify `host.docker.internal` in docker-compose.airflow.yml, check services are running on host | | Airflow sensor false positive | Verify runId correlation — sensor checks `current.runId` matches trigger response | | Airflow DB connection failed | Ensure maple-network exists: `docker network create maple-network` | +| Step 10a loop never iterates | Verify `loopSummaries.ITEM_EQUIPMENT.loopId` set; if null, `/loop/phase/ITEM_EQUIPMENT` returned non-202 — check ext-api logs for `PhaseLoopController` errors | +| Step 10a loop stop timeout | `/stop/phase/ITEM_EQUIPMENT` did not propagate `PhaseStoppedException`; check ext-api logs for `PhaseStopSignal.requestStop` and downstream chunk boundary halt | | Cleanup returns 0 / "no runs found" while MinIO has old runs | `module-cleanup` defaulted to `LocalFsObjectStorage` (reads empty `../data/runs/`). Restart with `-Dstorage.backend=minio` JVM flag. StorageConfig's `@ConditionalOnProperty` has `matchIfMissing=true` so an unset property silently picks local. | ## Notes @@ -539,6 +671,8 @@ docker compose -f docker-compose.yml -f docker-compose.airflow.yml stop airflow- - Local profile DB: when `STORAGE_BACKEND=local`, the hardcoded `localhost:5432/maple_expectation` from `application-local.yml` is used. When `STORAGE_BACKEND=minio`, `.env` `DB_URL` is used directly (typically the dev cloud DB). - `run-on-startup: true` in local profile starts pipeline immediately. When Airflow controls scheduling in production, set `run-on-startup: false` and `external-api.schedule.enabled: true` to keep the bean but disable auto-trigger. - Airflow connects via `host.docker.internal` (Docker→host). This is transitional until services are containerized. +- Per-phase scope verification (step 10a) only runs when the #1292 branch (`branch_on_scope` task) is present in `daily_collection_pipeline`. Pre-#1292 deployments skip the section silently — see step 10a prereq check for the gate. +- Per-phase verification adds ~5min (steps 10a.1–10a.5) to the full pipeline test. Run after the main E2E (step 9) succeeds; isolated failures don't affect main E2E pass/fail. ## Storage backend diff --git a/docker/airflow/dags/daily_collection_pipeline.py b/docker/airflow/dags/daily_collection_pipeline.py index 70866a9ea..e37f34111 100644 --- a/docker/airflow/dags/daily_collection_pipeline.py +++ b/docker/airflow/dags/daily_collection_pipeline.py @@ -16,11 +16,23 @@ from airflow import DAG from airflow.exceptions import AirflowException from airflow.hooks.base import BaseHook -from airflow.operators.python import PythonOperator +from airflow.operators.empty import EmptyOperator +from airflow.operators.python import BranchPythonOperator, PythonOperator from airflow.operators.trigger_dagrun import TriggerDagRunOperator from airflow.providers.http.sensors.http import HttpSensor from airflow.sensors.python import PythonSensor +from per_phase_tasks import ( + LOOP_PHASES, + STOP_PHASES, + TRIGGER_PHASES, + make_is_phase_terminal, + make_loop_task, + make_stop_task, + make_trigger_task, + parse_scope, +) + def is_health_up(response): """HttpSensor response_check: ext-api /actuator/health returns {"status": "UP"}. @@ -235,4 +247,43 @@ def wait_for_item_equipment_cycle(**context): wait_for_completion=False, ) - check_external_api >> trigger_daily_collection >> wait_for_completion >> wait_ie_cycle >> trigger_cleanup + # Per-phase branch (issue #1292). Activated when dag_run.conf['scope'] + # is set to a list of action values (see per_phase_tasks.ALLOWED_SCOPES). + branch_on_scope = BranchPythonOperator( + task_id="branch_on_scope", + python_callable=lambda **ctx: ( + "trigger_daily_collection" + if parse_scope(ctx["dag_run"].conf or {}) == ["FULL_DAILY"] + else "per_phase_join" + ), + ) + + per_phase_join = EmptyOperator( + task_id="per_phase_join", + trigger_rule="none_failed_min_one_success", + ) + + per_phase_trigger_tasks = [make_trigger_task(p) for p in TRIGGER_PHASES] + per_phase_loop_tasks = [make_loop_task(p) for p in LOOP_PHASES] + per_phase_stop_tasks = [make_stop_task(p) for p in STOP_PHASES] + + per_phase_trigger_sensors = [ + PythonSensor( + task_id=f"per_phase_wait_{p.lower()}_completion", + python_callable=make_is_phase_terminal(p), + mode="reschedule", + poke_interval=60, + timeout=60 * 60 * 4, + ) + for p in TRIGGER_PHASES + ] + + check_external_api >> branch_on_scope + branch_on_scope >> trigger_daily_collection + branch_on_scope >> per_phase_join + per_phase_join >> per_phase_trigger_tasks + per_phase_join >> per_phase_loop_tasks + per_phase_join >> per_phase_stop_tasks + for trig, sens in zip(per_phase_trigger_tasks, per_phase_trigger_sensors): + trig >> sens + trigger_daily_collection >> wait_for_completion >> wait_ie_cycle >> trigger_cleanup diff --git a/docker/airflow/dags/per_phase_tasks.py b/docker/airflow/dags/per_phase_tasks.py new file mode 100644 index 000000000..c7bfb2241 --- /dev/null +++ b/docker/airflow/dags/per_phase_tasks.py @@ -0,0 +1,254 @@ +"""Per-phase Airflow task factories for ext-api. + +Drives the per-phase endpoints from #1289/1290/1291 via Airflow's +BranchPythonOperator in daily_collection_pipeline.py. + +Spec: docs/superpowers/specs/2026-06-18-issue-1292-per-phase-dag-design.md +ADR: docs/01_ADR/ADR-393-airflow-per-phase-dag.md +""" +from datetime import timedelta +import json + +import requests +from airflow.exceptions import AirflowException +from airflow.hooks.base import BaseHook +from airflow.operators.python import PythonOperator +from airflow.sensors.python import PythonSensor + + +# Allowed scope values. RANKING_FETCH_LOOP and OCID_LOOKUP_LOOP excluded — +# ext-api PhaseLoopController.loopablePhases only allows CHARACTER_BASIC and +# ITEM_EQUIPMENT (verified against module-external-api/.../loop/PhaseLoopController.kt +# on 2026-06-18; spec §2.3 mentioned OCID_LOOKUP but impl didn't include it). +ALLOWED_SCOPES = frozenset({ + "RANKING_FETCH", "OCID_LOOKUP", "CHARACTER_BASIC", "ITEM_EQUIPMENT", + "CHARACTER_BASIC_LOOP", "ITEM_EQUIPMENT_LOOP", + "RANKING_FETCH_STOP", "OCID_LOOKUP_STOP", + "CHARACTER_BASIC_STOP", "ITEM_EQUIPMENT_STOP", +}) + +# Phase lists for fan-out +TRIGGER_PHASES = ["RANKING_FETCH", "OCID_LOOKUP", "CHARACTER_BASIC", "ITEM_EQUIPMENT"] +LOOP_PHASES = ["CHARACTER_BASIC", "ITEM_EQUIPMENT"] +STOP_PHASES = ["RANKING_FETCH", "OCID_LOOKUP", "CHARACTER_BASIC", "ITEM_EQUIPMENT"] + + +def get_external_api_base() -> str: + """Resolve ext-api base URL from Airflow Connection 'external_api'.""" + conn = BaseHook.get_connection("external_api") + return f"http://{conn.host}:{conn.port}" + + +def parse_scope(conf: dict) -> list: + """Validate dag_run.conf['scope']. Returns list of scope values. + + - Missing or 'FULL_DAILY' → ['FULL_DAILY'] + - String → wrap in list + - List → validate every value against ALLOWED_SCOPES + - Any invalid value → raise AirflowException + """ + scope = conf.get("scope", "FULL_DAILY") + if scope == "FULL_DAILY": + return ["FULL_DAILY"] + if isinstance(scope, str): + scope = [scope] + if not isinstance(scope, list): + raise AirflowException( + f"scope must be string or list, got {type(scope).__name__}" + ) + invalid = [s for s in scope if s not in ALLOWED_SCOPES] + if invalid: + raise AirflowException( + f"Invalid scope values: {invalid}. " + f"Allowed: {sorted(ALLOWED_SCOPES)}" + ) + return list(scope) + + +def make_trigger_task(phase: str) -> PythonOperator: + """Single-shot phase trigger via /trigger/phase/{phase}. + + Gates on scope: returns None (Airflow skip) if bare phase not in scope. + 200/202 → return response JSON for xcom correlation. + 409 → idempotent: discover active runId from /run-status, mark ALREADY_ACTIVE. + Other → AirflowException. + """ + def _trigger(**ctx): + scope = parse_scope(ctx["dag_run"].conf or {}) + if phase not in scope: + return None + base = get_external_api_base() + try: + resp = requests.post( + f"{base}/api/internal/trigger/phase/{phase}", timeout=30 + ) + except requests.RequestException as exc: + raise AirflowException(f"Trigger {phase} failed: {exc}") from exc + + if resp.status_code in (200, 202): + return resp.json() + + if resp.status_code == 409: + try: + status_resp = requests.get( + f"{base}/api/internal/run-status", timeout=10 + ) + status_resp.raise_for_status() + data = status_resp.json() + except (requests.RequestException, ValueError) as exc: + raise AirflowException( + f"409 from trigger {phase} but /run-status fetch failed: {exc}" + ) from exc + current = data.get("current") or {} + return { + "runId": current.get("runId"), + "phase": phase, + "status": "ALREADY_ACTIVE", + } + + raise AirflowException( + f"Trigger {phase} failed: HTTP {resp.status_code} " + f"{resp.reason}: {resp.text[:500]}" + ) + + return PythonOperator( + task_id=f"per_phase_trigger_{phase.lower()}", + python_callable=_trigger, + retries=0, + execution_timeout=timedelta(seconds=60), + do_xcom_push=True, + ) + + +def make_loop_task(phase: str) -> PythonOperator: + """Start loop via /loop/phase/{phase}. + + Gates on scope: returns None if {phase}_LOOP not in scope. + 202 → return response JSON (loopId, iterationCount). + 409 → idempotent: mark ALREADY_LOOPING, preserve existing loopId. + 400 → AirflowException (config error, e.g. RANKING_FETCH_LOOP). + Other → AirflowException. + """ + def _loop(**ctx): + scope = parse_scope(ctx["dag_run"].conf or {}) + if f"{phase}_LOOP" not in scope: + return None + base = get_external_api_base() + try: + resp = requests.post( + f"{base}/api/internal/loop/phase/{phase}", timeout=30 + ) + except requests.RequestException as exc: + raise AirflowException(f"Loop start {phase} failed: {exc}") from exc + + if resp.status_code == 202: + return resp.json() + + if resp.status_code == 409: + body = resp.json() + return {**body, "status": "ALREADY_LOOPING"} + + if resp.status_code == 400: + raise AirflowException( + f"Loop start {phase} rejected (INVALID_PHASE): {resp.text[:500]}" + ) + + raise AirflowException( + f"Loop start {phase} failed: HTTP {resp.status_code} " + f"{resp.reason}: {resp.text[:500]}" + ) + + return PythonOperator( + task_id=f"per_phase_loop_{phase.lower()}", + python_callable=_loop, + retries=0, + execution_timeout=timedelta(seconds=60), + do_xcom_push=True, + ) + + +def make_stop_task(phase: str) -> PythonOperator: + """Stop via /stop/phase/{phase}. + + Single endpoint halts both single-shot runs and loops (per #1290 spec §5.3). + Gates on scope: returns None if {phase}_STOP not in scope. + 202 → STOP_REQUESTED; 200 → NOT_RUNNING (idempotent). + """ + def _stop(**ctx): + scope = parse_scope(ctx["dag_run"].conf or {}) + if f"{phase}_STOP" not in scope: + return None + base = get_external_api_base() + try: + resp = requests.post( + f"{base}/api/internal/stop/phase/{phase}", timeout=30 + ) + except requests.RequestException as exc: + raise AirflowException(f"Stop {phase} failed: {exc}") from exc + + if resp.status_code == 202: + return {**resp.json(), "status": "STOP_REQUESTED"} + + if resp.status_code == 200: + return {"phase": phase, "status": "NOT_RUNNING"} + + raise AirflowException( + f"Stop {phase} failed: HTTP {resp.status_code} " + f"{resp.reason}: {resp.text[:500]}" + ) + + return PythonOperator( + task_id=f"per_phase_stop_{phase.lower()}", + python_callable=_stop, + retries=0, + execution_timeout=timedelta(seconds=60), + do_xcom_push=True, + ) + + +def make_is_phase_terminal(phase: str): + """PythonSensor callable: returns True when triggered runId reaches terminal. + + Gates on scope first: if phase not in scope, returns True (skip). + FAILED phase → RuntimeError (hard-fail DAG). + Transient HTTP → False (reschedule). + """ + task_id = f"per_phase_trigger_{phase.lower()}" + + def _poke(**ctx): + scope = parse_scope(ctx["dag_run"].conf or {}) + if phase not in scope: + return True + + xcom_val = ctx["ti"].xcom_pull(task_ids=task_id) + if isinstance(xcom_val, str): + xcom_val = json.loads(xcom_val) + run_id = (xcom_val or {}).get("runId") + if not run_id: + raise RuntimeError( + f"Sensor for {phase} triggered but trigger xcom returned no runId " + f"— config error" + ) + + try: + resp = requests.get( + f"{get_external_api_base()}/api/internal/run-status", + timeout=10, + ) + resp.raise_for_status() + data = resp.json() + except (requests.RequestException, ValueError): + return False + + current = data.get("current") + if not current or current.get("runId") != run_id: + return False + + if current.get("phase") == "FAILED": + raise RuntimeError( + f"Run {run_id} failed: {current.get('errorMessage', 'unknown')}" + ) + + return bool(current.get("terminal", False)) + + return _poke \ No newline at end of file diff --git a/docker/airflow/dags/tests/__init__.py b/docker/airflow/dags/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/docker/airflow/dags/tests/conftest.py b/docker/airflow/dags/tests/conftest.py new file mode 100644 index 000000000..3fcd533ed --- /dev/null +++ b/docker/airflow/dags/tests/conftest.py @@ -0,0 +1,33 @@ +"""Pytest fixtures for per_phase_tasks unit tests.""" +import sys +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +# Ensure dags/ is importable +DAGS_DIR = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(DAGS_DIR)) + + +@pytest.fixture +def mock_dag_run_conf(): + """Factory: mock context['dag_run'].conf for parse_scope.""" + + def _make(conf): + dag_run = MagicMock() + dag_run.conf = conf + return {"dag_run": dag_run} + + return _make + + +@pytest.fixture +def mock_external_api_conn(): + """Mock the Airflow 'external_api' connection to localhost.""" + with patch("per_phase_tasks.BaseHook") as base_hook: + conn = MagicMock() + conn.host = "localhost" + conn.port = 8081 + base_hook.get_connection.return_value = conn + yield base_hook \ No newline at end of file diff --git a/docker/airflow/dags/tests/test_per_phase_tasks.py b/docker/airflow/dags/tests/test_per_phase_tasks.py new file mode 100644 index 000000000..b9e191568 --- /dev/null +++ b/docker/airflow/dags/tests/test_per_phase_tasks.py @@ -0,0 +1,476 @@ +"""Unit tests for per_phase_tasks.parse_scope — RED phase (Task 4). + +Per TDD anti-pattern: tests for ONE behavior at a time. +Other factories (trigger/loop/stop/sensor) get their own RED→GREEN cycles. +""" +from unittest.mock import MagicMock, patch + +import pytest +import requests +from airflow.exceptions import AirflowException + +from per_phase_tasks import ( + parse_scope, ALLOWED_SCOPES, + make_trigger_task, make_loop_task, make_stop_task, + make_is_phase_terminal, +) + + +@pytest.mark.parametrize( + "conf,expected", + [ + ({}, ["FULL_DAILY"]), + ({"scope": "FULL_DAILY"}, ["FULL_DAILY"]), + ({"scope": "ITEM_EQUIPMENT"}, ["ITEM_EQUIPMENT"]), + ( + {"scope": ["ITEM_EQUIPMENT_LOOP", "OCID_LOOKUP_STOP"]}, + ["ITEM_EQUIPMENT_LOOP", "OCID_LOOKUP_STOP"], + ), + ({"scope": ["RANKING_FETCH", "OCID_LOOKUP"]}, ["RANKING_FETCH", "OCID_LOOKUP"]), + ], +) +def test_parse_scope_valid(conf, expected): + assert parse_scope(conf) == expected + + +@pytest.mark.parametrize( + "conf", + [ + {"scope": "RANKING_FETCH_LOOP"}, # not in loopable set + {"scope": "OCID_LOOKUP_LOOP"}, # not in ext-api impl loopable set + {"scope": ["RANKING_FETCH_LOOP"]}, # list with invalid + {"scope": ["OCID_LOOKUP_LOOP"]}, # list with non-loopable + {"scope": "INVALID"}, # unknown value + {"scope": ["INVALID"]}, # list with unknown + {"scope": ["ITEM_EQUIPMENT", "FOO"]}, # partial list with one invalid + ], +) +def test_parse_scope_invalid(conf): + with pytest.raises(AirflowException): + parse_scope(conf) + + +def test_allowed_scopes_constant_matches_spec(): + """Guard: if spec adds a scope value, this test forces an update.""" + expected = { + "RANKING_FETCH", "OCID_LOOKUP", "CHARACTER_BASIC", "ITEM_EQUIPMENT", + "CHARACTER_BASIC_LOOP", "ITEM_EQUIPMENT_LOOP", + "RANKING_FETCH_STOP", "OCID_LOOKUP_STOP", + "CHARACTER_BASIC_STOP", "ITEM_EQUIPMENT_STOP", + } + assert set(ALLOWED_SCOPES) == expected + + +# ─── make_trigger_task (Task 6 RED) ─────────────────────────────────────── + + +def _make_ctx(conf, xcom_value=None): + """Build a minimal Airflow task context dict.""" + dag_run = MagicMock() + dag_run.conf = conf + ti = MagicMock() + ti.xcom_pull.return_value = xcom_value + return {"dag_run": dag_run, "ti": ti} + + +def test_make_trigger_task_skips_when_scope_empty(mock_external_api_conn): + """If phase not in scope, callable returns None (Airflow marks skipped).""" + task = make_trigger_task("ITEM_EQUIPMENT") + ctx = _make_ctx({"scope": ["OCID_LOOKUP"]}) + assert task.python_callable(**ctx) is None + + +def test_make_trigger_task_happy_path(mock_external_api_conn): + """200 → returns response JSON, no exception.""" + task = make_trigger_task("ITEM_EQUIPMENT") + ctx = _make_ctx({"scope": ["ITEM_EQUIPMENT"]}) + with patch("per_phase_tasks.requests.post") as mock_post: + mock_resp = MagicMock() + mock_resp.status_code = 202 + mock_resp.json.return_value = {"runId": "r-1", "phase": "ITEM_EQUIPMENT"} + mock_post.return_value = mock_resp + + result = task.python_callable(**ctx) + + assert result == {"runId": "r-1", "phase": "ITEM_EQUIPMENT"} + mock_post.assert_called_once() + assert "/trigger/phase/ITEM_EQUIPMENT" in mock_post.call_args[0][0] + + +def test_make_trigger_task_409_discovers_active_run(mock_external_api_conn): + """409 → fetch /run-status, return active runId with ALREADY_ACTIVE status.""" + task = make_trigger_task("ITEM_EQUIPMENT") + ctx = _make_ctx({"scope": ["ITEM_EQUIPMENT"]}) + + with patch("per_phase_tasks.requests.post") as mock_post, \ + patch("per_phase_tasks.requests.get") as mock_get: + trigger_resp = MagicMock() + trigger_resp.status_code = 409 + mock_post.return_value = trigger_resp + + status_resp = MagicMock() + status_resp.status_code = 200 + status_resp.json.return_value = {"current": {"runId": "r-active"}} + status_resp.raise_for_status = MagicMock() + mock_get.return_value = status_resp + + result = task.python_callable(**ctx) + + assert result == { + "runId": "r-active", + "phase": "ITEM_EQUIPMENT", + "status": "ALREADY_ACTIVE", + } + + +def test_make_trigger_task_500_raises(mock_external_api_conn): + """500 → AirflowException.""" + task = make_trigger_task("ITEM_EQUIPMENT") + ctx = _make_ctx({"scope": ["ITEM_EQUIPMENT"]}) + + with patch("per_phase_tasks.requests.post") as mock_post: + mock_resp = MagicMock() + mock_resp.status_code = 500 + mock_resp.text = "boom" + mock_post.return_value = mock_resp + + with pytest.raises(AirflowException): + task.python_callable(**ctx) + + +def test_make_trigger_task_network_error_raises(mock_external_api_conn): + """requests.RequestException → AirflowException with chained cause.""" + task = make_trigger_task("ITEM_EQUIPMENT") + ctx = _make_ctx({"scope": ["ITEM_EQUIPMENT"]}) + + with patch("per_phase_tasks.requests.post") as mock_post: + mock_post.side_effect = requests.RequestException("connection refused") + + with pytest.raises(AirflowException, match="Trigger ITEM_EQUIPMENT failed"): + task.python_callable(**ctx) + + +# ─── make_loop_task (Task 8 RED) ─────────────────────────────────────────── + + +def test_make_loop_task_skips_when_scope_empty(mock_external_api_conn): + """If {phase}_LOOP not in scope, returns None.""" + task = make_loop_task("ITEM_EQUIPMENT") + ctx = _make_ctx({"scope": ["ITEM_EQUIPMENT"]}) # bare, not _LOOP + assert task.python_callable(**ctx) is None + + +def test_make_loop_task_happy_path(mock_external_api_conn): + """202 → returns response JSON with loopId.""" + task = make_loop_task("ITEM_EQUIPMENT") + ctx = _make_ctx({"scope": ["ITEM_EQUIPMENT_LOOP"]}) + + with patch("per_phase_tasks.requests.post") as mock_post: + mock_resp = MagicMock() + mock_resp.status_code = 202 + mock_resp.json.return_value = { + "status": "LOOP_STARTED", + "phase": "ITEM_EQUIPMENT", + "loopId": "loop-1", + "iterationCount": 0, + } + mock_post.return_value = mock_resp + + result = task.python_callable(**ctx) + + assert result["loopId"] == "loop-1" + assert "/loop/phase/ITEM_EQUIPMENT" in mock_post.call_args[0][0] + + +def test_make_loop_task_409_idempotent(mock_external_api_conn): + """409 → push existing loopId with ALREADY_LOOPING status.""" + task = make_loop_task("ITEM_EQUIPMENT") + ctx = _make_ctx({"scope": ["ITEM_EQUIPMENT_LOOP"]}) + + with patch("per_phase_tasks.requests.post") as mock_post: + mock_resp = MagicMock() + mock_resp.status_code = 409 + mock_resp.json.return_value = { + "status": "LOOP_ALREADY_ACTIVE", + "phase": "ITEM_EQUIPMENT", + "loopId": "loop-existing", + } + mock_post.return_value = mock_resp + + result = task.python_callable(**ctx) + + assert result["loopId"] == "loop-existing" + assert result["status"] == "ALREADY_LOOPING" + + +def test_make_loop_task_400_invalid_phase_raises(mock_external_api_conn): + """400 → AirflowException (config error).""" + task = make_loop_task("ITEM_EQUIPMENT") + ctx = _make_ctx({"scope": ["ITEM_EQUIPMENT_LOOP"]}) + + with patch("per_phase_tasks.requests.post") as mock_post: + mock_resp = MagicMock() + mock_resp.status_code = 400 + mock_resp.json.return_value = {"error": "INVALID_PHASE"} + mock_post.return_value = mock_resp + + with pytest.raises(AirflowException, match="INVALID_PHASE"): + task.python_callable(**ctx) + + +def test_make_loop_task_500_raises(mock_external_api_conn): + """500 → AirflowException (not config error).""" + task = make_loop_task("ITEM_EQUIPMENT") + ctx = _make_ctx({"scope": ["ITEM_EQUIPMENT_LOOP"]}) + + with patch("per_phase_tasks.requests.post") as mock_post: + mock_resp = MagicMock() + mock_resp.status_code = 500 + mock_resp.text = "boom" + mock_post.return_value = mock_resp + + with pytest.raises(AirflowException, match="Loop start ITEM_EQUIPMENT failed"): + task.python_callable(**ctx) + + +# ─── make_stop_task (Task 10 RED) ────────────────────────────────────────── + + +def test_make_stop_task_skips_when_scope_empty(mock_external_api_conn): + """If {phase}_STOP not in scope, returns None.""" + task = make_stop_task("ITEM_EQUIPMENT") + ctx = _make_ctx({"scope": ["ITEM_EQUIPMENT"]}) + assert task.python_callable(**ctx) is None + + +def test_make_stop_task_happy_path_202(mock_external_api_conn): + """202 STOP_REQUESTED → returns status.""" + task = make_stop_task("ITEM_EQUIPMENT") + ctx = _make_ctx({"scope": ["ITEM_EQUIPMENT_STOP"]}) + + with patch("per_phase_tasks.requests.post") as mock_post: + mock_resp = MagicMock() + mock_resp.status_code = 202 + mock_resp.json.return_value = { + "status": "STOP_REQUESTED", + "phase": "ITEM_EQUIPMENT", + "loopId": "loop-1", + } + mock_post.return_value = mock_resp + + result = task.python_callable(**ctx) + + assert result["status"] == "STOP_REQUESTED" + assert "/stop/phase/ITEM_EQUIPMENT" in mock_post.call_args[0][0] + + +def test_make_stop_task_200_not_running(mock_external_api_conn): + """200 NOT_RUNNING → idempotent success.""" + task = make_stop_task("ITEM_EQUIPMENT") + ctx = _make_ctx({"scope": ["ITEM_EQUIPMENT_STOP"]}) + + with patch("per_phase_tasks.requests.post") as mock_post: + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = {"status": "NOT_RUNNING"} + mock_post.return_value = mock_resp + + result = task.python_callable(**ctx) + + assert result == {"phase": "ITEM_EQUIPMENT", "status": "NOT_RUNNING"} + + +def test_make_stop_task_500_raises(mock_external_api_conn): + """500 → AirflowException.""" + task = make_stop_task("ITEM_EQUIPMENT") + ctx = _make_ctx({"scope": ["ITEM_EQUIPMENT_STOP"]}) + + with patch("per_phase_tasks.requests.post") as mock_post: + mock_resp = MagicMock() + mock_resp.status_code = 500 + mock_resp.text = "boom" + mock_post.return_value = mock_resp + + with pytest.raises(AirflowException, match="Stop ITEM_EQUIPMENT failed"): + task.python_callable(**ctx) + + +def test_make_stop_task_network_error_raises(mock_external_api_conn): + """requests.RequestException → AirflowException with chained cause.""" + task = make_stop_task("ITEM_EQUIPMENT") + ctx = _make_ctx({"scope": ["ITEM_EQUIPMENT_STOP"]}) + + with patch("per_phase_tasks.requests.post") as mock_post: + mock_post.side_effect = requests.RequestException("connection refused") + + with pytest.raises(AirflowException, match="Stop ITEM_EQUIPMENT failed"): + task.python_callable(**ctx) + + +# ─── make_is_phase_terminal (Task 12 RED) ────────────────────────────────── + + +def test_sensor_returns_true_when_scope_excludes_phase(mock_external_api_conn): + """If phase not in scope, sensor returns True (skip).""" + sensor = make_is_phase_terminal("ITEM_EQUIPMENT") + ctx = _make_ctx({"scope": ["OCID_LOOKUP"]}) + assert sensor(**ctx) is True + + +def test_sensor_returns_true_when_terminal(mock_external_api_conn): + """runId matches + current.terminal=True → True.""" + sensor = make_is_phase_terminal("ITEM_EQUIPMENT") + ctx = _make_ctx( + {"scope": ["ITEM_EQUIPMENT"]}, + xcom_value={"runId": "r-1", "phase": "ITEM_EQUIPMENT"}, + ) + + with patch("per_phase_tasks.requests.get") as mock_get: + status_resp = MagicMock() + status_resp.status_code = 200 + status_resp.json.return_value = { + "current": { + "runId": "r-1", + "phase": "ITEM_EQUIPMENT", + "terminal": True, + } + } + status_resp.raise_for_status = MagicMock() + mock_get.return_value = status_resp + + assert sensor(**ctx) is True + + +def test_sensor_raises_when_run_failed(mock_external_api_conn): + """current.phase == FAILED → RuntimeError.""" + sensor = make_is_phase_terminal("ITEM_EQUIPMENT") + ctx = _make_ctx( + {"scope": ["ITEM_EQUIPMENT"]}, + xcom_value={"runId": "r-1"}, + ) + + with patch("per_phase_tasks.requests.get") as mock_get: + status_resp = MagicMock() + status_resp.status_code = 200 + status_resp.json.return_value = { + "current": { + "runId": "r-1", + "phase": "FAILED", + "errorMessage": "boom", + } + } + status_resp.raise_for_status = MagicMock() + mock_get.return_value = status_resp + + with pytest.raises(RuntimeError, match="Run r-1 failed"): + sensor(**ctx) + + +def test_sensor_returns_false_on_transient_http(mock_external_api_conn): + """Network error → False (reschedule).""" + sensor = make_is_phase_terminal("ITEM_EQUIPMENT") + ctx = _make_ctx( + {"scope": ["ITEM_EQUIPMENT"]}, + xcom_value={"runId": "r-1"}, + ) + + with patch("per_phase_tasks.requests.get") as mock_get: + mock_get.side_effect = requests.RequestException("connection refused") + + assert sensor(**ctx) is False + + +def test_sensor_returns_false_when_runid_mismatch(mock_external_api_conn): + """current.runId != trigger's runId → False.""" + sensor = make_is_phase_terminal("ITEM_EQUIPMENT") + ctx = _make_ctx( + {"scope": ["ITEM_EQUIPMENT"]}, + xcom_value={"runId": "r-1"}, + ) + + with patch("per_phase_tasks.requests.get") as mock_get: + status_resp = MagicMock() + status_resp.status_code = 200 + status_resp.json.return_value = { + "current": {"runId": "r-other", "phase": "ITEM_EQUIPMENT"} + } + status_resp.raise_for_status = MagicMock() + mock_get.return_value = status_resp + + assert sensor(**ctx) is False + + +def test_sensor_raises_when_xcom_missing_runid(mock_external_api_conn): + """If xcom has no runId, RuntimeError (config error, fail-fast).""" + sensor = make_is_phase_terminal("ITEM_EQUIPMENT") + ctx = _make_ctx( + {"scope": ["ITEM_EQUIPMENT"]}, + xcom_value={"phase": "ITEM_EQUIPMENT"}, # no runId + ) + + with pytest.raises(RuntimeError, match="no runId"): + sensor(**ctx) + + +# ─── DAG loader integration (Task 14 RED) ───────────────────────────────── + + +def test_daily_collection_pipeline_parses(): + """DAG parses without import errors (regression guard for rewire).""" + from airflow.models import DagBag + import os + dag_folder = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + dagbag = DagBag(dag_folder=dag_folder, include_examples=False) + assert "daily_collection_pipeline" in dagbag.dags, ( + f"DAG missing. Errors: {dagbag.import_errors}" + ) + assert dagbag.import_errors == {}, f"Import errors: {dagbag.import_errors}" + + +def test_branch_on_scope_task_exists(): + from airflow.models import DagBag + import os + dag_folder = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + dagbag = DagBag(dag_folder=dag_folder, include_examples=False) + dag = dagbag.dags["daily_collection_pipeline"] + assert "branch_on_scope" in dag.task_ids + + +def test_all_per_phase_tasks_present(): + """10 per-phase task definitions expected.""" + from airflow.models import DagBag + import os + dag_folder = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + dagbag = DagBag(dag_folder=dag_folder, include_examples=False) + dag = dagbag.dags["daily_collection_pipeline"] + expected = { + # 4 trigger + "per_phase_trigger_ranking_fetch", + "per_phase_trigger_ocid_lookup", + "per_phase_trigger_character_basic", + "per_phase_trigger_item_equipment", + # 2 loop (matches ext-api loopablePhases: CHARACTER_BASIC, ITEM_EQUIPMENT) + "per_phase_loop_character_basic", + "per_phase_loop_item_equipment", + # 4 stop + "per_phase_stop_ranking_fetch", + "per_phase_stop_ocid_lookup", + "per_phase_stop_character_basic", + "per_phase_stop_item_equipment", + } + assert expected.issubset(set(dag.task_ids)), ( + f"Missing tasks: {expected - set(dag.task_ids)}" + ) + + +def test_branch_downstream_includes_both_paths(): + """branch_on_scope must route to both trigger_daily_collection AND per_phase_join.""" + from airflow.models import DagBag + import os + dag_folder = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + dagbag = DagBag(dag_folder=dag_folder, include_examples=False) + dag = dagbag.dags["daily_collection_pipeline"] + branch_task = dag.get_task("branch_on_scope") + downstream_ids = {t.task_id for t in branch_task.downstream_list} + assert "trigger_daily_collection" in downstream_ids + assert "per_phase_join" in downstream_ids \ No newline at end of file diff --git a/docs/01_ADR/ADR-393-airflow-per-phase-dag.md b/docs/01_ADR/ADR-393-airflow-per-phase-dag.md new file mode 100644 index 000000000..1a1b26af4 --- /dev/null +++ b/docs/01_ADR/ADR-393-airflow-per-phase-dag.md @@ -0,0 +1,96 @@ +# ADR-393: Per-Phase Airflow DAG for ext-api Phase Endpoints + +- Status: Accepted +- Date: 2026-06-18 +- Owner: zbnerd + +--- + +## 1. Background / Problem + +### Background + +- ext-api added per-phase endpoints in #1289/1290/1291: `POST /trigger/phase/{name}`, `POST /stop/phase/{name}`, `POST /loop/phase/{name}`, `POST /stop/loop/phase/{name}`. +- Operators currently can only drive ext-api from the full daily pipeline. No way to hot-loop a phase or stop a runaway loop without restarting ext-api. + +### Problem + +- Need a UI / CLI-driven path to invoke per-phase endpoints without standing up the full daily chain. + +### Goal + +- Add Airflow support for per-phase trigger / loop / stop via `dag_run.conf['scope']`. + +--- + +## 2. Decision + +> We extend the existing `daily_collection_pipeline.py` with a `BranchPythonOperator` that reads `dag_run.conf['scope']`. When `scope` is `FULL_DAILY` (default) → existing chain. Otherwise → per-phase fan-out. + +```text +check_external_api + └── branch_on_scope + ├── trigger_daily_collection → ... → trigger_cleanup (FULL_DAILY) + └── per_phase_join + ├── per_phase_trigger_ (×4) → per_phase_wait_ (×4) + ├── per_phase_loop_ (×2) [fire-and-forget; CHARACTER_BASIC, ITEM_EQUIPMENT only] + └── per_phase_stop_ (×4) [fire-and-forget] +``` + +Helper module `per_phase_tasks.py` owns task factories; DAG file stays thin. + +--- + +## 3. Trade-offs + +### Sensitivity + +* DAG run volume (operators may spam scope triggers) +* ext-api connection availability at DAG-parse time (DagBag loads all DAGs on scheduler startup) +* Airflow version (2.10.5; `BranchPythonOperator` requires 2.0+) + +### Trade-off + +| Choice | Gain | Lose | +|--------|------|------| +| Shape A (extend daily DAG) | One DAG file; reuses existing connection + health-check tasks | Daily DAG graph view grows by 17 task definitions | +| Shape B (new DAG file) | Cleaner separation | Operators must know which DAG to trigger; duplicates task definitions | +| Per-task gate via `dag_run.conf` check inside callable | DAG parses without per-runtime graph | Unused task definitions show in graph view | +| `_STOP` reuses `/stop/phase` (single endpoint) | One endpoint, simpler | Operators cannot distinguish loop-stop vs single-shot stop in API | + +We picked **Shape A + per-task gate + `/stop/phase` reuse**. + +### Risk + +* 17 unused task definitions cluttering graph view — mitigated by per-task gating; documentation cost only. +* `/stop/phase` halts both loops and single-shots — operators must check `loopSummaries` post-stop to confirm; documented in DAG docstring. + +### Non-Risk + +* Cross-phase ordering — explicitly out-of-scope; operators use TriggerDagRunOperator for chains. +* Loop auto-restart — explicitly out-of-scope per #1291 §11. + +--- + +## 4. Result / Evidence + +### Metrics + +| Metric | Value | Notes | +|--------|-------|-------| +| New task definitions | 16 | 4 trigger + 4 sensor + 2 loop + 4 stop + 1 branch + 1 join | +| New files | 3 | per_phase_tasks.py, tests/__init__.py, tests/test_per_phase_tasks.py | +| Modified files | 2 | daily_collection_pipeline.py (+60 lines), pipeline-test/SKILL.md (step 10a added) | +| Existing daily chain | unchanged | verified by DagBag parse test | + +> LOOP_PHASES reduced from 3 to 2 (OCID_LOOKUP removed) after runtime verification on 2026-06-18: ext-api PhaseLoopController.loopablePhases only accepts CHARACTER_BASIC + ITEM_EQUIPMENT despite the #1291 spec mentioning OCID_LOOKUP. + +### Observed Result + +* (Filled in post-implementation via manual smoke test from `pipeline-test` skill) + +--- + +## 5. Summary + +> Extend the existing Airflow daily DAG with a `scope`-driven branch that routes `dag_run.conf['scope']` to either the existing daily chain or a per-phase fan-out via 11 task definitions backed by 3 helper factories. \ No newline at end of file diff --git a/docs/superpowers/plans/2026-06-18-issue-1292-per-phase-dag.md b/docs/superpowers/plans/2026-06-18-issue-1292-per-phase-dag.md new file mode 100644 index 000000000..f1cf0f259 --- /dev/null +++ b/docs/superpowers/plans/2026-06-18-issue-1292-per-phase-dag.md @@ -0,0 +1,1484 @@ +# Issue #1292 Per-Phase Airflow DAG Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Extend Airflow `daily_collection_pipeline.py` with a `scope`-driven branch that triggers, loops, or stops a single ext-api phase without running the full daily pipeline. + +**Architecture:** New helper module `docker/airflow/dags/per_phase_tasks.py` exposes three task factories (`make_trigger_task`, `make_loop_task`, `make_stop_task`) plus `parse_scope` validator and `make_is_phase_terminal` sensor. `daily_collection_pipeline.py` adds a `BranchPythonOperator` that routes `dag_run.conf['scope']` to either the existing daily chain or a per-phase fan-out join. + +**Tech Stack:** Airflow 2.10.5 (python3.12), `apache/airflow:2.10.5-python3.12` image, `requests`, `pytest` (Airflow-bundled), `unittest.mock` for HTTP mocking, `DagBag` for DAG loader tests. + +**Spec:** `docs/superpowers/specs/2026-06-18-issue-1292-per-phase-dag-design.md` + +--- + +## File Structure + +| File | Responsibility | Lines (est.) | +|------|----------------|--------------| +| `docker/airflow/dags/per_phase_tasks.py` (new) | Pure helpers: `parse_scope`, 3 task factories, sensor factory, 3 frozen phase lists | ~270 | +| `docker/airflow/dags/daily_collection_pipeline.py` (extend) | Add imports + `branch_on_scope` + `per_phase_join` + 11 per-phase tasks + 4 sensors + rewire `check_external_api` | +60 | +| `docker/airflow/dags/tests/__init__.py` (new) | Test package marker | 1 | +| `docker/airflow/dags/tests/conftest.py` (new) | Pytest fixtures (mock dag_run context, mock Airflow connection) | ~30 | +| `docker/airflow/dags/tests/test_per_phase_tasks.py` (new) | Unit tests for `parse_scope` + factory gates + DAG loader test | ~180 | +| `docs/01_ADR/ADR-XXX-airflow-per-phase-dag.md` (new) | Short ADR summarizing decision (refs spec) | ~80 | + +Total: ~620 lines. + +--- + +## Task 1: Create feature branch + +**Files:** none + +- [ ] **Step 1: Confirm clean working tree** + +Run: `git status` +Expected: `nothing to commit, working tree clean` + +- [ ] **Step 2: Create branch from develop** + +```bash +git checkout develop +git pull --ff-only origin develop +git checkout -b feature/issue-1292-per-phase-dag +``` + +Expected: branch created. + +- [ ] **Step 3: Verify branch** + +Run: `git branch --show-current` +Expected: `feature/issue-1292-per-phase-dag` + +--- + +## Task 2: Write ADR-XXX documenting decision + +**Files:** +- Create: `docs/01_ADR/ADR-XXX-airflow-per-phase-dag.md` + +Per project `.claude/rules/adr-conventions.md`: 5-section ADR template (Background / Decision / Trade-offs / Result / Summary). This is implementation work, so ADR is mandatory per `rpi-workflow.md`. + +- [ ] **Step 1: Determine ADR number** + +Run: `ls docs/01_ADR/ | grep '^ADR-' | sort | tail -5` +Expected: list of existing ADRs. Pick next number (e.g. if highest is `ADR-021`, use `ADR-022`). + +- [ ] **Step 2: Create ADR file** + +Create `docs/01_ADR/ADR-XXX-airflow-per-phase-dag.md` with this content (replace `XXX` with chosen number): + +```markdown +# ADR-XXX: Per-Phase Airflow DAG for ext-api Phase Endpoints + +- Status: Accepted +- Date: 2026-06-18 +- Owner: zbnerd + +--- + +## 1. Background / Problem + +### Background + +- ext-api added per-phase endpoints in #1289/1290/1291: `POST /trigger/phase/{name}`, `POST /stop/phase/{name}`, `POST /loop/phase/{name}`, `POST /stop/loop/phase/{name}`. +- Operators currently can only drive ext-api from the full daily pipeline. No way to hot-loop a phase or stop a runaway loop without restarting ext-api. + +### Problem + +- Need a UI / CLI-driven path to invoke per-phase endpoints without standing up the full daily chain. + +### Goal + +- Add Airflow support for per-phase trigger / loop / stop via `dag_run.conf['scope']`. + +--- + +## 2. Decision + +> We extend the existing `daily_collection_pipeline.py` with a `BranchPythonOperator` that reads `dag_run.conf['scope']`. When `scope` is `FULL_DAILY` (default) → existing chain. Otherwise → per-phase fan-out. + +```text +check_external_api + └── branch_on_scope + ├── trigger_daily_collection → ... → trigger_cleanup (FULL_DAILY) + └── per_phase_join + ├── per_phase_trigger_ (×4) → per_phase_wait_ (×4) + ├── per_phase_loop_ (×3) [fire-and-forget] + └── per_phase_stop_ (×4) [fire-and-forget] +``` + +Helper module `per_phase_tasks.py` owns task factories; DAG file stays thin. + +--- + +## 3. Trade-offs + +### Sensitivity + +* DAG run volume (operators may spam scope triggers) +* ext-api connection availability at DAG-parse time (DagBag loads all DAGs on scheduler startup) +* Airflow version (2.10.5; `BranchPythonOperator` requires 2.0+) + +### Trade-off + +| Choice | Gain | Lose | +|--------|------|------| +| Shape A (extend daily DAG) | One DAG file; reuses existing connection + health-check tasks | Daily DAG graph view grows by 17 task definitions | +| Shape B (new DAG file) | Cleaner separation | Operators must know which DAG to trigger; duplicatestask definitions | +| Per-task gate via `dag_run.conf` check inside callable | DAG parses without per-runtime graph | Unused task definitions show in graph view | +| `_STOP` reuses `/stop/phase` (single endpoint) | One endpoint, simpler | Operators cannot distinguish loop-stop vs single-shot stop in API | + +We picked **Shape A + per-task gate + `/stop/phase` reuse**. + +### Risk + +* 17 unused task definitions cluttering graph view — mitigated by per-task gating; documentation cost only. +* `/stop/phase` halts both loops and single-shots — operators must check `loopSummaries` post-stop to confirm; documented in DAG docstring. + +### Non-Risk + +* Cross-phase ordering — explicitly out-of-scope; operators use TriggerDagRunOperator for chains. +* Loop auto-restart — explicitly out-of-scope per #1291 §11. + +--- + +## 4. Result / Evidence + +### Metrics + +| Metric | Value | Notes | +|--------|-------|-------| +| New task definitions | 17 | 4 trigger + 4 sensor + 3 loop + 4 stop + 1 branch + 1 join | +| New files | 3 | per_phase_tasks.py, tests/__init__.py, tests/test_per_phase_tasks.py | +| Modified files | 1 | daily_collection_pipeline.py (+60 lines) | +| Existing daily chain | unchanged | verified by DagBag parse test | + +### Observed Result + +* (Filled in post-implementation via manual smoke test from `pipeline-test` skill) + +--- + +## 5. Summary + +> Extend the existing Airflow daily DAG with a `scope`-driven branch that routes `dag_run.conf['scope']` to either the existing daily chain or a per-phase fan-out via 11 task definitions backed by 3 helper factories. +``` + +- [ ] **Step 3: Commit ADR** + +```bash +git add docs/01_ADR/ADR-XXX-airflow-per-phase-dag.md +git commit -m "docs(adr): per-phase Airflow DAG decision (issue #1292)" +``` + +--- + +## Task 3: Create test scaffolding + pytest fixtures + +**Files:** +- Create: `docker/airflow/dags/tests/__init__.py` +- Create: `docker/airflow/dags/tests/conftest.py` + +- [ ] **Step 1: Create tests package marker** + +Write `docker/airflow/dags/tests/__init__.py` (empty file). + +Run: `touch docker/airflow/dags/tests/__init__.py` + +- [ ] **Step 2: Create conftest.py** + +Write `docker/airflow/dags/tests/conftest.py`: + +```python +"""Pytest fixtures for per_phase_tasks unit tests.""" +import sys +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +# Ensure dags/ is importable +DAGS_DIR = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(DAGS_DIR)) + + +@pytest.fixture +def mock_dag_run_conf(): + """Factory: mock context['dag_run'].conf for parse_scope.""" + + def _make(conf): + dag_run = MagicMock() + dag_run.conf = conf + return {"dag_run": dag_run} + + return _make + + +@pytest.fixture +def mock_external_api_conn(): + """Mock the Airflow 'external_api' connection to localhost.""" + with patch("per_phase_tasks.BaseHook") as base_hook: + conn = MagicMock() + conn.host = "localhost" + conn.port = 8081 + base_hook.get_connection.return_value = conn + yield base_hook +``` + +- [ ] **Step 3: Verify pytest discovery works** + +Run: `docker exec maple-airflow-scheduler bash -c "cd /opt/airflow/dags && python -m pytest tests/ --collect-only -q"` + +Expected: `no tests ran` or `0 tests collected` (no tests yet). Confirms pytest is available. + +If scheduler container is not running: +```bash +docker compose -f docker-compose.airflow.yml up -d airflow-scheduler +``` + +- [ ] **Step 4: Commit scaffolding** + +```bash +git add docker/airflow/dags/tests/__init__.py docker/airflow/dags/tests/conftest.py +git commit -m "test(airflow): pytest scaffolding for per_phase_tasks" +``` + +--- + +## Task 4: Write failing tests for parse_scope (TDD red) + +**Files:** +- Create: `docker/airflow/dags/tests/test_per_phase_tasks.py` + +- [ ] **Step 1: Write parse_scope test file** + +Write `docker/airflow/dags/tests/test_per_phase_tasks.py`: + +```python +"""Unit tests for per_phase_tasks.parse_scope.""" +import pytest +from airflow.exceptions import AirflowException + +from per_phase_tasks import parse_scope, ALLOWED_SCOPES + + +@pytest.mark.parametrize( + "conf,expected", + [ + ({}, ["FULL_DAILY"]), + ({"scope": "FULL_DAILY"}, ["FULL_DAILY"]), + ({"scope": "ITEM_EQUIPMENT"}, ["ITEM_EQUIPMENT"]), + ( + {"scope": ["ITEM_EQUIPMENT_LOOP", "OCID_LOOKUP_STOP"]}, + ["ITEM_EQUIPMENT_LOOP", "OCID_LOOKUP_STOP"], + ), + ({"scope": ["RANKING_FETCH", "OCID_LOOKUP"]}, ["RANKING_FETCH", "OCID_LOOKUP"]), + ], +) +def test_parse_scope_valid(conf, expected): + assert parse_scope(conf) == expected + + +@pytest.mark.parametrize( + "conf", + [ + {"scope": "RANKING_FETCH_LOOP"}, # not in loopable set + {"scope": ["RANKING_FETCH_LOOP"]}, # list with invalid + {"scope": "INVALID"}, # unknown value + {"scope": ["INVALID"]}, # list with unknown + {"scope": ["ITEM_EQUIPMENT", "FOO"]}, # partial list with one invalid + ], +) +def test_parse_scope_invalid(conf): + with pytest.raises(AirflowException): + parse_scope(conf) + + +def test_allowed_scopes_constant_matches_spec(): + """Guard: if spec adds a scope value, this test forces an update.""" + expected = { + "RANKING_FETCH", "OCID_LOOKUP", "CHARACTER_BASIC", "ITEM_EQUIPMENT", + "OCID_LOOKUP_LOOP", "CHARACTER_BASIC_LOOP", "ITEM_EQUIPMENT_LOOP", + "RANKING_FETCH_STOP", "OCID_LOOKUP_STOP", + "CHARACTER_BASIC_STOP", "ITEM_EQUIPMENT_STOP", + } + assert set(ALLOWED_SCOPES) == expected +``` + +- [ ] **Step 2: Run tests to verify they fail (RED)** + +Run: +```bash +docker exec maple-airflow-scheduler bash -c "cd /opt/airflow/dags && python -m pytest tests/test_per_phase_tasks.py -v" +``` + +Expected: `ModuleNotFoundError: No module named 'per_phase_tasks'` (since the module doesn't exist yet) for all 7 tests. + +- [ ] **Step 3: Commit failing tests** + +```bash +git add docker/airflow/dags/tests/test_per_phase_tasks.py +git commit -m "test(airflow): failing tests for parse_scope" +``` + +--- + +## Task 5: Implement parse_scope + ALLOWED_SCOPES (TDD green) + +**Files:** +- Create: `docker/airflow/dags/per_phase_tasks.py` + +- [ ] **Step 1: Implement parse_scope and constants** + +Write the top of `docker/airflow/dags/per_phase_tasks.py`: + +```python +"""Per-phase Airflow task factories for ext-api. + +Drives the per-phase endpoints from #1289/1290/1291 via Airflow's +BranchPythonOperator in daily_collection_pipeline.py. + +Spec: docs/superpowers/specs/2026-06-18-issue-1292-per-phase-dag-design.md +""" +from datetime import timedelta +import json + +import requests +from airflow.exceptions import AirflowException +from airflow.hooks.base import BaseHook +from airflow.operators.python import PythonOperator +from airflow.sensors.python import PythonSensor + + +# Allowed scope values. RANKING_FETCH_LOOP intentionally excluded — ext-api +# PhaseLoopController.loopablePhases from #1291 excludes RANKING_FETCH. +ALLOWED_SCOPES = frozenset({ + "RANKING_FETCH", "OCID_LOOKUP", "CHARACTER_BASIC", "ITEM_EQUIPMENT", + "OCID_LOOKUP_LOOP", "CHARACTER_BASIC_LOOP", "ITEM_EQUIPMENT_LOOP", + "RANKING_FETCH_STOP", "OCID_LOOKUP_STOP", + "CHARACTER_BASIC_STOP", "ITEM_EQUIPMENT_STOP", +}) + +# Phase lists for fan-out +TRIGGER_PHASES = ["RANKING_FETCH", "OCID_LOOKUP", "CHARACTER_BASIC", "ITEM_EQUIPMENT"] +LOOP_PHASES = ["OCID_LOOKUP", "CHARACTER_BASIC", "ITEM_EQUIPMENT"] +STOP_PHASES = ["RANKING_FETCH", "OCID_LOOKUP", "CHARACTER_BASIC", "ITEM_EQUIPMENT"] + + +def get_external_api_base() -> str: + """Resolve ext-api base URL from Airflow Connection 'external_api'.""" + conn = BaseHook.get_connection("external_api") + return f"http://{conn.host}:{conn.port}" + + +def parse_scope(conf: dict) -> list: + """Validate dag_run.conf['scope']. Returns list of scope values. + + - Missing or 'FULL_DAILY' → ['FULL_DAILY'] + - String → wrap in list + - List → validate every value against ALLOWED_SCOPES + - Any invalid value → raise AirflowException + """ + scope = conf.get("scope", "FULL_DAILY") + if scope == "FULL_DAILY": + return ["FULL_DAILY"] + if isinstance(scope, str): + scope = [scope] + if not isinstance(scope, list): + raise AirflowException( + f"scope must be string or list, got {type(scope).__name__}" + ) + invalid = [s for s in scope if s not in ALLOWED_SCOPES] + if invalid: + raise AirflowException( + f"Invalid scope values: {invalid}. " + f"Allowed: {sorted(ALLOWED_SCOPES)}" + ) + return list(scope) +``` + +- [ ] **Step 2: Run tests to verify they pass (GREEN)** + +Run: +```bash +docker exec maple-airflow-scheduler bash -c "cd /opt/airflow/dags && python -m pytest tests/test_per_phase_tasks.py -v" +``` + +Expected: all 13 tests pass (5 valid parametrize × 1 + 5 invalid parametrize × 1 + 1 constant guard + 1 of constants = 12 parametrized cases + 1 constant). + +If any fail, fix and re-run. + +- [ ] **Step 3: Commit parse_scope** + +```bash +git add docker/airflow/dags/per_phase_tasks.py +git commit -m "feat(airflow): parse_scope validator + ALLOWED_SCOPES" +``` + +--- + +## Task 6: Write failing tests for make_trigger_task (TDD red) + +**Files:** +- Modify: `docker/airflow/dags/tests/test_per_phase_tasks.py` + +- [ ] **Step 1: Append trigger factory tests** + +Append to `docker/airflow/dags/tests/test_per_phase_tasks.py`: + +```python +from unittest.mock import MagicMock, patch +from per_phase_tasks import make_trigger_task, make_loop_task, make_stop_task + + +def _make_ctx(conf, xcom_value=None): + """Build a minimal Airflow task context dict.""" + dag_run = MagicMock() + dag_run.conf = conf + ti = MagicMock() + if xcom_value is not None: + ti.xcom_pull.return_value = xcom_value + else: + ti.xcom_pull.return_value = None + return {"dag_run": dag_run, "ti": ti} + + +def test_make_trigger_task_skips_when_scope_empty(mock_external_api_conn): + """If phase not in scope, callable returns None (Airflow marks skipped).""" + task = make_trigger_task("ITEM_EQUIPMENT") + ctx = _make_ctx({"scope": ["OCID_LOOKUP"]}) + assert task.python_callable(**ctx) is None + + +def test_make_trigger_task_happy_path(mock_external_api_conn): + """200 → returns response JSON, no exception.""" + task = make_trigger_task("ITEM_EQUIPMENT") + ctx = _make_ctx({"scope": ["ITEM_EQUIPMENT"]}) + with patch("per_phase_tasks.requests.post") as mock_post: + mock_resp = MagicMock() + mock_resp.status_code = 202 + mock_resp.json.return_value = {"runId": "r-1", "phase": "ITEM_EQUIPMENT"} + mock_post.return_value = mock_resp + + result = task.python_callable(**ctx) + + assert result == {"runId": "r-1", "phase": "ITEM_EQUIPMENT"} + mock_post.assert_called_once() + assert "/trigger/phase/ITEM_EQUIPMENT" in mock_post.call_args[0][0] + + +def test_make_trigger_task_409_discovers_active_run(mock_external_api_conn): + """409 → fetch /run-status, return active runId with ALREADY_ACTIVE status.""" + task = make_trigger_task("ITEM_EQUIPMENT") + ctx = _make_ctx({"scope": ["ITEM_EQUIPMENT"]}) + + with patch("per_phase_tasks.requests.post") as mock_post, \ + patch("per_phase_tasks.requests.get") as mock_get: + trigger_resp = MagicMock() + trigger_resp.status_code = 409 + mock_post.return_value = trigger_resp + + status_resp = MagicMock() + status_resp.status_code = 200 + status_resp.json.return_value = {"current": {"runId": "r-active"}} + status_resp.raise_for_status = MagicMock() + mock_get.return_value = status_resp + + result = task.python_callable(**ctx) + + assert result == { + "runId": "r-active", + "phase": "ITEM_EQUIPMENT", + "status": "ALREADY_ACTIVE", + } + + +def test_make_trigger_task_500_raises(mock_external_api_conn): + """500 → AirflowException.""" + task = make_trigger_task("ITEM_EQUIPMENT") + ctx = _make_ctx({"scope": ["ITEM_EQUIPMENT"]}) + + with patch("per_phase_tasks.requests.post") as mock_post: + mock_resp = MagicMock() + mock_resp.status_code = 500 + mock_resp.text = "boom" + mock_post.return_value = mock_resp + + with pytest.raises(AirflowException): + task.python_callable(**ctx) +``` + +- [ ] **Step 2: Run new tests to verify they fail (RED)** + +Run: +```bash +docker exec maple-airflow-scheduler bash -c "cd /opt/airflow/dags && python -m pytest tests/test_per_phase_tasks.py::test_make_trigger_task_skips_when_scope_empty -v" +``` + +Expected: `ImportError: cannot import name 'make_trigger_task' from 'per_phase_tasks'`. + +- [ ] **Step 3: Commit failing tests** + +```bash +git add docker/airflow/dags/tests/test_per_phase_tasks.py +git commit -m "test(airflow): failing tests for make_trigger_task" +``` + +--- + +## Task 7: Implement make_trigger_task (TDD green) + +**Files:** +- Modify: `docker/airflow/dags/per_phase_tasks.py` + +- [ ] **Step 1: Append make_trigger_task implementation** + +Append to `docker/airflow/dags/per_phase_tasks.py`: + +```python +def make_trigger_task(phase: str) -> PythonOperator: + """Single-shot phase trigger via /trigger/phase/{phase}. + + Gates on scope: returns None (Airflow skip) if bare phase not in scope. + """ + def _trigger(**ctx): + scope = parse_scope(ctx["dag_run"].conf or {}) + if phase not in scope: + return None + base = get_external_api_base() + try: + resp = requests.post( + f"{base}/api/internal/trigger/phase/{phase}", timeout=30 + ) + except requests.RequestException as exc: + raise AirflowException(f"Trigger {phase} failed: {exc}") from exc + + if resp.status_code in (200, 202): + return resp.json() + + if resp.status_code == 409: + try: + status_resp = requests.get( + f"{base}/api/internal/run-status", timeout=10 + ) + status_resp.raise_for_status() + data = status_resp.json() + except (requests.RequestException, ValueError) as exc: + raise AirflowException( + f"409 from trigger {phase} but /run-status fetch failed: {exc}" + ) from exc + current = data.get("current") or {} + return { + "runId": current.get("runId"), + "phase": phase, + "status": "ALREADY_ACTIVE", + } + + raise AirflowException( + f"Trigger {phase} failed: HTTP {resp.status_code} " + f"{resp.reason}: {resp.text[:500]}" + ) + + return PythonOperator( + task_id=f"per_phase_trigger_{phase.lower()}", + python_callable=_trigger, + retries=0, + execution_timeout=timedelta(seconds=60), + do_xcom_push=True, + ) +``` + +- [ ] **Step 2: Run trigger tests to verify they pass (GREEN)** + +Run: +```bash +docker exec maple-airflow-scheduler bash -c "cd /opt/airflow/dags && python -m pytest tests/test_per_phase_tasks.py -v -k trigger" +``` + +Expected: all 4 trigger tests pass. + +- [ ] **Step 3: Commit make_trigger_task** + +```bash +git add docker/airflow/dags/per_phase_tasks.py +git commit -m "feat(airflow): make_trigger_task factory" +``` + +--- + +## Task 8: Write failing tests for make_loop_task (TDD red) + +**Files:** +- Modify: `docker/airflow/dags/tests/test_per_phase_tasks.py` + +- [ ] **Step 1: Append loop factory tests** + +Append to `docker/airflow/dags/tests/test_per_phase_tasks.py`: + +```python +def test_make_loop_task_skips_when_scope_empty(mock_external_api_conn): + """If {phase}_LOOP not in scope, returns None.""" + task = make_loop_task("ITEM_EQUIPMENT") + ctx = _make_ctx({"scope": ["ITEM_EQUIPMENT"]}) # bare, not _LOOP + assert task.python_callable(**ctx) is None + + +def test_make_loop_task_happy_path(mock_external_api_conn): + """202 → returns response JSON with loopId.""" + task = make_loop_task("ITEM_EQUIPMENT") + ctx = _make_ctx({"scope": ["ITEM_EQUIPMENT_LOOP"]}) + + with patch("per_phase_tasks.requests.post") as mock_post: + mock_resp = MagicMock() + mock_resp.status_code = 202 + mock_resp.json.return_value = { + "status": "LOOP_STARTED", + "phase": "ITEM_EQUIPMENT", + "loopId": "loop-1", + "iterationCount": 0, + } + mock_post.return_value = mock_resp + + result = task.python_callable(**ctx) + + assert result["loopId"] == "loop-1" + assert "/loop/phase/ITEM_EQUIPMENT" in mock_post.call_args[0][0] + + +def test_make_loop_task_409_idempotent(mock_external_api_conn): + """409 → push existing loopId with ALREADY_LOOPING status.""" + task = make_loop_task("ITEM_EQUIPMENT") + ctx = _make_ctx({"scope": ["ITEM_EQUIPMENT_LOOP"]}) + + with patch("per_phase_tasks.requests.post") as mock_post: + mock_resp = MagicMock() + mock_resp.status_code = 409 + mock_resp.json.return_value = { + "status": "LOOP_ALREADY_ACTIVE", + "phase": "ITEM_EQUIPMENT", + "loopId": "loop-existing", + } + mock_post.return_value = mock_resp + + result = task.python_callable(**ctx) + + assert result["loopId"] == "loop-existing" + assert result["status"] == "ALREADY_LOOPING" + + +def test_make_loop_task_400_invalid_phase_raises(mock_external_api_conn): + """400 → AirflowException (config error).""" + task = make_loop_task("ITEM_EQUIPMENT") + ctx = _make_ctx({"scope": ["ITEM_EQUIPMENT_LOOP"]}) + + with patch("per_phase_tasks.requests.post") as mock_post: + mock_resp = MagicMock() + mock_resp.status_code = 400 + mock_resp.json.return_value = {"error": "INVALID_PHASE"} + mock_post.return_value = mock_resp + + with pytest.raises(AirflowException): + task.python_callable(**ctx) +``` + +- [ ] **Step 2: Run loop tests to verify they fail (RED)** + +Run: +```bash +docker exec maple-airflow-scheduler bash -c "cd /opt/airflow/dags && python -m pytest tests/test_per_phase_tasks.py -v -k loop" +``` + +Expected: `ImportError: cannot import name 'make_loop_task'`. + +- [ ] **Step 3: Commit failing tests** + +```bash +git add docker/airflow/dags/tests/test_per_phase_tasks.py +git commit -m "test(airflow): failing tests for make_loop_task" +``` + +--- + +## Task 9: Implement make_loop_task (TDD green) + +**Files:** +- Modify: `docker/airflow/dags/per_phase_tasks.py` + +- [ ] **Step 1: Append make_loop_task** + +Append to `docker/airflow/dags/per_phase_tasks.py`: + +```python +def make_loop_task(phase: str) -> PythonOperator: + """Start loop via /loop/phase/{phase}. + + Gates on scope: returns None if {phase}_LOOP not in scope. + 409 → idempotent success with ALREADY_LOOPING status. + 400 → AirflowException (config error, e.g. RANKING_FETCH_LOOP). + """ + def _loop(**ctx): + scope = parse_scope(ctx["dag_run"].conf or {}) + if f"{phase}_LOOP" not in scope: + return None + base = get_external_api_base() + try: + resp = requests.post( + f"{base}/api/internal/loop/phase/{phase}", timeout=30 + ) + except requests.RequestException as exc: + raise AirflowException(f"Loop start {phase} failed: {exc}") from exc + + if resp.status_code == 202: + return resp.json() + + if resp.status_code == 409: + body = resp.json() + return {**body, "status": "ALREADY_LOOPING"} + + if resp.status_code == 400: + raise AirflowException( + f"Loop start {phase} rejected (INVALID_PHASE): {resp.text[:500]}" + ) + + raise AirflowException( + f"Loop start {phase} failed: HTTP {resp.status_code} " + f"{resp.reason}: {resp.text[:500]}" + ) + + return PythonOperator( + task_id=f"per_phase_loop_{phase.lower()}", + python_callable=_loop, + retries=0, + execution_timeout=timedelta(seconds=60), + do_xcom_push=True, + ) +``` + +- [ ] **Step 2: Run loop tests to verify GREEN** + +Run: +```bash +docker exec maple-airflow-scheduler bash -c "cd /opt/airflow/dags && python -m pytest tests/test_per_phase_tasks.py -v -k loop" +``` + +Expected: all 4 loop tests pass. + +- [ ] **Step 3: Commit make_loop_task** + +```bash +git add docker/airflow/dags/per_phase_tasks.py +git commit -m "feat(airflow): make_loop_task factory" +``` + +--- + +## Task 10: Write failing tests for make_stop_task (TDD red) + +**Files:** +- Modify: `docker/airflow/dags/tests/test_per_phase_tasks.py` + +- [ ] **Step 1: Append stop factory tests** + +Append to `docker/airflow/dags/tests/test_per_phase_tasks.py`: + +```python +def test_make_stop_task_skips_when_scope_empty(mock_external_api_conn): + """If {phase}_STOP not in scope, returns None.""" + task = make_stop_task("ITEM_EQUIPMENT") + ctx = _make_ctx({"scope": ["ITEM_EQUIPMENT"]}) + assert task.python_callable(**ctx) is None + + +def test_make_stop_task_happy_path_202(mock_external_api_conn): + """202 STOP_REQUESTED → returns status.""" + task = make_stop_task("ITEM_EQUIPMENT") + ctx = _make_ctx({"scope": ["ITEM_EQUIPMENT_STOP"]}) + + with patch("per_phase_tasks.requests.post") as mock_post: + mock_resp = MagicMock() + mock_resp.status_code = 202 + mock_resp.json.return_value = { + "status": "STOP_REQUESTED", + "phase": "ITEM_EQUIPMENT", + "loopId": "loop-1", + } + mock_post.return_value = mock_resp + + result = task.python_callable(**ctx) + + assert result["status"] == "STOP_REQUESTED" + assert "/stop/phase/ITEM_EQUIPMENT" in mock_post.call_args[0][0] + + +def test_make_stop_task_200_not_running(mock_external_api_conn): + """200 NOT_RUNNING → idempotent success.""" + task = make_stop_task("ITEM_EQUIPMENT") + ctx = _make_ctx({"scope": ["ITEM_EQUIPMENT_STOP"]}) + + with patch("per_phase_tasks.requests.post") as mock_post: + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = {"status": "NOT_RUNNING"} + mock_post.return_value = mock_resp + + result = task.python_callable(**ctx) + + assert result == {"phase": "ITEM_EQUIPMENT", "status": "NOT_RUNNING"} + + +def test_make_stop_task_500_raises(mock_external_api_conn): + """500 → AirflowException.""" + task = make_stop_task("ITEM_EQUIPMENT") + ctx = _make_ctx({"scope": ["ITEM_EQUIPMENT_STOP"]}) + + with patch("per_phase_tasks.requests.post") as mock_post: + mock_resp = MagicMock() + mock_resp.status_code = 500 + mock_resp.text = "boom" + mock_post.return_value = mock_resp + + with pytest.raises(AirflowException): + task.python_callable(**ctx) +``` + +- [ ] **Step 2: Run stop tests to verify RED** + +Run: +```bash +docker exec maple-airflow-scheduler bash -c "cd /opt/airflow/dags && python -m pytest tests/test_per_phase_tasks.py -v -k stop" +``` + +Expected: `ImportError: cannot import name 'make_stop_task'`. + +- [ ] **Step 3: Commit failing tests** + +```bash +git add docker/airflow/dags/tests/test_per_phase_tasks.py +git commit -m "test(airflow): failing tests for make_stop_task" +``` + +--- + +## Task 11: Implement make_stop_task (TDD green) + +**Files:** +- Modify: `docker/airflow/dags/per_phase_tasks.py` + +- [ ] **Step 1: Append make_stop_task** + +Append to `docker/airflow/dags/per_phase_tasks.py`: + +```python +def make_stop_task(phase: str) -> PythonOperator: + """Stop via /stop/phase/{phase}. + + Single endpoint halts both single-shot runs and loops (per #1290 spec §5.3). + Gates on scope: returns None if {phase}_STOP not in scope. + """ + def _stop(**ctx): + scope = parse_scope(ctx["dag_run"].conf or {}) + if f"{phase}_STOP" not in scope: + return None + base = get_external_api_base() + try: + resp = requests.post( + f"{base}/api/internal/stop/phase/{phase}", timeout=30 + ) + except requests.RequestException as exc: + raise AirflowException(f"Stop {phase} failed: {exc}") from exc + + if resp.status_code == 202: + return {**resp.json(), "status": "STOP_REQUESTED"} + + if resp.status_code == 200: + return {"phase": phase, "status": "NOT_RUNNING"} + + raise AirflowException( + f"Stop {phase} failed: HTTP {resp.status_code} " + f"{resp.reason}: {resp.text[:500]}" + ) + + return PythonOperator( + task_id=f"per_phase_stop_{phase.lower()}", + python_callable=_stop, + retries=0, + execution_timeout=timedelta(seconds=60), + do_xcom_push=True, + ) +``` + +- [ ] **Step 2: Run stop tests to verify GREEN** + +Run: +```bash +docker exec maple-airflow-scheduler bash -c "cd /opt/airflow/dags && python -m pytest tests/test_per_phase_tasks.py -v" +``` + +Expected: all parse_scope + trigger + loop + stop tests pass (5 valid + 5 invalid + 1 constant + 4 trigger + 4 loop + 4 stop = 23 tests). + +- [ ] **Step 3: Commit make_stop_task** + +```bash +git add docker/airflow/dags/per_phase_tasks.py +git commit -m "feat(airflow): make_stop_task factory" +``` + +--- + +## Task 12: Write failing test for make_is_phase_terminal (TDD red) + +**Files:** +- Modify: `docker/airflow/dags/tests/test_per_phase_tasks.py` + +- [ ] **Step 1: Append sensor tests** + +Append to `docker/airflow/dags/tests/test_per_phase_tasks.py`: + +```python +from per_phase_tasks import make_is_phase_terminal + + +def test_sensor_returns_true_when_scope_excludes_phase(mock_external_api_conn): + """If phase not in scope, sensor returns True (skip).""" + sensor = make_is_phase_terminal("ITEM_EQUIPMENT") + ctx = _make_ctx({"scope": ["OCID_LOOKUP"]}) + assert sensor(**ctx) is True + + +def test_sensor_returns_true_when_terminal(mock_external_api_conn): + """runId matches + current.terminal=True → True.""" + sensor = make_is_phase_terminal("ITEM_EQUIPMENT") + ctx = _make_ctx( + {"scope": ["ITEM_EQUIPMENT"]}, + xcom_value={"runId": "r-1", "phase": "ITEM_EQUIPMENT"}, + ) + + with patch("per_phase_tasks.requests.get") as mock_get: + status_resp = MagicMock() + status_resp.status_code = 200 + status_resp.json.return_value = { + "current": { + "runId": "r-1", + "phase": "ITEM_EQUIPMENT", + "terminal": True, + } + } + status_resp.raise_for_status = MagicMock() + mock_get.return_value = status_resp + + assert sensor(**ctx) is True + + +def test_sensor_raises_when_run_failed(mock_external_api_conn): + """current.phase == FAILED → RuntimeError.""" + sensor = make_is_phase_terminal("ITEM_EQUIPMENT") + ctx = _make_ctx( + {"scope": ["ITEM_EQUIPMENT"]}, + xcom_value={"runId": "r-1"}, + ) + + with patch("per_phase_tasks.requests.get") as mock_get: + status_resp = MagicMock() + status_resp.status_code = 200 + status_resp.json.return_value = { + "current": { + "runId": "r-1", + "phase": "FAILED", + "errorMessage": "boom", + } + } + status_resp.raise_for_status = MagicMock() + mock_get.return_value = status_resp + + with pytest.raises(RuntimeError, match="Run r-1 failed"): + sensor(**ctx) + + +def test_sensor_returns_false_on_transient_http(mock_external_api_conn): + """Network error → False (reschedule).""" + sensor = make_is_phase_terminal("ITEM_EQUIPMENT") + ctx = _make_ctx( + {"scope": ["ITEM_EQUIPMENT"]}, + xcom_value={"runId": "r-1"}, + ) + + with patch("per_phase_tasks.requests.get") as mock_get: + mock_get.side_effect = requests.RequestException("connection refused") + + assert sensor(**ctx) is False + + +def test_sensor_returns_false_when_runid_mismatch(mock_external_api_conn): + """current.runId != trigger's runId → False.""" + sensor = make_is_phase_terminal("ITEM_EQUIPMENT") + ctx = _make_ctx( + {"scope": ["ITEM_EQUIPMENT"]}, + xcom_value={"runId": "r-1"}, + ) + + with patch("per_phase_tasks.requests.get") as mock_get: + status_resp = MagicMock() + status_resp.status_code = 200 + status_resp.json.return_value = { + "current": {"runId": "r-other", "phase": "ITEM_EQUIPMENT"} + } + status_resp.raise_for_status = MagicMock() + mock_get.return_value = status_resp + + assert sensor(**ctx) is False +``` + +- [ ] **Step 2: Run sensor tests to verify RED** + +Run: +```bash +docker exec maple-airflow-scheduler bash -c "cd /opt/airflow/dags && python -m pytest tests/test_per_phase_tasks.py -v -k sensor" +``` + +Expected: `ImportError: cannot import name 'make_is_phase_terminal'`. + +- [ ] **Step 3: Commit failing tests** + +```bash +git add docker/airflow/dags/tests/test_per_phase_tasks.py +git commit -m "test(airflow): failing tests for make_is_phase_terminal" +``` + +--- + +## Task 13: Implement make_is_phase_terminal (TDD green) + +**Files:** +- Modify: `docker/airflow/dags/per_phase_tasks.py` + +- [ ] **Step 1: Append make_is_phase_terminal** + +Append to `docker/airflow/dags/per_phase_tasks.py`: + +```python +def make_is_phase_terminal(phase: str): + """PythonSensor callable: returns True when triggered runId reaches terminal. + + Gates on scope first: if phase not in scope, returns True (skip). + FAILED phase → RuntimeError (hard-fail DAG). + Transient HTTP → False (reschedule). + """ + task_id = f"per_phase_trigger_{phase.lower()}" + + def _poke(**ctx): + scope = parse_scope(ctx["dag_run"].conf or {}) + if phase not in scope: + return True + + xcom_val = ctx["ti"].xcom_pull(task_ids=task_id) + if isinstance(xcom_val, str): + xcom_val = json.loads(xcom_val) + run_id = (xcom_val or {}).get("runId") + if not run_id: + raise RuntimeError( + f"Sensor for {phase} triggered but trigger xcom returned no runId " + f"— config error" + ) + + try: + resp = requests.get( + f"{get_external_api_base()}/api/internal/run-status", + timeout=10, + ) + resp.raise_for_status() + data = resp.json() + except (requests.RequestException, ValueError): + return False + + current = data.get("current") + if not current or current.get("runId") != run_id: + return False + + if current.get("phase") == "FAILED": + raise RuntimeError( + f"Run {run_id} failed: {current.get('errorMessage', 'unknown')}" + ) + + return bool(current.get("terminal", False)) + + return _poke +``` + +- [ ] **Step 2: Run all tests to verify GREEN** + +Run: +```bash +docker exec maple-airflow-scheduler bash -c "cd /opt/airflow/dags && python -m pytest tests/test_per_phase_tasks.py -v" +``` + +Expected: all 28 tests pass (parse_scope 11 + trigger 4 + loop 4 + stop 4 + sensor 5). + +- [ ] **Step 3: Commit make_is_phase_terminal** + +```bash +git add docker/airflow/dags/per_phase_tasks.py +git commit -m "feat(airflow): make_is_phase_terminal sensor factory" +``` + +--- + +## Task 14: Write failing DAG loader test (TDD red) + +**Files:** +- Modify: `docker/airflow/dags/tests/test_per_phase_tasks.py` + +- [ ] **Step 1: Append DAG loader tests** + +Append to `docker/airflow/dags/tests/test_per_phase_tasks.py`: + +```python +import os +from airflow.models import DagBag + + +def test_daily_collection_pipeline_parses(): + """DAG parses without import errors (regression guard for rewire).""" + dag_folder = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + dagbag = DagBag(dag_folder=dag_folder, include_examples=False) + assert "daily_collection_pipeline" in dagbag.dags, ( + f"DAG missing. Errors: {dagbag.import_errors}" + ) + assert dagbag.import_errors == {}, f"Import errors: {dagbag.import_errors}" + + +def test_branch_on_scope_task_exists(): + dag_folder = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + dagbag = DagBag(dag_folder=dag_folder, include_examples=False) + dag = dagbag.dags["daily_collection_pipeline"] + assert "branch_on_scope" in dag.task_ids + + +def test_all_per_phase_tasks_present(): + """11 per-phase task definitions expected.""" + dag_folder = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + dagbag = DagBag(dag_folder=dag_folder, include_examples=False) + dag = dagbag.dags["daily_collection_pipeline"] + expected = { + # 4 trigger + "per_phase_trigger_ranking_fetch", + "per_phase_trigger_ocid_lookup", + "per_phase_trigger_character_basic", + "per_phase_trigger_item_equipment", + # 3 loop (RANKING_FETCH_LOOP excluded) + "per_phase_loop_ocid_lookup", + "per_phase_loop_character_basic", + "per_phase_loop_item_equipment", + # 4 stop + "per_phase_stop_ranking_fetch", + "per_phase_stop_ocid_lookup", + "per_phase_stop_character_basic", + "per_phase_stop_item_equipment", + } + assert expected.issubset(set(dag.task_ids)), ( + f"Missing tasks: {expected - set(dag.task_ids)}" + ) + + +def test_branch_downstream_includes_both_paths(): + """branch_on_scope must route to both trigger_daily_collection AND per_phase_join.""" + dag_folder = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + dagbag = DagBag(dag_folder=dag_folder, include_examples=False) + dag = dagbag.dags["daily_collection_pipeline"] + branch_task = dag.get_task("branch_on_scope") + downstream_ids = set(branch_task.downstream_list) + assert "trigger_daily_collection" in downstream_ids + assert "per_phase_join" in downstream_ids +``` + +- [ ] **Step 2: Run DAG loader tests to verify RED** + +Run: +```bash +docker exec maple-airflow-scheduler bash -c "cd /opt/airflow/dags && python -m pytest tests/test_per_phase_tasks.py -v -k 'dag' 2>&1 | tail -30" +``` + +Expected: `dag_branch_on_scope_task_exists` fails with `KeyError: 'branch_on_scope'` (task not yet wired). + +- [ ] **Step 3: Commit failing tests** + +```bash +git add docker/airflow/dags/tests/test_per_phase_tasks.py +git commit -m "test(airflow): failing DAG loader tests for branch_on_scope" +``` + +--- + +## Task 15: Wire branch_on_scope into daily_collection_pipeline.py + +**Files:** +- Modify: `docker/airflow/dags/daily_collection_pipeline.py` + +- [ ] **Step 1: Add imports at top of file** + +Open `docker/airflow/dags/daily_collection_pipeline.py` and add these lines after the existing import block (after `from airflow.sensors.python import PythonSensor`): + +```python +from airflow.operators.empty import EmptyOperator +from airflow.operators.python import BranchPythonOperator + +from per_phase_tasks import ( + parse_scope, + make_trigger_task, + make_loop_task, + make_stop_task, + make_is_phase_terminal, + TRIGGER_PHASES, + LOOP_PHASES, + STOP_PHASES, +) +``` + +- [ ] **Step 2: Add new task definitions inside DAG context** + +Find the line `check_external_api >> trigger_daily_collection >> wait_for_completion >> wait_ie_cycle >> trigger_cleanup` at the end of the DAG context (around line 238). Insert **before** that line: + +```python + # Per-phase branch (issue #1292). Activated when dag_run.conf['scope'] + # is set to a list of action values (see per_phase_tasks.ALLOWED_SCOPES). + branch_on_scope = BranchPythonOperator( + task_id="branch_on_scope", + python_callable=lambda **ctx: ( + "trigger_daily_collection" + if parse_scope(ctx["dag_run"].conf or {}) == ["FULL_DAILY"] + else "per_phase_join" + ), + ) + + per_phase_join = EmptyOperator( + task_id="per_phase_join", + trigger_rule="none_failed_min_one_success", + ) + + per_phase_trigger_tasks = [make_trigger_task(p) for p in TRIGGER_PHASES] + per_phase_loop_tasks = [make_loop_task(p) for p in LOOP_PHASES] + per_phase_stop_tasks = [make_stop_task(p) for p in STOP_PHASES] + + per_phase_trigger_sensors = [ + PythonSensor( + task_id=f"per_phase_wait_{p.lower()}_completion", + python_callable=make_is_phase_terminal(p), + mode="reschedule", + poke_interval=60, + timeout=60 * 60 * 4, + ) + for p in TRIGGER_PHASES + ] +``` + +- [ ] **Step 3: Rewire the root + add per-phase edges** + +Replace the final line `check_external_api >> trigger_daily_collection >> ...` with: + +```python + check_external_api >> branch_on_scope + branch_on_scope >> trigger_daily_collection + branch_on_scope >> per_phase_join + per_phase_join >> per_phase_trigger_tasks >> per_phase_trigger_sensors + per_phase_join >> per_phase_loop_tasks + per_phase_join >> per_phase_stop_tasks + trigger_daily_collection >> wait_for_completion >> wait_ie_cycle >> trigger_cleanup +``` + +- [ ] **Step 4: Run DAG loader tests to verify GREEN** + +Run: +```bash +docker exec maple-airflow-scheduler bash -c "cd /opt/airflow/dags && python -m pytest tests/test_per_phase_tasks.py -v -k dag" +``` + +Expected: all 4 DAG loader tests pass. + +- [ ] **Step 5: Run full test suite** + +Run: +```bash +docker exec maple-airflow-scheduler bash -c "cd /opt/airflow/dags && python -m pytest tests/test_per_phase_tasks.py -v" +``` + +Expected: all 32 tests pass (28 unit + 4 DAG loader). + +- [ ] **Step 6: Verify DAG parses in scheduler** + +```bash +docker exec maple-airflow-scheduler airflow dags list | grep daily_collection_pipeline +``` + +Expected: `daily_collection_pipeline` listed. + +- [ ] **Step 7: Verify all 11 per-phase tasks visible** + +```bash +docker exec maple-airflow-scheduler airflow tasks list daily_collection_pipeline | grep per_phase +``` + +Expected: 11 lines (4 trigger + 3 loop + 4 stop). + +- [ ] **Step 8: Commit DAG wiring** + +```bash +git add docker/airflow/dags/daily_collection_pipeline.py +git commit -m "feat(airflow): wire branch_on_scope + per-phase fan-out into daily DAG" +``` + +--- + +## Task 16: Manual smoke test via pipeline-test skill + +**Files:** none (runtime verification only) + +- [ ] **Step 1: Confirm module-external-api is running locally** + +Run: `curl -s http://localhost:8081/actuator/health | jq '.status'` +Expected: `"UP"` + +If DOWN: `./gradlew :module-external-api:bootRun` (background). + +- [ ] **Step 2: Confirm airflow scheduler is running** + +Run: `docker exec maple-airflow-scheduler airflow dags list | grep daily_collection_pipeline` +Expected: DAG listed. + +If not running: `docker compose -f docker-compose.airflow.yml up -d`. + +- [ ] **Step 3: Test FULL_DAILY path (default)** + +```bash +docker exec maple-airflow-scheduler airflow dags trigger daily_collection_pipeline +``` + +Expected: DAG run created, executes existing daily chain (4 phases + cleanup). Verify via `airflow dags list-runs daily_collection_pipeline`. + +- [ ] **Step 4: Test single-phase trigger scope** + +```bash +docker exec maple-airflow-scheduler airflow dags trigger daily_collection_pipeline \ + -c '{"scope": ["ITEM_EQUIPMENT"]}' +``` + +Expected: only `per_phase_trigger_item_equipment` + `per_phase_wait_item_equipment_completion` execute. Other 10 per-phase tasks skipped (their callables return None). + +Verify via: +```bash +curl -s http://localhost:8081/api/internal/run-status | jq '.current.phase' +``` +Expected: `"ITEM_EQUIPMENT"` (within 5s). + +- [ ] **Step 5: Test loop scope** + +```bash +docker exec maple-airflow-scheduler airflow dags trigger daily_collection_pipeline \ + -c '{"scope": ["ITEM_EQUIPMENT_LOOP"]}' +``` + +Expected: `per_phase_loop_item_equipment` succeeds, returns 202 with loopId. + +Verify after 60s: +```bash +curl -s http://localhost:8081/api/internal/run-status | jq '.loopSummaries.ITEM_EQUIPMENT.iterationCount' +``` +Expected: integer > 0. + +- [ ] **Step 6: Test stop scope** + +```bash +docker exec maple-airflow-scheduler airflow dags trigger daily_collection_pipeline \ + -c '{"scope": ["ITEM_EQUIPMENT_STOP"]}' +``` + +Expected: `per_phase_stop_item_equipment` returns 202 STOP_REQUESTED. + +Verify within 30s: +```bash +curl -s http://localhost:8081/api/internal/run-status | jq '.loopSummaries.ITEM_EQUIPMENT.status' +``` +Expected: `"STOPPED"`. + +- [ ] **Step 7: Test invalid scope fails fast** + +```bash +docker exec maple-airflow-scheduler airflow dags trigger daily_collection_pipeline \ + -c '{"scope": ["RANKING_FETCH_LOOP"]}' +``` + +Expected: DAG run fails immediately with `AirflowException: Invalid scope values: ['RANKING_FETCH_LOOP']`. + +- [ ] **Step 8: Test multi-action scope** + +```bash +docker exec maple-airflow-scheduler airflow dags trigger daily_collection_pipeline \ + -c '{"scope": ["ITEM_EQUIPMENT_LOOP", "OCID_LOOKUP_STOP"]}' +``` + +Expected: both `per_phase_loop_item_equipment` + `per_phase_stop_ocid_lookup` execute in parallel. + +- [ ] **Step 9: Document smoke test results** + +Append to `docs/01_ADR/ADR-XXX-airflow-per-phase-dag.md` §4 Result / Evidence table: + +```markdown +### Observed Result + +* FULL_DAILY scope: existing daily chain ran unchanged (4 phases + cleanup completed) +* ITEM_EQUIPMENT scope: triggered only ITEM_EQUIPMENT; /run-status showed ACTIVE within 5s +* ITEM_EQUIPMENT_LOOP scope: loopId returned; iterationCount > 0 after 60s +* ITEM_EQUIPMENT_STOP scope: loop status transitioned to STOPPED within 30s +* Invalid scope (RANKING_FETCH_LOOP): DAG run failed fast with AirflowException +* Multi-action scope: parallel execution confirmed via task log +``` + +- [ ] **Step 10: Commit smoke test results** + +```bash +git add docs/01_ADR/ADR-XXX-airflow-per-phase-dag.md +git commit -m "docs(adr): record manual smoke test results for #1292" +``` + +--- + +## Task 17: Create PR + +**Files:** none + +- [ ] **Step 1: Push branch** + +```bash +git push origin feature/issue-1292-per-phase-dag +``` + +- [ ] **Step 2: Open PR to develop** + +```bash +gh pr create --base develop --head feature/issue-1292-per-phase-dag \ + --title "feat(airflow): per-phase scope-driven branch in daily DAG (issue #1292)" \ + --body "$(cat <<'EOF' +Adds per-phase scope-driven branch to daily_collection_pipeline.py. Operators can trigger, loop, or stop a single ext-api phase via `airflow dags trigger -c '{"scope": ["..."]}'` without running the full daily chain. + +New helper module `per_phase_tasks.py` exposes 3 task factories (trigger/loop/stop) + parse_scope validator + phase-filtered sensor factory. 11 per-phase task definitions fan out from a BranchPythonOperator. + +Acceptance criteria all covered via unit tests + manual smoke test against local stack. + +🤖 Generated with [Claude Code](https://claude.com/claude-code) +EOF +)" +``` + +- [ ] **Step 3: Verify PR created** + +Run: `gh pr list --head feature/issue-1292-per-phase-dag` +Expected: PR shown with `develop` base. + +--- + +## Self-Review Checklist (run after writing plan) + +- [ ] **Spec coverage**: §3.2 (allowed values) → Task 5; §4.1 (make_trigger_task) → Tasks 6+7; §4.2 (make_loop_task) → Tasks 8+9; §4.3 (make_stop_task) → Tasks 10+11; §4.4 (make_is_phase_terminal) → Tasks 12+13; §4.5 (DAG wiring) → Tasks 14+15; §5 (error handling) → covered in each factory's tests; §6 (test plan) → Tasks 3-15; §7 (AC mapping) → Tasks 16+17. +- [ ] **Placeholder scan**: no "TBD" / "TODO" / "fill in details" anywhere. +- [ ] **Type consistency**: `parse_scope` returns `list` everywhere; `make_trigger_task(phase: str)` matches across tests + impl + DAG wiring; `ALLOWED_SCOPES` defined once in Task 5, referenced in tests + impl. +- [ ] **Commit granularity**: 17 commits across 17 tasks — each task ends with `git commit`. \ No newline at end of file diff --git a/docs/superpowers/specs/2026-06-18-issue-1292-per-phase-dag-design.md b/docs/superpowers/specs/2026-06-18-issue-1292-per-phase-dag-design.md new file mode 100644 index 000000000..f5b4f9fba --- /dev/null +++ b/docs/superpowers/specs/2026-06-18-issue-1292-per-phase-dag-design.md @@ -0,0 +1,395 @@ +# Issue #1292: Airflow Per-Phase DAG — Design + +- Date: 2026-06-18 +- Branch: `feature/issue-1292-per-phase-dag` +- Status: Draft (pending user review) +- Blocked-by: #1289 (merged), #1290 (merged), #1291 (merged) +- Shape: A (extend `daily_collection_pipeline.py`) — per issue clarification + +--- + +## 1. Goal + +Extend the Airflow control plane so operators can trigger / loop / stop a single phase of `module-external-api` from the Airflow UI / CLI without running the full daily pipeline. Operators pass a `scope` list via `dag_run.conf`; the DAG routes to either the existing daily chain or a per-phase fan-out depending on `scope`. + +**Use cases** +- Hot-loop `ITEM_EQUIPMENT` to pick up fresh gear data without waiting for the daily 18:00 UTC trigger. +- Stop a runaway `CHARACTER_BASIC_LOOP` / `ITEM_EQUIPMENT_LOOP` without restarting ext-api. +- Run a single phase (e.g. `CHARACTER_BASIC`) ad-hoc after a config change. + +--- + +## 2. Components + +### 2.1 `per_phase_tasks.py` (new file) + +Pure helpers — no DAG object. Three factories + one parser: + +| Symbol | Purpose | +|--------|---------| +| `parse_scope(conf: dict) -> list[str]` | Validates `dag_run.conf['scope']`. Returns `["FULL_DAILY"]` for missing / default. Raises `AirflowException` on invalid scope values. | +| `make_trigger_task(phase: str) -> PythonOperator` | Single-shot phase trigger via `/trigger/phase/{phase}`. 409 → idempotent. | +| `make_loop_task(phase: str) -> PythonOperator` | Start loop via `/loop/phase/{phase}`. 409 → idempotent. 400 INVALID_PHASE → fail. | +| `make_stop_task(phase: str) -> PythonOperator` | Stop via `/stop/phase/{phase}`. 200 NOT_RUNNING → idempotent success. | +| `make_is_phase_terminal(phase: str) -> Callable` | PythonSensor callable — same logic as existing `_is_run_terminal` but phase-filtered. | +| `TRIGGER_PHASES = ["RANKING_FETCH", "OCID_LOOKUP", "CHARACTER_BASIC", "ITEM_EQUIPMENT"]` | Frozen list. | +| `LOOP_PHASES = ["CHARACTER_BASIC", "ITEM_EQUIPMENT"]` | Matches ext-api `loopablePhases` (verified 2026-06-18 against `PhaseLoopController.kt`). | +| `STOP_PHASES = ["RANKING_FETCH", "OCID_LOOKUP", "CHARACTER_BASIC", "ITEM_EQUIPMENT"]` | All 4 phases; single-phase stop endpoint from #1290 halts loops too. | + +### 2.2 `daily_collection_pipeline.py` (extend) + +Add branch routing + per-phase fan-out: + +``` +check_external_api + └── branch_on_scope (BranchPythonOperator) + ├── trigger_daily_collection → ... → trigger_cleanup (FULL_DAILY path; unchanged) + └── per_phase_join (EmptyOperator, trigger_rule=none_failed_min_one_success) + ├── per_phase_trigger_ (×4) → per_phase_sensor_ (×4) + ├── per_phase_loop_ (×3) → (no sensor; fire-and-forget) + └── per_phase_stop_ (×4) → (no sensor; fire-and-forget) +``` + +10 per-phase tasks materialized in graph: 4 trigger (`RANKING_FETCH`, `OCID_LOOKUP`, `CHARACTER_BASIC`, `ITEM_EQUIPMENT`) + 2 loop (`CHARACTER_BASIC_LOOP`, `ITEM_EQUIPMENT_LOOP`; OCID_LOOKUP_LOOP rejected by ext-api despite earlier spec mention) + 4 stop (`RANKING_FETCH_STOP`, `OCID_LOOKUP_STOP`, `CHARACTER_BASIC_STOP`, `ITEM_EQUIPMENT_STOP`). Per-task callable gates execution on `dag_run.conf['scope']`. + +### 2.3 Routing rule + +```python +def route_scope(**ctx) -> str: + scope = parse_scope(ctx["dag_run"].conf or {}) + return "trigger_daily_collection" if scope == ["FULL_DAILY"] else "per_phase_join" +``` + +`trigger_daily_collection` and `per_phase_join` are mutually exclusive downstream of `branch_on_scope`. + +--- + +## 3. Scope config schema + +### 3.1 Allowed scope values + +```python +ALLOWED_SCOPES = frozenset({ + # bare phase → single-shot trigger + "RANKING_FETCH", "OCID_LOOKUP", "CHARACTER_BASIC", "ITEM_EQUIPMENT", + # _LOOP suffix → start loop (only ext-api loopablePhases) + "CHARACTER_BASIC_LOOP", "ITEM_EQUIPMENT_LOOP", + # _STOP suffix → graceful stop + "RANKING_FETCH_STOP", "OCID_LOOKUP_STOP", "CHARACTER_BASIC_STOP", "ITEM_EQUIPMENT_STOP", +}) +``` + +Total 10 valid scope values (4 + 2 + 4). + +### 3.2 Examples + +| `dag_run.conf` | Behavior | +|----------------|----------| +| `{}` (no conf) | `FULL_DAILY` → existing daily chain | +| `{"scope": "FULL_DAILY"}` | explicit full daily | +| `{"scope": ["ITEM_EQUIPMENT"]}` | trigger ITEM_EQUIPMENT only | +| `{"scope": ["ITEM_EQUIPMENT_LOOP"]}` | start ITEM_EQUIPMENT loop | +| `{"scope": ["ITEM_EQUIPMENT_STOP"]}` | stop ITEM_EQUIPMENT (loop or single) | +| `{"scope": ["RANKING_FETCH", "OCID_LOOKUP"]}` | trigger 2 phases in parallel | +| `{"scope": ["ITEM_EQUIPMENT_LOOP", "OCID_LOOKUP_STOP"]}` | mixed actions in parallel | +| `{"scope": "INVALID"}` | raise `AirflowException` | +| `{"scope": ["RANKING_FETCH_LOOP"]}` | raise `AirflowException` (not in ext-api `loopablePhases`) | +| `{"scope": ["OCID_LOOKUP_LOOP"]}` | raise `AirflowException` (also not in ext-api `loopablePhases` despite spec §2.3 mention) | + +--- + +## 4. Per-phase task factories + +### 4.1 `make_trigger_task(phase)` + +```python +def make_trigger_task(phase: str) -> PythonOperator: + def _trigger(**ctx): + scope = parse_scope(ctx["dag_run"].conf or {}) + bare = [s for s in scope if s == phase] # gate + if not bare: + return None # Airflow marks as success-skipped + base = get_external_api_base() + try: + resp = requests.post(f"{base}/api/internal/trigger/phase/{phase}", timeout=30) + except requests.RequestException as exc: + raise AirflowException(f"Trigger {phase} failed: {exc}") from exc + + if resp.status_code in (200, 202): + return resp.json() + if resp.status_code == 409: + # idempotent: discover active runId + status_resp = requests.get(f"{base}/api/internal/run-status", timeout=10) + status_resp.raise_for_status() + current = status_resp.json().get("current") or {} + return {"runId": current.get("runId"), "phase": phase, "status": "ALREADY_ACTIVE"} + raise AirflowException( + f"Trigger {phase} failed: HTTP {resp.status_code} {resp.text[:500]}" + ) + return PythonOperator( + task_id=f"per_phase_trigger_{phase.lower()}", + python_callable=_trigger, + retries=0, + execution_timeout=timedelta(seconds=60), + do_xcom_push=True, + ) +``` + +### 4.2 `make_loop_task(phase)` + +```python +def make_loop_task(phase: str) -> PythonOperator: + def _loop(**ctx): + scope = parse_scope(ctx["dag_run"].conf or {}) + if f"{phase}_LOOP" not in scope: + return None + base = get_external_api_base() + try: + resp = requests.post(f"{base}/api/internal/loop/phase/{phase}", timeout=30) + except requests.RequestException as exc: + raise AirflowException(f"Loop start {phase} failed: {exc}") from exc + + if resp.status_code == 202: + return resp.json() + if resp.status_code == 409: + return {**resp.json(), "status": "ALREADY_LOOPING"} + if resp.status_code == 400: + raise AirflowException( + f"Loop start {phase} rejected: {resp.json().get('error')}" + ) # config error — fail task + raise AirflowException( + f"Loop start {phase} failed: HTTP {resp.status_code} {resp.text[:500]}" + ) + return PythonOperator( + task_id=f"per_phase_loop_{phase.lower()}", + python_callable=_loop, + retries=0, + execution_timeout=timedelta(seconds=60), + do_xcom_push=True, + ) +``` + +### 4.3 `make_stop_task(phase)` + +```python +def make_stop_task(phase: str) -> PythonOperator: + def _stop(**ctx): + scope = parse_scope(ctx["dag_run"].conf or {}) + if f"{phase}_STOP" not in scope: + return None + base = get_external_api_base() + try: + resp = requests.post(f"{base}/api/internal/stop/phase/{phase}", timeout=30) + except requests.RequestException as exc: + raise AirflowException(f"Stop {phase} failed: {exc}") from exc + + if resp.status_code == 202: + return {**resp.json(), "status": "STOP_REQUESTED"} + if resp.status_code == 200: + return {"phase": phase, "status": "NOT_RUNNING"} + raise AirflowException( + f"Stop {phase} failed: HTTP {resp.status_code} {resp.text[:500]}" + ) + return PythonOperator( + task_id=f"per_phase_stop_{phase.lower()}", + python_callable=_stop, + retries=0, + execution_timeout=timedelta(seconds=60), + do_xcom_push=True, + ) +``` + +### 4.4 `make_is_phase_terminal(phase)` + +Reuses existing `_is_run_terminal` logic with phase filter: + +```python +def make_is_phase_terminal(phase: str): + def _poke(**ctx): + # Gate: if trigger was skipped (scope didn't include bare phase), succeed immediately + scope = parse_scope(ctx["dag_run"].conf or {}) + if phase not in scope: + return True + trigger_resp = ctx["ti"].xcom_pull(task_ids=f"per_phase_trigger_{phase.lower()}") + if isinstance(trigger_resp, str): + trigger_resp = json.loads(trigger_resp) + run_id = trigger_resp.get("runId") if trigger_resp else None + if not run_id: + raise RuntimeError( + f"Sensor for {phase} triggered but trigger xcom returned no runId — config error" + ) + try: + resp = requests.get( + f"{get_external_api_base()}/api/internal/run-status", timeout=10 + ) + resp.raise_for_status() + data = resp.json() + except (requests.RequestException, ValueError): + return False + current = data.get("current") + if not current or current.get("runId") != run_id: + return False + if current.get("phase") == "FAILED": + raise RuntimeError(f"Run {run_id} failed: {current.get('errorMessage', 'unknown')}") + return bool(current.get("terminal", False)) + return _poke +``` + +### 4.5 DAG wiring patch in `daily_collection_pipeline.py` + +```python +from docker.airflow.dags.per_phase_tasks import ( + parse_scope, make_trigger_task, make_loop_task, make_stop_task, + make_is_phase_terminal, TRIGGER_PHASES, LOOP_PHASES, STOP_PHASES, +) +from airflow.operators.empty import EmptyOperator +from airflow.operators.python import BranchPythonOperator + +# After existing chain definitions: + +branch_on_scope = BranchPythonOperator( + task_id="branch_on_scope", + python_callable=lambda **ctx: ( + "trigger_daily_collection" + if parse_scope(ctx["dag_run"].conf or {}) == ["FULL_DAILY"] + else "per_phase_join" + ), +) + +per_phase_join = EmptyOperator( + task_id="per_phase_join", + trigger_rule="none_failed_min_one_success", +) + +per_phase_trigger_tasks = [make_trigger_task(p) for p in TRIGGER_PHASES] +per_phase_loop_tasks = [make_loop_task(p) for p in LOOP_PHASES] +per_phase_stop_tasks = [make_stop_task(p) for p in STOP_PHASES] + +per_phase_trigger_sensors = [ + PythonSensor( + task_id=f"per_phase_wait_{p.lower()}_completion", + python_callable=make_is_phase_terminal(p), + mode="reschedule", + poke_interval=60, + timeout=60*60*4, + ) for p in TRIGGER_PHASES +] + +# Rewire existing root: +# check_external_api >> trigger_daily_collection >> ... (no change to existing edges) +check_external_api >> branch_on_scope +branch_on_scope >> trigger_daily_collection # FULL_DAILY path (existing chain) +branch_on_scope >> per_phase_join # scope path +per_phase_join >> per_phase_trigger_tasks >> per_phase_trigger_sensors +per_phase_join >> per_phase_loop_tasks +per_phase_join >> per_phase_stop_tasks +``` + +Total task count in DAG graph: 4 trigger + 4 sensor + 2 loop + 4 stop + 1 branch + 1 join = 16 new task definitions added. + +--- + +## 5. Error handling matrix + +| Endpoint | 2xx | 409 | 400 | Other | +|----------|-----|-----|-----|-------| +| `/trigger/phase/...` | xcom push + sensor wait | idempotent: discover active runId | n/a (phase validated upstream) | `AirflowException` | +| `/loop/phase/...` | xcom push loopId | idempotent: push existing loopId | config error: `AirflowException` | `AirflowException` | +| `/stop/phase/...` | xcom push status | n/a | n/a | `AirflowException` | +| Sensor: `GET /run-status` | match runId → terminal check | n/a | n/a | transient: return False (reschedule) | +| Sensor: FAILED phase | raise `RuntimeError` (hard fail DAG) | n/a | n/a | n/a | + +Network errors → `AirflowException` (transient: task retries per Airflow defaults; sensor retries infinitely until 4h timeout). + +--- + +## 6. Test plan + +### 6.1 Unit tests (`docker/airflow/dags/tests/test_per_phase_tasks.py`) + +**Pure-function tests (no Airflow runtime needed):** +1. `parse_scope({})` → `["FULL_DAILY"]` +2. `parse_scope({"scope": "FULL_DAILY"})` → `["FULL_DAILY"]` +3. `parse_scope({"scope": "ITEM_EQUIPMENT"})` → `["ITEM_EQUIPMENT"]` +4. `parse_scope({"scope": ["ITEM_EQUIPMENT_LOOP", "OCID_LOOKUP_STOP"]})` → 2-element list +5. `parse_scope({"scope": ["RANKING_FETCH_LOOP"]})` → raises `AirflowException` +6. `parse_scope({"scope": ["INVALID"]})` → raises `AirflowException` +7. `parse_scope({"scope": "RANKING_FETCH_LOOP"})` → raises `AirflowException` + +**DAG loader tests** (using `airflow.models.DagBag`): +8. `daily_collection_pipeline` DAG parses without error after changes +9. `branch_on_scope` task exists; downstream task_ids include `trigger_daily_collection` AND `per_phase_join` +10. All 11 per-phase task definitions exist (4 trigger + 3 loop + 4 stop) +11. `parse_scope` coverage in pytest parametrize + +### 6.2 DAG import smoke (CI gate) + +```python +def test_dag_imports(): + from airflow.models import DagBag + dagbag = DagBag(dag_folder="docker/airflow/dags/", include_examples=False) + assert "daily_collection_pipeline" in dagbag.dags + assert dagbag.import_errors == {} +``` + +### 6.3 Manual smoke (out-of-band, uses `pipeline-test` skill) + +Verification harness: existing `pipeline-test` skill (end-to-end pipeline runtime test). Run against local stack: + +| Scope | Verify | +|-------|--------| +| `["ITEM_EQUIPMENT"]` | ext-api `/run-status` shows ITEM_EQUIPMENT ACTIVE; sensor pokes until terminal | +| `["ITEM_EQUIPMENT_LOOP"]` | ext-api `/run-status.loopSummaries["ITEM_EQUIPMENT"].loopId` set; `iterationCount > 0` after 60s | +| `["ITEM_EQUIPMENT_STOP"]` | existing loop terminates within 30s; `loopSummaries["ITEM_EQUIPMENT"].status == "STOPPED"` | +| absent (FULL_DAILY) | existing daily chain runs unchanged (4 phases + cleanup) | + +--- + +## 7. Acceptance criteria mapping + +| AC | Section | +|----|---------| +| `airflow dags trigger per_phase_pipeline -c '{"scope":"ITEM_EQUIPMENT"}'` triggers only ITEM_EQUIPMENT on ext-api | §4.1 + §4.5 branch + gate | +| Same for `ITEM_EQUIPMENT_LOOP` starts loop | §4.2 + §4.5 | +| Same for `ITEM_EQUIPMENT_STOP` stops loop | §4.3 + §4.5 | +| Sensor layer correlates runId / loopId and reaches terminal state | §4.4 xcom push + sensor | +| 409 / NOT_RUNNING surfaces as Airflow task success (idempotent) | §5 error matrix | +| Manual smoke test against local stack for each scope value | §6.3 pipeline-test skill | +| Existing `daily_collection_pipeline` runs unchanged | §4.5 default `FULL_DAILY` routing + unchanged edges | + +--- + +## 8. Out-of-scope + +- Cross-phase sequential ordering (multi-action scope runs in parallel) +- Per-phase timeout overrides (default 4h sensor, 60s callable timeouts) +- Loop iteration timeout (same 4h sensor applies; operators restart if stuck) +- Auto-retry on 5xx beyond Airflow defaults +- Loop restart on iteration failure (per #1291 §11, iteration fail stops loop) +- Cross-restart loop state recovery (in-memory only, per #1291 §13) +- OAuth / API-key auth (matches existing pattern via Airflow connection) + +--- + +## 9. Risks + +| Risk | Mitigation | +|------|------------| +| Loop + multi-action: 2+ loops at once, contention. | Operators self-coordinate via scope config. Documented in DAG docstring. | +| BranchPythonOperator misroutes when `dag_run.conf` malformed. | `parse_scope` raises `AirflowException` immediately; fail-fast. | +| 17 task definitions materialized even when unused → cluttered graph view. | Per-task gating keeps behavior correct; graph view is documentation cost only. | +| 4h sensor timeout too short for cold-cache OCID_LOOKUP. | Matches existing pattern; operator override via `execution_timeout`. | +| `RANKING_FETCH_LOOP` invalid scope only caught at task-run, not DAG-parse. | Acceptable: validation at runtime before HTTP call; faster fail than DAG rejection. | +| 11 task definitions exceed CLAUDE.md 300-line coordinator limit? | Each per-phase task is small (~30 lines); main DAG file grows by ~60 lines only. | + +--- + +## 10. Files touched + +| File | Change type | Lines (est.) | +|------|-------------|--------------| +| `docker/airflow/dags/per_phase_tasks.py` | new | ~250 | +| `docker/airflow/dags/daily_collection_pipeline.py` | extend | +60 | +| `docker/airflow/dags/tests/test_per_phase_tasks.py` | new | ~120 | + +Total: ~430 lines. \ No newline at end of file