134 changes: 134 additions & 0 deletions .claude/skills/pipeline-test/SKILL.md
@@ -289,6 +289,15 @@ docker exec maple-airflow-scheduler airflow connections add cleanup \
docker exec maple-airflow-scheduler airflow dags unpause daily_collection_pipeline
docker exec maple-airflow-scheduler airflow dags unpause daily_cleanup_pipeline

# Verify the #1292 per-phase branch is loaded (applies only once branch_on_scope is merged).
# This check only warns and does not abort; step 10a's prereq check performs
# the actual skip when the branch_on_scope task is missing.
if docker exec maple-airflow-scheduler airflow tasks list daily_collection_pipeline 2>/dev/null | grep -q '^branch_on_scope$'; then
  echo "per-phase branch loaded: branch_on_scope present"
else
  echo "WARNING: branch_on_scope not in daily_collection_pipeline (issue #1292 branch not deployed)"
fi

# Trigger DAG manually
docker exec maple-airflow-scheduler airflow dags trigger daily_collection_pipeline

@@ -494,6 +503,127 @@ done

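**Run only when the user explicitly asks to shut down.** The pipeline is long-running (~2 hours), so it is not stopped automatically. Run this when the user says "종료" (shut down), "stop", "cleanup", or similar.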

### 10a. Per-phase scope verification (issue #1292)

Smoke verification that the per-phase Airflow branch (added in #1292) drives the ext-api per-phase endpoints end-to-end. Runs **after** the main daily E2E completes successfully. Skipped if `branch_on_scope` task is not present (pre-#1292 deployments).

**Prereq check:**
```bash
# Detect whether the per-phase branch is deployed in this DAG file.
# On a pre-#1292 deployment, skip with an explicit message instead of
# silently no-oping, so operators do not mistake it for broken wiring.
if ! docker exec maple-airflow-scheduler airflow tasks list daily_collection_pipeline 2>/dev/null | grep -q '^branch_on_scope$'; then
  echo "SKIP: branch_on_scope not in daily_collection_pipeline (issue #1292 branch not deployed)"
  return 0 2>/dev/null || exit 0
fi
echo "Per-phase branch loaded; running scope verification"
```

**10a.1 — Single-phase trigger:**
```bash
# Trigger only ITEM_EQUIPMENT (skips the rest of the daily chain)
docker exec maple-airflow-scheduler airflow dags trigger daily_collection_pipeline \
  -c '{"scope": ["ITEM_EQUIPMENT"]}'

# Wait up to 60s for ITEM_EQUIPMENT to become ACTIVE
for i in $(seq 1 30); do
  phase=$(curl -s http://localhost:8081/api/internal/run-status | jq -r '.current.phase // ""')
  if [ "${phase}" = "ITEM_EQUIPMENT" ]; then
    echo "ITEM_EQUIPMENT ACTIVE after ${i}*2s"; break
  fi
  sleep 2
done
[ "${phase}" = "ITEM_EQUIPMENT" ] || { echo "ITEM_EQUIPMENT scope trigger failed"; exit 6; }
```

**10a.2 — Loop start:**
```bash
# Start a continuous loop for ITEM_EQUIPMENT.
# Note: OCID_LOOKUP_LOOP is rejected by ext-api (400 INVALID_PHASE) despite
# earlier #1291 spec mention — only CHARACTER_BASIC and ITEM_EQUIPMENT are
# accepted by PhaseLoopController.loopablePhases (verified 2026-06-18).
docker exec maple-airflow-scheduler airflow dags trigger daily_collection_pipeline \
  -c '{"scope": ["ITEM_EQUIPMENT_LOOP"]}'

# Wait up to 30s for loopId to appear in /run-status.loopSummaries
loop_id=""
for i in $(seq 1 15); do
  loop_id=$(curl -s http://localhost:8081/api/internal/run-status \
    | jq -r '.loopSummaries.ITEM_EQUIPMENT.loopId // ""')
  if [ -n "${loop_id}" ] && [ "${loop_id}" != "null" ]; then
    echo "Loop started: loopId=${loop_id}"; break
  fi
  sleep 2
done
[ -n "${loop_id}" ] && [ "${loop_id}" != "null" ] || { echo "Loop start failed"; exit 6; }
```

**10a.3 — Loop iteration progress:**
```bash
# Wait up to 90s for iterationCount > 0 (proves the loop is actually iterating)
iter=""
for i in $(seq 1 45); do
  iter=$(curl -s http://localhost:8081/api/internal/run-status \
    | jq -r '.loopSummaries.ITEM_EQUIPMENT.iterationCount // 0')
  if [ "${iter}" -gt 0 ] 2>/dev/null; then
    echo "Iteration count = ${iter} after ${i}*2s"; break
  fi
  sleep 2
done
[ "${iter}" -gt 0 ] 2>/dev/null || { echo "Loop did not iterate within 90s"; exit 6; }
```

**10a.4 — Loop stop:**
```bash
# Graceful stop via per-phase scope
docker exec maple-airflow-scheduler airflow dags trigger daily_collection_pipeline \
  -c '{"scope": ["ITEM_EQUIPMENT_STOP"]}'

# Wait up to 46s (23 polls, 2s apart) for status = STOPPED
status=""
for i in $(seq 1 23); do
  status=$(curl -s http://localhost:8081/api/internal/run-status \
    | jq -r '.loopSummaries.ITEM_EQUIPMENT.status // ""')
  if [ "${status}" = "STOPPED" ]; then
    echo "Loop STOPPED after ${i}*2s"; break
  fi
  sleep 2
done
[ "${status}" = "STOPPED" ] || { echo "Loop stop failed; status=${status}"; exit 6; }
```
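
Steps 10a.1 through 10a.4 repeat one pattern: poll `/run-status` a fixed number of times, 2s apart, and fail if the expected value never appears. If these checks are ever ported out of bash, the pattern could look roughly like the sketch below. The names `dig` and `poll_until` are hypothetical helpers invented for this illustration, and the responses are canned dictionaries rather than real ext-api output.

```python
import time
from typing import Any, Callable, Optional


def dig(payload: Any, path: str, default: Any = None) -> Any:
    """Follow a dotted path through nested dicts.

    Null-safe, roughly like jq's `.a.b // default`: a missing key or a
    None value anywhere along the path yields `default`.
    """
    node = payload
    for key in path.split("."):
        if not isinstance(node, dict) or node.get(key) is None:
            return default
        node = node[key]
    return node


def poll_until(
    fetch: Callable[[], dict],
    predicate: Callable[[dict], bool],
    attempts: int,
    interval_s: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[int]:
    """Call fetch() up to `attempts` times.

    Returns the 1-based attempt number on which predicate() first held,
    or None if it never did (the bash steps map that case to exit 6).
    """
    for attempt in range(1, attempts + 1):
        if predicate(fetch()):
            return attempt
        if attempt < attempts:
            sleep(interval_s)
    return None


# Canned responses standing in for successive /run-status payloads.
fake_responses = iter([
    {},
    {"loopSummaries": {"ITEM_EQUIPMENT": {"iterationCount": 0}}},
    {"loopSummaries": {"ITEM_EQUIPMENT": {"iterationCount": 3}}},
])

attempt = poll_until(
    fetch=lambda: next(fake_responses),
    predicate=lambda s: dig(s, "loopSummaries.ITEM_EQUIPMENT.iterationCount", 0) > 0,
    attempts=45,
    sleep=lambda _seconds: None,  # skip real sleeping in the demo
)
print(attempt)  # 3
```

Injecting `fetch` and `sleep` keeps the helper testable without a running ext-api; a real caller would pass a function that requests `http://localhost:8081/api/internal/run-status` and decodes the JSON body.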

**10a.5 — Invalid scope rejection:**
```bash
# RANKING_FETCH_LOOP must be rejected (not in #1291 loopablePhases).
# Pin the run_id so the task-instance lookup targets this exact run
# (a wall-clock timestamp will not match the run's logical date).
run_id="invalid_scope_$(date -u +%Y%m%dT%H%M%S)"
docker exec maple-airflow-scheduler airflow dags trigger daily_collection_pipeline \
  -r "${run_id}" -c '{"scope": ["RANKING_FETCH_LOOP"]}' >/dev/null 2>&1
# The trigger CLI itself accepts the run; the parse_scope failure surfaces
# as an AirflowException in branch_on_scope. Verify via task instance state.
# Assumes the Airflow 2.x CLI: `airflow tasks state` prints the state as
# plain text (not JSON), so take the last output line.
sleep 10
task_state=$(docker exec maple-airflow-scheduler airflow tasks state \
  daily_collection_pipeline branch_on_scope "${run_id}" 2>/dev/null | tail -n 1)
# Expected: "failed" (AirflowException raised by parse_scope)
[ "${task_state}" = "failed" ] || { echo "Invalid scope not rejected; state=${task_state}"; exit 6; }
echo "Invalid scope correctly rejected"
```
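
The rejection checked in 10a.5 depends on `parse_scope` raising for any value outside its allow-list. `per_phase_tasks.py` is not part of this diff, so the following is only a sketch of the kind of validation that step assumes, not the actual implementation. The allow-list contents, the `FULL_DAILY` default for an absent `scope`, and the function name `parse_scope_sketch` are assumptions inferred from the scope values used in steps 10a.1 through 10a.5; the sketch raises `ValueError` where the real code reportedly raises `AirflowException`, to stay runnable without Airflow installed.

```python
from typing import Any, Dict, List

# Hypothetical allow-list. The source confirms only that CHARACTER_BASIC and
# ITEM_EQUIPMENT are loopable; which phases accept plain trigger or stop
# scopes is an assumption made for this sketch.
LOOPABLE = ("CHARACTER_BASIC", "ITEM_EQUIPMENT")
ALLOWED_SCOPES = (
    {"FULL_DAILY"}
    | set(LOOPABLE)
    | {f"{phase}_LOOP" for phase in LOOPABLE}
    | {f"{phase}_STOP" for phase in LOOPABLE}
)


def parse_scope_sketch(conf: Dict[str, Any]) -> List[str]:
    """Validate dag_run.conf['scope'] and return the list of requested actions.

    An absent scope falls back to ["FULL_DAILY"] (assumed default), so a
    plain `airflow dags trigger` without -c keeps running the full chain.
    """
    scope = conf.get("scope")
    if scope is None:
        return ["FULL_DAILY"]
    if not isinstance(scope, list) or not scope:
        raise ValueError(f"scope must be a non-empty list, got {scope!r}")
    unknown = [s for s in scope if not isinstance(s, str) or s not in ALLOWED_SCOPES]
    if unknown:
        # In the DAG this would be an AirflowException, which fails the
        # branch_on_scope task instance that step 10a.5 inspects.
        raise ValueError(f"unsupported scope value(s): {unknown}")
    return list(scope)


print(parse_scope_sketch({}))
print(parse_scope_sketch({"scope": ["ITEM_EQUIPMENT_LOOP"]}))
try:
    parse_scope_sketch({"scope": ["RANKING_FETCH_LOOP"]})
except ValueError as exc:
    print(f"rejected: {exc}")
```

Validating the whole list before returning means one bad value rejects the entire run, which matches the all-or-nothing behavior 10a.5 expects from a single `failed` task instance.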

**10a.6 — Cleanup loop artifacts:**
```bash
# Ensure no orphan loop survives past this section (defensive: step 10a.4
# should have stopped it, but verify before reporting PASS).
loop_state=$(curl -s http://localhost:8081/api/internal/run-status \
  | jq -r '.loopSummaries.ITEM_EQUIPMENT.status // "NONE"')
case "${loop_state}" in
  STOPPED|NONE|"") echo "Loop cleanly stopped";;
  *) echo "WARNING: loop in unexpected state ${loop_state} after stop scope";;
esac
```

**Exit codes:**
- 0: all 6 sub-steps PASS
- 6: any sub-step FAILED (see error message above the exit)

```bash
# Stop Spring Boot services
if [ "${START_MODE:-nohup}" = "nohup" ]; then
@@ -527,6 +657,8 @@ docker compose -f docker-compose.yml -f docker-compose.airflow.yml stop airflow-
| Airflow can't reach services | Verify `host.docker.internal` in docker-compose.airflow.yml, check services are running on host |
| Airflow sensor false positive | Verify runId correlation — sensor checks `current.runId` matches trigger response |
| Airflow DB connection failed | Ensure maple-network exists: `docker network create maple-network` |
| Step 10a loop never iterates | Verify that `loopSummaries.ITEM_EQUIPMENT.loopId` is set; if it is null, `/loop/phase/ITEM_EQUIPMENT` returned a non-202 response. Check ext-api logs for `PhaseLoopController` errors |
| Step 10a loop stop timeout | `/stop/phase/ITEM_EQUIPMENT` did not propagate `PhaseStoppedException`; check ext-api logs for `PhaseStopSignal.requestStop` and for the downstream halt at a chunk boundary |
| Cleanup returns 0 / "no runs found" while MinIO has old runs | `module-cleanup` defaulted to `LocalFsObjectStorage` (reads empty `../data/runs/`). Restart with `-Dstorage.backend=minio` JVM flag. StorageConfig's `@ConditionalOnProperty` has `matchIfMissing=true` so an unset property silently picks local. |

## Notes
@@ -539,6 +671,8 @@ docker compose -f docker-compose.yml -f docker-compose.airflow.yml stop airflow-
- Local profile DB: when `STORAGE_BACKEND=local`, the hardcoded `localhost:5432/maple_expectation` from `application-local.yml` is used. When `STORAGE_BACKEND=minio`, `.env` `DB_URL` is used directly (typically the dev cloud DB).
- `run-on-startup: true` in local profile starts pipeline immediately. When Airflow controls scheduling in production, set `run-on-startup: false` and `external-api.schedule.enabled: true` to keep the bean but disable auto-trigger.
- Airflow connects via `host.docker.internal` (Docker→host). This is transitional until services are containerized.
- Per-phase scope verification (step 10a) only runs when the #1292 branch (`branch_on_scope` task) is present in `daily_collection_pipeline`. Pre-#1292 deployments skip the section with a `SKIP:` message; see the step 10a prereq check for the gate.
- Per-phase verification adds ~5min (steps 10a.1–10a.6) to the full pipeline test. Run it after the main E2E (step 9) succeeds; failures isolated to step 10a don't affect main E2E pass/fail.

## Storage backend

55 changes: 53 additions & 2 deletions docker/airflow/dags/daily_collection_pipeline.py
@@ -16,11 +16,23 @@
from airflow import DAG
from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.http.sensors.http import HttpSensor
from airflow.sensors.python import PythonSensor

from per_phase_tasks import (
    LOOP_PHASES,
    STOP_PHASES,
    TRIGGER_PHASES,
    make_is_phase_terminal,
    make_loop_task,
    make_stop_task,
    make_trigger_task,
    parse_scope,
)


def is_health_up(response):
"""HttpSensor response_check: ext-api /actuator/health returns {"status": "UP"}.
@@ -235,4 +247,43 @@ def wait_for_item_equipment_cycle(**context):
wait_for_completion=False,
)

check_external_api >> trigger_daily_collection >> wait_for_completion >> wait_ie_cycle >> trigger_cleanup
# Per-phase branch (issue #1292). Activated when dag_run.conf['scope']
# is set to a list of action values (see per_phase_tasks.ALLOWED_SCOPES).
branch_on_scope = BranchPythonOperator(
    task_id="branch_on_scope",
    python_callable=lambda **ctx: (
        "trigger_daily_collection"
        if parse_scope(ctx["dag_run"].conf or {}) == ["FULL_DAILY"]
        else "per_phase_join"
    ),
)

per_phase_join = EmptyOperator(
    task_id="per_phase_join",
    trigger_rule="none_failed_min_one_success",
)

per_phase_trigger_tasks = [make_trigger_task(p) for p in TRIGGER_PHASES]
per_phase_loop_tasks = [make_loop_task(p) for p in LOOP_PHASES]
per_phase_stop_tasks = [make_stop_task(p) for p in STOP_PHASES]

per_phase_trigger_sensors = [
    PythonSensor(
        task_id=f"per_phase_wait_{p.lower()}_completion",
        python_callable=make_is_phase_terminal(p),
        mode="reschedule",
        poke_interval=60,
        timeout=60 * 60 * 4,
    )
    for p in TRIGGER_PHASES
]

check_external_api >> branch_on_scope
branch_on_scope >> trigger_daily_collection
branch_on_scope >> per_phase_join
per_phase_join >> per_phase_trigger_tasks
per_phase_join >> per_phase_loop_tasks
per_phase_join >> per_phase_stop_tasks
for trig, sens in zip(per_phase_trigger_tasks, per_phase_trigger_sensors):
    trig >> sens
trigger_daily_collection >> wait_for_completion >> wait_ie_cycle >> trigger_cleanup