feat(airflow): per-phase scope-driven branch in daily DAG (#1292)#1303
Conversation
Adds Shape A design: extend daily_collection_pipeline.py with optional scope-driven branch. New per_phase_tasks.py helper module exposes 3 task factories (trigger/loop/stop) + parse_scope validator. Routing rule: FULL_DAILY (default) → existing chain; scope list → per-phase fan-out. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
17-task TDD plan covering: parse_scope validator, 3 task factories (trigger/loop/stop), phase-filtered sensor, BranchPythonOperator wiring, DAG loader tests, manual smoke test via pipeline-test skill, PR creation. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
ADR-393: shape A decision — extend daily DAG with scope-driven branch. test scaffolding: __init__.py + conftest.py with mock fixtures. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
TDD vertical slice: tests for ONE behavior at a time. Other factories (trigger/loop/stop/sensor) get their own RED→GREEN cycles. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Minimal impl to pass all 11 RED tests. Includes constants + get_external_api_base. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Add step 10a to pipeline-test skill: post-E2E smoke that exercises the #1292 Airflow scope branch end-to-end. Sub-steps: single-phase trigger, loop start, iteration progress, loop stop, invalid scope rejection, cleanup. Skipped silently when branch_on_scope is not deployed. Also adds early-warning echo in step 5 (Airflow startup) so operators notice when the per-phase branch is missing from the deployed DAG file. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Vertical slice — 5 trigger tests (skip, happy, 409, 500, network error). Other factories deferred to their own RED→GREEN cycles. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Minimal impl to pass all 5 trigger RED tests. Gated by scope, handles 200/202/409/500/network-error per spec §4.1. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
5 loop tests: skip, happy 202, 409 idempotent, 400 INVALID_PHASE, 500. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
5 RED tests → GREEN. Handles 202/409/400/500 per spec §4.2. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
5 stop tests: skip, 202 happy, 200 NOT_RUNNING, 500, network error. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
5 RED tests → GREEN. Handles 202/200/500/network per spec §4.3. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
6 sensor tests: scope-skip, terminal, FAILED, transient, runId mismatch, missing runId. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
6 RED tests → GREEN. Scope-gated, FAILED-aware, transient-tolerant per spec §4.4. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
4 DAG integration tests. Parses ok today; branch_on_scope not yet wired. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Extends daily_collection_pipeline.py with BranchPythonOperator routing: - FULL_DAILY (default): existing daily chain unchanged - scope list: per_phase_join → 11 per-phase tasks + 4 sensors Edge chains via explicit zipped pairs (>> doesn't support list-to-list). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Spec/impl drift discovered via runtime smoke test on 2026-06-18: ext-api PhaseLoopController.loopablePhases accepts only CHARACTER_BASIC and ITEM_EQUIPMENT, not OCID_LOOKUP. Reduces LOOP_PHASES from 3 to 2. Updated: - per_phase_tasks.py: ALLOWED_SCOPES drops OCID_LOOKUP_LOOP; LOOP_PHASES shrinks - test_per_phase_tasks.py: adds 2 invalid-scope parametrize cases + DAG test expects 10 not 11 tasks - spec: removes OCID_LOOKUP_LOOP from allowed values + examples - ADR-393: metrics table updated - pipeline-test step 10a.2: notes OCID_LOOKUP_LOOP rejection reason Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: f156b3a3d3
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| resp = requests.post( | ||
| f"{base}/api/internal/trigger/phase/{phase}", timeout=30 | ||
| ) |
There was a problem hiding this comment.
Pass upstream run ids to phase triggers
When an operator runs scope with OCID_LOOKUP, CHARACTER_BASIC, or ITEM_EQUIPMENT, this POST is sent without X-Upstream-Run-Id. I checked InternalApiController.triggerPhase(...), which returns 400 MISSING_UPSTREAM for every non-RANKING_FETCH phase without that header, so three of the four advertised bare phase triggers fail immediately from Airflow instead of starting.
Useful? React with 👍 / 👎.
| current = data.get("current") | ||
| if not current or current.get("runId") != run_id: | ||
| return False |
There was a problem hiding this comment.
Read per-phase slot state instead of current
For a bare phase trigger, the sensor waits on /run-status.current, but the ext-api current field is a legacy alias that only returns non-terminal runs (RunStatusTracker.getCurrentStatus() filters terminal slots). Once the phase completes, its terminal record moves out of current into the per-phase slot/last-completed data, so this code returns False until the 4-hour sensor timeout even though the run finished successfully.
Useful? React with 👍 / 👎.
|
@codex review |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: fd5348e79f
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| resp = requests.post( | ||
| f"{base}/api/internal/trigger/phase/{phase}", timeout=30 | ||
| ) |
There was a problem hiding this comment.
Pass upstream run IDs to non-root phase triggers
When scope contains OCID_LOOKUP, CHARACTER_BASIC, or ITEM_EQUIPMENT, this POST is sent without the X-Upstream-Run-Id header, but InternalApiController.triggerPhase rejects every non-RANKING_FETCH phase without that header (module-external-api/src/main/kotlin/maple/externalapi/runstatus/InternalApiController.kt:104-105). As a result, 3 of the 4 advertised bare phase scopes fail with 400 MISSING_UPSTREAM; either forward an upstream run id from dag_run.conf/previous task output or reject those scopes before calling the endpoint.
Useful? React with 👍 / 👎.
| current = data.get("current") or {} | ||
| return { | ||
| "runId": current.get("runId"), |
There was a problem hiding this comment.
Use the 409 response runId for idempotency
On a 409 from /trigger/phase/{phase}, the controller already returns the run id for the occupied phase, but this code ignores that body and reads /run-status.current. In an environment with multiple active phase slots, current is only the most recently started non-terminal run, so a retry for an already-running phase can push another phase's runId to XCom and make the downstream sensor wait on the wrong run.
Useful? React with 👍 / 👎.
| current = data.get("current") | ||
| if not current or current.get("runId") != run_id: | ||
| return False |
There was a problem hiding this comment.
Read per-phase slots instead of legacy current
For multi-action scopes that trigger more than one bare phase, this sensor only checks the legacy /run-status.current field. RunStatusTracker.getCurrentStatus() returns one most-recent non-terminal run across all phases, and once this phase completes its terminal record lives under the per-phase slot/last-completed maps instead; while another phase is active, or after all phases are terminal, this returns False until the Airflow sensor times out even though the target phase already finished.
Useful? React with 👍 / 👎.
Summary
Extends
daily_collection_pipeline.pywith ascope-driven branch (branch_on_scope) so operators can trigger / loop / stop a single ext-api phase viaairflow dags trigger -c '{"scope": [...]}'without running the full daily chain. New helper moduleper_phase_tasks.pyexposes 3 task factories + parser + sensor.Spec/impl drift fixed (commit f156b3a)
The original #1292 spec listed
OCID_LOOKUP_LOOPas a valid scope value, but ext-api'sPhaseLoopController.loopablePhasesonly acceptsCHARACTER_BASIC+ITEM_EQUIPMENT. Verified 2026-06-18 by hitting the live endpoint (HTTP 400 INVALID_PHASE). Aligned:LOOP_PHASES = ["CHARACTER_BASIC", "ITEM_EQUIPMENT"]ALLOWED_SCOPESdropsOCID_LOOKUP_LOOPTest evidence
./gradlew-equivalent viapytest tests/test_per_phase_tasks.pyinside scheduler container)branch_on_scopetask, 10 per-phase task definitions, both downstream pathsPOST /trigger/phase/RANKING_FETCH→ 202 + runIdPOST /stop/phase/RANKING_FETCH→ 202 STOP_REQUESTEDPOST /loop/phase/RANKING_FETCH→ 400 INVALID_PHASE (gate confirmed)POST /loop/phase/OCID_LOOKUP→ 400 INVALID_PHASE (drift caught)Smoke gap
Loop iteration smoke (step 10a.3 + 10a.4 of pipeline-test skill) requires
OCID_LOOKUPto complete upstream (~25 min on local env). Deferred to follow-up session.Pipeline-test skill
Step 10a added (134 lines) — post-E2E verification of the per-phase branch. Auto-skips if
branch_on_scopetask is not in the DAG file.Out-of-scope issues (separate PRs)
docker-compose.airflow.ymlhardcodes172.20.0.2:5432for airflow-db; stale after container restart blocks DAG processinggit pull— running instance was pre-[ext-api] Phase infinite-loop endpoint: continuous single-phase execution #1291, returned 404 on new endpoints until rebuiltFiles
docker/airflow/dags/per_phase_tasks.py(new, 254 lines)docker/airflow/dags/tests/test_per_phase_tasks.py(new, 476 lines)docker/airflow/dags/tests/conftest.py(new, 33 lines)docker/airflow/dags/tests/__init__.py(new)docker/airflow/dags/daily_collection_pipeline.py(+55 lines)docs/01_ADR/ADR-393-airflow-per-phase-dag.md(new, 96 lines).claude/skills/pipeline-test/SKILL.md(+134 lines step 10a)🤖 Generated with Claude Code