Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 98 additions & 0 deletions apps/backend/tests/test_llm_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,30 @@ def _llm(payload: Any, *, raise_exc: Exception | None = None) -> _StubLLM:
return _StubLLM(_StubStructured(payload, raise_exc=raise_exc))


class _SequenceStructured:
"""Replay a sequence of (payload, raise_exc) pairs across ``ainvoke``.

Lets us test the 1-retry policy: first call can raise/return junk and
the second call returns a valid RouterDecision.
"""

def __init__(self, steps: list[tuple[Any, Exception | None]]):
self._steps = list(steps)
self.calls = 0

async def ainvoke(self, _messages: list[Any]) -> Any:
index = min(self.calls, len(self._steps) - 1)
self.calls += 1
payload, exc = self._steps[index]
if exc is not None:
raise exc
return payload


def _sequence_llm(steps: list[tuple[Any, Exception | None]]) -> _StubLLM:
    """Wrap a ``_SequenceStructured`` that replays ``steps`` in a ``_StubLLM``."""
    scripted = _SequenceStructured(steps)
    return _StubLLM(scripted)  # type: ignore[arg-type]


@pytest.mark.asyncio
async def test_valid_decision_passes_through() -> None:
decision_payload = RouterDecision(
Expand Down Expand Up @@ -209,3 +233,77 @@ async def test_safeguard_forced_finish_strips_direct_answer_content() -> None:
assert decision.next == "FINISH"
assert status == "rejected_invalid_goto"
assert decision.content == ""


@pytest.mark.asyncio
async def test_parse_failure_retries_once_and_recovers() -> None:
    """Plan §4.0.5: a structured-output parse failure is retried once and then succeeds."""
    parse_error = ValueError("Structured Output response does not have a parsed field")
    recovered = RouterDecision(
        next="data_science_team",
        reason="retry recovered",
        request_review=False,
        content="",
    )
    # Script: the first attempt raises, the retry yields a valid decision.
    llm = _sequence_llm([(None, parse_error), (recovered, None)])

    decision, status = await decide_route(
        llm,  # type: ignore[arg-type]
        system_prompt="sys",
        messages=[],
        allowed_nodes=["data_science_team"],
        layer="head",
    )

    assert (decision.next, decision.reason) == ("data_science_team", "retry recovered")
    assert status == "accepted"
    # The stub must have been hit exactly twice: one failure plus one retry.
    assert llm._structured.calls == 2  # type: ignore[attr-defined]


@pytest.mark.asyncio
async def test_parse_failure_persists_across_both_attempts() -> None:
    """When both attempts fail to parse, the router falls back to the safeguard FINISH."""
    failures = [
        (None, ValueError(text))
        for text in ("first parse fail", "second parse fail")
    ]
    llm = _sequence_llm(failures)

    decision, status = await decide_route(
        llm,  # type: ignore[arg-type]
        system_prompt="sys",
        messages=[],
        allowed_nodes=["data_science_team"],
        layer="head",
    )

    assert decision.next == "FINISH"
    assert status == "parse_failed"
    # Both scripted failures were consumed: the initial call and the retry.
    assert llm._structured.calls == 2  # type: ignore[attr-defined]


@pytest.mark.asyncio
async def test_salvage_router_decision_from_raw_error_message() -> None:
    """Even when the decision arrives as raw text content, the JSON in the error message is salvaged."""
    embedded_json = '{"next":"data_science_team","reason":"sales.csv product 분석"}'
    raw_msg = (
        "Structured Output response does not have a 'parsed' field nor a 'refusal' "
        "field. Received message: content=[{'type': 'text', 'text': "
        + "'"
        + embedded_json
        + "'}]"
    )
    err = ValueError(raw_msg)
    # Both attempts raise the very same error instance.
    llm = _sequence_llm([(None, err)] * 2)

    decision, status = await decide_route(
        llm,  # type: ignore[arg-type]
        system_prompt="sys",
        messages=[],
        allowed_nodes=["data_science_team"],
        layer="head",
    )

    # Tier-3 recovery: the decision is rebuilt from the JSON inside the error text.
    assert decision.next == "data_science_team"
    assert "sales.csv" in decision.reason
    assert status == "accepted"
38 changes: 32 additions & 6 deletions packages/agent-core/src/agent_core/supervisors/head_supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,18 @@ async def head_supervisor_node(state: BaseAgentState) -> Command:
if next_node not in {"FINISH", final_node_name}
else None
)
# Turn boundary: both raw FINISH and finalizer routes end the current
# turn from the user's perspective. We label both as "completed" so
# ``route_history`` slicing in follow-up turns can isolate this-turn
# entries via the most recent ``status="completed"`` head checkpoint.
# ``streaming_status`` keeps the legacy "running" semantics so the SSE
# consumer still sees the finalizer phase as in-flight.
is_turn_end = next_node == "FINISH"
status_label: Literal["running", "completed"] = (
"completed"
if next_node == "FINISH" and not should_use_finalizer
else "running"
"completed" if is_turn_end and not should_use_finalizer else "running"
)
route_status_label: Literal["running", "completed"] = (
"completed" if is_turn_end else "running"
)
route_next_node = final_node_name if should_use_finalizer else next_node

