From ef5b4dda793ae65f8b6587f9d1c317941a131c0e Mon Sep 17 00:00:00 2001 From: DONGRYEOLLEE1 Date: Fri, 22 May 2026 13:36:48 +0900 Subject: [PATCH] fix(supervisor): multi-turn follow-up turn handoff + savefig nesting MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit S-G 시나리오(같은 thread 2번째 메시지에서 동일 csv 재사용 차트 추가) 실패 원인 4건을 한 묶음으로 fix. 1) LLMRouter parse-failure 1회 retry + raw error JSON salvage (plan §4.0.5) - decide_route가 첫 호출 ValueError 시 즉시 fallback FINISH로 종료하던 문제. 1회 재시도 + 그래도 실패 시 ValidationError 메시지에 박힌 RouterDecision JSON 추출하여 복원. test_llm_router.py +2 cases. 2) head supervisor _max_same_team_streak를 current-turn-only로 필터 - 멀티 turn에서 누적된 head→team 카운트로 인해 turn 2 첫 호출부터 redirect 5/2 safeguard 발동. route_history를 가장 최근 head status="completed" 이후로 슬라이스. 3) team supervisor dispatch count도 current-turn 재계산 - shared_context의 _dispatch_count는 turn 누적 → follow-up turn에서 첫 worker 호출도 6/5 ceiling hit. route_history 슬라이스로 재계산. 4) team worker history note를 prev/current turn 분리 + finalizer 경유 head도 turn 종료로 status="completed" 마킹 - "이전 turn은 이미 완료" + "이번 turn은 아직 미시작" 컨텍스트를 team supervisor LLM에 명확히 전달. finalizer 경로도 route entry status="completed"로 boundary 신호. 5) matplotlib savefig monkey-patch nesting 차단 - matplotlib.pyplot은 process-wide singleton이라 매 turn 새 wrapper가 이전 wrapper 위에 nesting되어 stale artifact_dir로 path rewriting. _plt._orchagent_real_savefig에 진짜 savefig를 1회 보존하고 매 turn 그 reference로 fresh wrap. E2E (dong, playwright) S-G: - turn 1: region별_revenue_합계_막대차트.png ✅ - turn 2: product별_units_sold_합계_막대차트.png ✅ (같은 csv 재사용) pytest 316 → 319 PASS (+3 retry/salvage tests), 회귀 0. Co-Authored-By: Claude Opus 4.7 (1M context) --- apps/backend/tests/test_llm_router.py | 98 ++++++++++++ .../agent_core/supervisors/head_supervisor.py | 38 ++++- .../src/agent_core/supervisors/llm_router.py | 79 +++++++-- .../agent_core/supervisors/team_supervisor.py | 151 +++++++++++++----- packages/agent-tools/src/agent_tools/data.py | 13 +- 5 files changed, 321 insertions(+), 58 deletions(-) diff --git a/apps/backend/tests/test_llm_router.py b/apps/backend/tests/test_llm_router.py index 9d8fc98..f489ba4 100644 --- a/apps/backend/tests/test_llm_router.py +++ b/apps/backend/tests/test_llm_router.py @@ -51,6 +51,30 @@ def _llm(payload: Any, *, raise_exc: Exception | None = None) -> _StubLLM: return _StubLLM(_StubStructured(payload, raise_exc=raise_exc)) +class _SequenceStructured: + """Replay a sequence of (payload, raise_exc) pairs across ``ainvoke``. + + Lets us test the 1-retry policy: first call can raise/return junk and + the second call returns a valid RouterDecision. + """ + + def __init__(self, steps: list[tuple[Any, Exception | None]]): + self._steps = list(steps) + self.calls = 0 + + async def ainvoke(self, _messages: list[Any]) -> Any: + index = min(self.calls, len(self._steps) - 1) + self.calls += 1 + payload, exc = self._steps[index] + if exc is not None: + raise exc + return payload + + +def _sequence_llm(steps: list[tuple[Any, Exception | None]]) -> _StubLLM: + return _StubLLM(_SequenceStructured(steps)) # type: ignore[arg-type] + + @pytest.mark.asyncio async def test_valid_decision_passes_through() -> None: decision_payload = RouterDecision( @@ -209,3 +233,77 @@ async def test_safeguard_forced_finish_strips_direct_answer_content() -> None: assert decision.next == "FINISH" assert status == "rejected_invalid_goto" assert decision.content == "" + + +@pytest.mark.asyncio +async def test_parse_failure_retries_once_and_recovers() -> None: + """Plan §4.0.5: structured-output 파싱 실패 시 1회 재요청 후 성공해야 한다.""" + recovered = RouterDecision( + next="data_science_team", + reason="retry recovered", + request_review=False, + content="", + ) + llm = _sequence_llm( + [ + (None, ValueError("Structured Output response does not have a parsed field")), + (recovered, None), + ] + ) + decision, status = await decide_route( + llm, # type: ignore[arg-type] + system_prompt="sys", + messages=[], + allowed_nodes=["data_science_team"], + layer="head", + ) + assert decision.next == "data_science_team" + assert decision.reason == "retry recovered" + assert status == "accepted" + # Ensure the underlying stub was indeed called twice (1 fail + 1 retry). + assert llm._structured.calls == 2 # type: ignore[attr-defined] + + +@pytest.mark.asyncio +async def test_parse_failure_persists_across_both_attempts() -> None: + """두 번 모두 파싱이 실패하면 safeguard FINISH로 폴백.""" + llm = _sequence_llm( + [ + (None, ValueError("first parse fail")), + (None, ValueError("second parse fail")), + ] + ) + decision, status = await decide_route( + llm, # type: ignore[arg-type] + system_prompt="sys", + messages=[], + allowed_nodes=["data_science_team"], + layer="head", + ) + assert decision.next == "FINISH" + assert status == "parse_failed" + assert llm._structured.calls == 2 # type: ignore[attr-defined] + + +@pytest.mark.asyncio +async def test_salvage_router_decision_from_raw_error_message() -> None: + """OpenAI Responses API가 raw text content로 emit해도 error 메시지에서 JSON을 추출해 복원.""" + raw_msg = ( + "Structured Output response does not have a 'parsed' field nor a 'refusal' " + "field. Received message: content=[{'type': 'text', 'text': " + '\'{"next":"data_science_team","reason":"sales.csv product 분석"}\'}' + "]" + ) + err = ValueError(raw_msg) + llm = _sequence_llm([(None, err), (None, err)]) # 둘 다 같은 에러 + decision, status = await decide_route( + llm, # type: ignore[arg-type] + system_prompt="sys", + messages=[], + allowed_nodes=["data_science_team"], + layer="head", + ) + # Tier-3 recovery — JSON salvaged from the raw error string. + assert decision.next == "data_science_team" + assert "sales.csv" in decision.reason + assert status == "accepted" diff --git a/packages/agent-core/src/agent_core/supervisors/head_supervisor.py b/packages/agent-core/src/agent_core/supervisors/head_supervisor.py index 3600320..da3cf9e 100644 --- a/packages/agent-core/src/agent_core/supervisors/head_supervisor.py +++ b/packages/agent-core/src/agent_core/supervisors/head_supervisor.py @@ -214,10 +214,18 @@ async def head_supervisor_node(state: BaseAgentState) -> Command: if next_node not in {"FINISH", final_node_name} else None ) + # Turn boundary: both raw FINISH and finalizer routes end the current + # turn from the user's perspective. We label both as "completed" so + # ``route_history`` slicing in follow-up turns can isolate this-turn + # entries via the most recent ``status="completed"`` head checkpoint. + # ``streaming_status`` keeps the legacy "running" semantics so the SSE + # consumer still sees the finalizer phase as in-flight. + is_turn_end = next_node == "FINISH" status_label: Literal["running", "completed"] = ( - "completed" - if next_node == "FINISH" and not should_use_finalizer - else "running" + "completed" if is_turn_end and not should_use_finalizer else "running" + ) + route_status_label: Literal["running", "completed"] = ( + "completed" if is_turn_end else "running" ) route_next_node = final_node_name if should_use_finalizer else next_node @@ -233,7 +241,7 @@ async def head_supervisor_node(state: BaseAgentState) -> Command: node="head_supervisor", next_node=route_next_node or next_node, team=next_team, - status=status_label, + status=route_status_label, reasoning=decision.reason, ) ], @@ -257,9 +265,27 @@ async def head_supervisor_node(state: BaseAgentState) -> Command: def _max_same_team_streak(route_history: Iterable[dict[str, Any]]) -> int: - """Largest streak of consecutive head-layer redirects to the same team.""" + """Largest streak of head-layer redirects to the same team in THIS turn. + + Multi-turn threads accumulate ``route_history`` across turns. Counting + every prior turn's head→team entry would push the streak past the + safeguard limit on the very first head call of a new turn, even when + nothing has actually looped this turn. We slice to entries after the + most recent head ``status="completed"`` checkpoint (which marks the end + of the previous turn) so the streak reflects only the current turn. + """ + history_list = list(route_history) + last_completed_idx = -1 + for idx, entry in enumerate(history_list): + if entry.get("layer") == "head" and entry.get("status") == "completed": + last_completed_idx = idx + current_turn = ( + history_list[last_completed_idx + 1 :] + if last_completed_idx >= 0 + else history_list + ) streaks: dict[str, int] = {} - for entry in route_history: + for entry in current_turn: if entry.get("layer") != "head": continue team = entry.get("team") diff --git a/packages/agent-core/src/agent_core/supervisors/llm_router.py b/packages/agent-core/src/agent_core/supervisors/llm_router.py index ed32218..40a9e95 100644 --- a/packages/agent-core/src/agent_core/supervisors/llm_router.py +++ b/packages/agent-core/src/agent_core/supervisors/llm_router.py @@ -82,22 +82,40 @@ async def decide_route( request = [{"role": "system", "content": system_prompt}, *messages] structured_llm = llm.with_structured_output(RouterDecision) - raw_text = "" - try: - response: Any = await structured_llm.ainvoke(request) - except (ValidationError, ValueError, TypeError) as exc: - # Pydantic + LangChain parse failures surface as ValidationError / - # ValueError depending on the provider. Fall back to FINISH instead - # of crashing the whole turn (plan §4.0 P3). - raw_text = repr(exc) - return fallback_decision_on_parse_failure(raw_text=raw_text), "parse_failed" + # plan §4.0.5: "LLM structured output 파싱 실패 → 1회 재요청 → 그래도 + # 실패면 FINISH". OpenAI Responses API + langchain structured output + # 조합에서 가끔 ``parsed`` 필드 없이 raw text content를 emit하는데, + # (1) 한 번 재시도하고, (2) 그래도 실패하면 raw error 메시지 안에 박힌 + # JSON을 직접 추출해 RouterDecision으로 복원한다. + response: Any = None + last_error: Any = None + decision: RouterDecision | None = None + for attempt in range(2): + try: + response = await structured_llm.ainvoke(request) + except (ValidationError, ValueError, TypeError) as exc: + last_error = exc + response = None + continue + decision_attempt = _coerce_to_router_decision(response) + if decision_attempt is not None: + decision = decision_attempt + break - decision = _coerce_to_router_decision(response) if decision is None: - return ( - fallback_decision_on_parse_failure(raw_text=str(response)), - "parse_failed", - ) + # Tier-3 recovery: try to extract the JSON RouterDecision that OpenAI + # emitted as a raw text content block. The text is reflected back in + # the langchain ValueError message, so we can salvage it without + # making a third LLM round-trip. This keeps multi-turn follow-up + # requests usable when the provider intermittently bypasses its own + # ``parsed`` field. + salvaged = _salvage_router_decision_from_error(last_error) + if salvaged is not None: + decision = salvaged + + if decision is None: + raw_text = repr(last_error) if last_error is not None else str(response) + return fallback_decision_on_parse_failure(raw_text=raw_text), "parse_failed" # Safeguard 1 — invalid `next` value gets forced to FINISH. outcome = reject_invalid_goto(decision, allowed_nodes) @@ -127,6 +145,39 @@ async def decide_route( return outcome.decision, "accepted" +def _salvage_router_decision_from_error(error: Any) -> RouterDecision | None: + """Best-effort recovery when OpenAI Responses API skips the ``parsed`` field. + + The provider sometimes emits a valid JSON ``RouterDecision`` blob inside + a raw text content block instead of populating the structured ``parsed`` + field, and langchain raises ``ValueError`` that includes the original + message. We scan that message for the first balanced JSON object and try + to validate it against ``RouterDecision``. Returning ``None`` here means + the caller should fall back to the safeguard FINISH. + """ + if error is None: + return None + import json + import re + + message = repr(error) + # Greedy match the first JSON object that mentions a ``next`` key. + # Balanced-braces regex is hard in Python's ``re``; we instead find every + # ``{...}`` chunk and try them in order until one parses cleanly. + for candidate in re.findall(r"\{[^{}]*\}", message): + try: + payload = json.loads(candidate) + except (json.JSONDecodeError, ValueError): + continue + if not isinstance(payload, dict) or "next" not in payload: + continue + try: + return RouterDecision.model_validate(payload) + except ValidationError: + continue + return None + + def _coerce_to_router_decision(raw: Any) -> RouterDecision | None: """Best-effort coercion of LangChain's structured-output return value. diff --git a/packages/agent-core/src/agent_core/supervisors/team_supervisor.py b/packages/agent-core/src/agent_core/supervisors/team_supervisor.py index d527fe5..6347d91 100644 --- a/packages/agent-core/src/agent_core/supervisors/team_supervisor.py +++ b/packages/agent-core/src/agent_core/supervisors/team_supervisor.py @@ -61,10 +61,15 @@ async def team_supervisor_node(state: BaseAgentState) -> Command: team_dispatch_count_key = ( f"{normalized_team}_dispatch_count" if normalized_team else None ) - team_dispatch_count = ( - int(shared_context.get(team_dispatch_count_key, 0)) - if team_dispatch_count_key - else 0 + + # Count THIS-TURN dispatches only. The legacy counter in + # ``shared_context`` accumulates across turns, so on a follow-up + # turn the team would hit the ceiling on its very first worker + # dispatch. We recompute from ``route_history`` sliced at the most + # recent head ``status="completed"`` checkpoint instead. + route_history = state.get("route_history") or [] + team_dispatch_count = _current_turn_team_dispatches( + route_history, normalized_team=normalized_team ) # Pre-check dispatch ceiling before paying for an LLM call — saves a @@ -90,10 +95,9 @@ async def team_supervisor_node(state: BaseAgentState) -> Command: shared_context=shared_context, ) - # Surface this-turn worker history to the LLM as a system note so it - # never has to recompute it from the raw conversation. The LLM still - # makes the routing decision — this is data, not a rule (plan §4.0 P1). - route_history = state.get("route_history") or [] + # Surface worker history to the LLM as a system note so it never has + # to recompute it from the raw conversation. The LLM still makes the + # routing decision — this is data, not a rule (plan §4.0 P1). worker_history_note = _format_worker_history_note( route_history, normalized_team=normalized_team ) @@ -182,48 +186,123 @@ def _log_decision(decision: Any, goto: str, status: str) -> None: print(f"[TeamSupervisor] Safeguard status: {status}", flush=True) -def _format_worker_history_note( +def _current_turn_team_dispatches( route_history: list[dict[str, Any]], *, normalized_team: str | None, -) -> str | None: - """Summarize this team's worker history so the LLM can see prior dispatches. - - Returns ``None`` when there is nothing meaningful to report. The output - is intentionally compact so the LLM can integrate it without being - distracted from its system prompt rules. +) -> int: + """Count this team's worker dispatches IN THIS TURN ONLY. + + The legacy ``_dispatch_count`` field in ``shared_context`` + accumulates across turns, which would trip the dispatch ceiling on a + follow-up turn's very first worker call. We recompute from the + ``route_history`` slice that comes after the most recent head + ``status="completed"`` so the count is turn-local. """ if not normalized_team or not route_history: - return None - - workers_called: list[str] = [] - for entry in route_history: + return 0 + last_completed_idx = -1 + for idx, entry in enumerate(route_history): + if entry.get("layer") == "head" and entry.get("status") == "completed": + last_completed_idx = idx + current_turn = ( + route_history[last_completed_idx + 1 :] + if last_completed_idx >= 0 + else route_history + ) + count = 0 + for entry in current_turn: if entry.get("layer") != "team": continue if entry.get("team") != normalized_team: continue worker = entry.get("worker") if isinstance(worker, str) and worker and worker != "FINISH": - workers_called.append(worker) + count += 1 + return count - if not workers_called: + +def _format_worker_history_note( + route_history: list[dict[str, Any]], + *, + normalized_team: str | None, +) -> str | None: + """Summarize this team's worker history split by turn boundary. + + Multi-turn threads accumulate ``route_history`` across turns. The team + supervisor's worker-selection LLM needs both pieces of context: + + 1. Prior-turn workers — so it knows the team already ran (and which + brief/analysis is already on the conversation), useful for + follow-up requests like "same data, different chart". + 2. This-turn workers — so it never repeats a worker that already ran + in the current turn. + + Returns ``None`` when there is nothing meaningful to report. + """ + if not normalized_team or not route_history: return None - counts: dict[str, int] = {} - for worker in workers_called: - counts[worker] = counts.get(worker, 0) + 1 - summary = ", ".join( - f"{worker} ({count} call{'s' if count > 1 else ''})" - for worker, count in counts.items() - ) - return ( - "# THIS-TURN WORKER HISTORY\n" - f"- Already dispatched in this turn: {summary}.\n" - "- A worker that already ran cannot be dispatched again unless the\n" - " Reviewer feedback names a concrete code-level gap only that\n" - " worker can fix. If the brief or analysis is already in the\n" - " conversation, route to the NEXT worker in the workflow." - ) + # Slice the history at the most recent head ``status="completed"``; + # entries before that index belong to previous turns. + last_completed_idx = -1 + for idx, entry in enumerate(route_history): + if entry.get("layer") == "head" and entry.get("status") == "completed": + last_completed_idx = idx + + if last_completed_idx >= 0: + previous_turn = route_history[: last_completed_idx + 1] + current_turn = route_history[last_completed_idx + 1 :] + else: + previous_turn = [] + current_turn = list(route_history) + + def _team_worker_counts(entries: list[dict[str, Any]]) -> dict[str, int]: + counts: dict[str, int] = {} + for entry in entries: + if entry.get("layer") != "team": + continue + if entry.get("team") != normalized_team: + continue + worker = entry.get("worker") + if isinstance(worker, str) and worker and worker != "FINISH": + counts[worker] = counts.get(worker, 0) + 1 + return counts + + prev_counts = _team_worker_counts(previous_turn) + curr_counts = _team_worker_counts(current_turn) + if not prev_counts and not curr_counts: + return None + + def _summary(counts: dict[str, int]) -> str: + return ", ".join( + f"{worker} ({count} call{'s' if count > 1 else ''})" + for worker, count in counts.items() + ) + + lines = ["# TEAM WORKER HISTORY"] + if prev_counts: + lines.append( + f"- Previous turns (already produced briefs/analyses you can " + f"build on): {_summary(prev_counts)}." + ) + if curr_counts: + lines.append( + f"- This turn so far: {_summary(curr_counts)}." + ) + lines.append( + "- A worker that already ran in THIS TURN cannot be dispatched" + " again unless the Reviewer feedback names a concrete code-level" + " gap only that worker can fix. Route to the NEXT worker in the" + " workflow." + ) + else: + lines.append( + "- Nothing dispatched yet in this turn — start with the worker" + " that owns the new request (the engineer briefs/inspects, the" + " analyst computes and renders charts)." + ) + return "\n".join(lines) __all__ = ["make_team_supervisor_node"] diff --git a/packages/agent-tools/src/agent_tools/data.py b/packages/agent-tools/src/agent_tools/data.py index e61471c..e11b7a1 100644 --- a/packages/agent-tools/src/agent_tools/data.py +++ b/packages/agent-tools/src/agent_tools/data.py @@ -395,8 +395,17 @@ def _disabled_network(*args, **kwargs): socket.create_connection = _disabled_network _artifact_dir = r"{context.artifact_dir}" -_original_pyplot_savefig = _plt.savefig -_original_figure_savefig = _Figure.savefig +# Cross-turn safety: matplotlib.pyplot is a process-wide singleton, so each +# turn's monkey-patch would otherwise nest on top of the previous turn's +# wrapper and rewrite paths into a stale artifact_dir. We stash the REAL +# savefig the very first time this module touches it and rebind to that +# pristine reference on every subsequent turn. +if not hasattr(_plt, "_orchagent_real_savefig"): + _plt._orchagent_real_savefig = _plt.savefig +if not hasattr(_Figure, "_orchagent_real_savefig"): + _Figure._orchagent_real_savefig = _Figure.savefig +_original_pyplot_savefig = _plt._orchagent_real_savefig +_original_figure_savefig = _Figure._orchagent_real_savefig def _safe_pyplot_savefig(fname=None, *args, _original=_original_pyplot_savefig, _artifact_dir_value=_artifact_dir, **kwargs): rewritten = None if fname is None else os.path.join(_artifact_dir_value, os.path.basename(str(fname)))