Expand All @@ -233,7 +241,7 @@ async def head_supervisor_node(state: BaseAgentState) -> Command:
node="head_supervisor",
next_node=route_next_node or next_node,
team=next_team,
status=status_label,
status=route_status_label,
reasoning=decision.reason,
)
],
Expand All @@ -257,9 +265,27 @@ async def head_supervisor_node(state: BaseAgentState) -> Command:


def _max_same_team_streak(route_history: Iterable[dict[str, Any]]) -> int:
"""Largest streak of consecutive head-layer redirects to the same team."""
"""Largest streak of head-layer redirects to the same team in THIS turn.

Multi-turn threads accumulate ``route_history`` across turns. Counting
every prior turn's head→team entry would push the streak past the
safeguard limit on the very first head call of a new turn, even when
nothing has actually looped this turn. We slice to entries after the
most recent head ``status="completed"`` checkpoint (which marks the end
of the previous turn) so the streak reflects only the current turn.
"""
history_list = list(route_history)
last_completed_idx = -1
for idx, entry in enumerate(history_list):
if entry.get("layer") == "head" and entry.get("status") == "completed":
last_completed_idx = idx
current_turn = (
history_list[last_completed_idx + 1 :]
if last_completed_idx >= 0
else history_list
)
streaks: dict[str, int] = {}
for entry in route_history:
for entry in current_turn:
if entry.get("layer") != "head":
continue
team = entry.get("team")
Expand Down
79 changes: 65 additions & 14 deletions packages/agent-core/src/agent_core/supervisors/llm_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,22 +82,40 @@ async def decide_route(
request = [{"role": "system", "content": system_prompt}, *messages]
structured_llm = llm.with_structured_output(RouterDecision)

raw_text = ""
try:
response: Any = await structured_llm.ainvoke(request)
except (ValidationError, ValueError, TypeError) as exc:
# Pydantic + LangChain parse failures surface as ValidationError /
# ValueError depending on the provider. Fall back to FINISH instead
# of crashing the whole turn (plan §4.0 P3).
raw_text = repr(exc)
return fallback_decision_on_parse_failure(raw_text=raw_text), "parse_failed"
# plan §4.0.5: "LLM structured output 파싱 실패 → 1회 재요청 → 그래도
# 실패면 FINISH". OpenAI Responses API + langchain structured output
# 조합에서 가끔 ``parsed`` 필드 없이 raw text content를 emit하는데,
# (1) 한 번 재시도하고, (2) 그래도 실패하면 raw error 메시지 안에 박힌
# JSON을 직접 추출해 RouterDecision으로 복원한다.
response: Any = None
last_error: Any = None
decision: RouterDecision | None = None
for attempt in range(2):
try:
response = await structured_llm.ainvoke(request)
except (ValidationError, ValueError, TypeError) as exc:
last_error = exc
response = None
continue
decision_attempt = _coerce_to_router_decision(response)
if decision_attempt is not None:
decision = decision_attempt
break

decision = _coerce_to_router_decision(response)
if decision is None:
return (
fallback_decision_on_parse_failure(raw_text=str(response)),
"parse_failed",
)
# Tier-3 recovery: try to extract the JSON RouterDecision that OpenAI
# emitted as a raw text content block. The text is reflected back in
# the langchain ValueError message, so we can salvage it without
# making a third LLM round-trip. This keeps multi-turn follow-up
# requests usable when the provider intermittently bypasses its own
# ``parsed`` field.
salvaged = _salvage_router_decision_from_error(last_error)
if salvaged is not None:
decision = salvaged

if decision is None:
raw_text = repr(last_error) if last_error is not None else str(response)
return fallback_decision_on_parse_failure(raw_text=raw_text), "parse_failed"

# Safeguard 1 — invalid `next` value gets forced to FINISH.
outcome = reject_invalid_goto(decision, allowed_nodes)
Expand Down Expand Up @@ -127,6 +145,39 @@ async def decide_route(
return outcome.decision, "accepted"


def _salvage_router_decision_from_error(error: Any) -> RouterDecision | None:
"""Best-effort recovery when OpenAI Responses API skips the ``parsed`` field.

The provider sometimes emits a valid JSON ``RouterDecision`` blob inside
a raw text content block instead of populating the structured ``parsed``
field, and langchain raises ``ValueError`` that includes the original
message. We scan that message for the first balanced JSON object and try
to validate it against ``RouterDecision``. Returning ``None`` here means
the caller should fall back to the safeguard FINISH.
"""
if error is None:
return None
import json
import re

message = repr(error)
# Greedy match the first JSON object that mentions a ``next`` key.
# Balanced-braces regex is hard in Python's ``re``; we instead find every
# ``{...}`` chunk and try them in order until one parses cleanly.
for candidate in re.findall(r"\{[^{}]*\}", message):
try:
payload = json.loads(candidate)
except (json.JSONDecodeError, ValueError):
continue
if not isinstance(payload, dict) or "next" not in payload:
continue
try:
return RouterDecision.model_validate(payload)
except ValidationError:
continue
return None


def _coerce_to_router_decision(raw: Any) -> RouterDecision | None:
"""Best-effort coercion of LangChain's structured-output return value.

Expand Down
Loading
Loading