From ada83843a73c99a3355e33eeec2d1e226c7c2745 Mon Sep 17 00:00:00 2001 From: Frank Speiser Date: Tue, 26 May 2026 10:04:16 -0400 Subject: [PATCH] Wire worker_tools through tool_coordinate (proposer + critics) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the follow-up logged in PR #21: coordinate now supports the bounded inner-ReAct loop, but only on the roles where evidence gathering is meaningful. Design choice: tool calls happen BEFORE the structured JSON emission. A `<tool_call>` tag in the middle of a role envelope would break schema validation, so `_request_structured_with_tools` interleaves the hop loop with schema parsing — tool calls each cost one hop; the response without a tool_call tag is parsed against the schema (with the existing retry-once-on-validation-failure semantics). Roles: - PROPOSER + CRITICS get worker_tools when enabled. They benefit from fetching evidence to ground their position. - SYNTHESIZER is intentionally EXCLUDED. Its job is purely combinatorial — combining proposer + critique output. Letting it call tools opens scope creep and external-call cost on what should be a pure reduction step. Implementation: - `_request_structured` grows kwargs `worker_tools` + `session_id`. When `worker_tools` is non-empty (after allowlist filtering) it delegates to `_request_structured_with_tools`. - `_request_structured_with_tools` interleaves the hop loop with schema parsing: * tool_call tag present + within budget -> dispatch, wrap, re-prompt * tool_call tag present + budget exhausted -> refusal + one final emission round; that final attempt is parsed once, no retry * no tool_call tag -> attempt JSON parse + schema validation; on failure, re-prompt once with the validation errors (the standard `_request_structured` retry path) - The system message hint enforces "tool_call envelopes only BEFORE the final JSON object" so the worker doesn't try to mix them. 
- `tool_coordinate` accepts `worker_tools: [...]` arg, filters against the hard allowlist, and surfaces `worker_tools: {accepted, rejected, hop_budget, applies_to: ["proposer", "critic"]}` back on the response — operators see explicitly that synth is excluded. - Tool calls roll up under the same session_id (cost, breakers, fetch egress budget all apply to inner calls). Schema: - `coordinate.input.worker_tools` added with `enum: ["fetch","verify"]` and a note that synth is excluded. Tests (scripts/test_coordinate_worker_tools.py): - `_request_structured_with_tools` happy path (one tool_call -> valid envelope) - Hop budget exhaustion inside the structured loop -> 3rd request refused, worker still produces a valid envelope on the final round - worker_tools empty -> identity behavior (no tool-call hint injected; no `inner_tool_calls` on the answer) - Schema validation retry preserved on the tool-use path - End-to-end coordinate: proposer + both critics each fetch once; synth dispatch does NOT carry the tool-call hint in its system prompt; inner_tool_calls recorded on per-role answers - Legacy coordinate (no worker_tools arg): no metadata on the response, no extra prompt scaffolding, no fetches Full suite (35 scripts) passes. Co-Authored-By: Claude Opus 4.7 --- schema/tools.schema.json | 4 +- scripts/test_coordinate_worker_tools.py | 318 ++++++++++++++++++++++++ servers/python/crosscheck_server.py | 190 +++++++++++++- 3 files changed, 509 insertions(+), 3 deletions(-) create mode 100644 scripts/test_coordinate_worker_tools.py diff --git a/schema/tools.schema.json b/schema/tools.schema.json index 806fcc6..cb4f151 100644 --- a/schema/tools.schema.json +++ b/schema/tools.schema.json @@ -398,7 +398,9 @@ "session_id": { "type": "string", "description": "Optional id; structured claims are persisted under this session." 
}, "untrusted_input": { "type": "boolean", "default": false }, "inject_session_memory": { "type": "boolean", "default": false, - "description": "When true (and a session_id is set), prepend the session's non-stale `` block to the topic before dispatch. Stale entries (e.g. from a prior failed audit) are excluded." } + "description": "When true (and a session_id is set), prepend the session's non-stale `` block to the topic before dispatch. Stale entries (e.g. from a prior failed audit) are excluded." }, + "worker_tools": { "type": "array", "items": { "type": "string", "enum": ["fetch", "verify"] }, + "description": "Opt-in: enable bounded mid-turn tool use on the PROPOSER and CRITIC roles (the SYNTHESIZER is purely combinatorial and is intentionally excluded). Workers may emit `{\"name\":\"TOOL\",\"args\":{...}}` BEFORE the final structured emission; tool results are wrapped as untrusted-input and re-prompted. Hard hop budget = 2 per role. Only `fetch` and `verify` are callable — recursive ReAct via LLM-spawning tools is disallowed." } }, "required": ["topic"] }, diff --git a/scripts/test_coordinate_worker_tools.py b/scripts/test_coordinate_worker_tools.py new file mode 100644 index 0000000..5e52235 --- /dev/null +++ b/scripts/test_coordinate_worker_tools.py @@ -0,0 +1,318 @@ +#!/usr/bin/env python3 +"""Tests for worker_tools wired into tool_coordinate (PR #21 follow-up). + +The challenge: coordinate's proposer/critic/synth roles emit +JSON-schema-validated envelopes via `_request_structured`. A +`` tag in the middle of an envelope would break schema +validation, so we run tool calls FIRST, then the structured emission. 
+ +Covers: + - _request_structured_with_tools: proposer emits tool_call, then valid envelope + - Hop-budget exhaustion inside the structured loop still ends in a final + schema validation attempt + - When worker_tools is empty, _request_structured is the identity loop + (no tool hint injected, no extra system text bloating the prompt) + - End-to-end tool_coordinate: proposer fetches mid-turn, critic too, + synth is intentionally NOT given tools (purely combinatorial) + - tool_coordinate.worker_tools metadata: accepted/rejected/applies_to + surfaced on the response; synth is listed as excluded +""" + +from __future__ import annotations + +import json +import os +import sys +import tempfile +import time +from pathlib import Path + + +def main() -> int: + here = Path(__file__).resolve().parents[1] + sys.path.insert(0, str(here / "servers" / "python")) + + tmp = Path(tempfile.mkdtemp()) + pricing = tmp / "pricing.json" + pricing.write_text(json.dumps({ + "openai": {"gpt-test": {"prompt_per_1k": 0.0001, "completion_per_1k": 0.0003, "cached_per_1k": 0.00005}}, + "anthropic": {"claude-test": {"prompt_per_1k": 0.003, "completion_per_1k": 0.015, "cached_per_1k": 0.0003}}, + "xai": {"grok-test": {"prompt_per_1k": 0.005, "completion_per_1k": 0.015, "cached_per_1k": 0.0025}}, + })) + os.environ["CROSSCHECK_PRICING_PATH"] = str(pricing) + os.environ.pop("CROSSCHECK_REJECT_CONFIG_DRIFT", None) + + import crosscheck_server as srv + + srv.CFG = dict(srv.CFG) + srv.CFG["session_db"] = str(tmp / "sessions.db") + srv.CFG["transcript_dir"] = str(tmp / "transcripts") + srv.CFG["cache"] = {"enabled": False} + srv.CFG["node_cache"] = {"enabled": False} + srv.CFG["prompt_adapters"] = {"enabled": False} + srv.CFG["fetch"] = {"url_allowlist": ["https://example.com/"]} + srv.CFG["config_pinning"] = {"reject_drift": False} + srv.TRANSCRIPT_DIR = Path(srv.CFG["transcript_dir"]) + srv._DB_INIT_DONE = False + srv._FTS5_AVAILABLE = None + srv._PRICING_CACHE = None + srv.PRICING_PATH = pricing + 
srv._CONFIG_PIN_STARTUP_DONE = True # avoid noise during dispatch + + srv.ENV = dict(srv.ENV) + srv.ENV["OPENAI_API_KEY"] = "stub"; srv.ENV["OPENAI_MODEL"] = "gpt-test" + srv.ENV["ANTHROPIC_API_KEY"] = "stub"; srv.ENV["ANTHROPIC_MODEL"] = "claude-test" + srv.ENV["XAI_API_KEY"] = "stub"; srv.ENV["XAI_MODEL"] = "grok-test" + srv.ALL_PROVIDERS = srv.build_providers() + srv.CFG["providers"] = ["openai", "anthropic", "xai"] + srv.CFG["moderator"] = "anthropic" + + # ------------------------------------------------------------------ + # Test fixtures + # ------------------------------------------------------------------ + bodies: list[dict] = [] + openai_responses: list[str] = [] + anthropic_responses: list[str] = [] + xai_responses: list[str] = [] + fetched_urls: list[str] = [] + + def fake_post(url, h, b, **kw): + bodies.append({"url": url, "body": b}) + if "openai.com" in url: + text = openai_responses.pop(0) if openai_responses else "" + return ({"choices": [{"message": {"content": text}}], + "usage": {"prompt_tokens": 30, "completion_tokens": 10}}, 1) + if "anthropic.com" in url: + text = anthropic_responses.pop(0) if anthropic_responses else "" + return ({"content": [{"type": "text", "text": text}], + "usage": {"input_tokens": 30, "output_tokens": 10}}, 1) + if "x.ai" in url: + text = xai_responses.pop(0) if xai_responses else "" + return ({"choices": [{"message": {"content": text}}], + "usage": {"prompt_tokens": 30, "completion_tokens": 10}}, 1) + return ({}, 1) + srv._http_post_resilient = fake_post + + # Stub the real fetch tool so we can confirm inner dispatch happened. 
+ real_tool_fetch = srv.tool_fetch + def fake_tool_fetch(args): + fetched_urls.append(args.get("url", "")) + return {"tool": "fetch", "url": args.get("url"), + "status": "ok", + "content_excerpt": "Sample content the worker can cite."} + srv.tool_fetch = fake_tool_fetch + + role_schema = srv._role_turn_schema() + valid_proposer = json.dumps({ + "role": "proposer", "summary": "Draft position", "confidence": 0.8, + "ballot": "agree", + }) + valid_critic = json.dumps({ + "role": "critic", "summary": "Looks reasonable", "confidence": 0.7, + "ballot": "agree", + }) + valid_synth = json.dumps({ + "consensus": "Use opaque tokens.", "weighted_confidence": 0.85, + "key_claims": [{"claim": "Simpler revocation", "confidence": 0.9}], + }) + + # ------------------------------------------------------------------ + # 1) _request_structured_with_tools — proposer emits tool_call, + # then a valid envelope + # ------------------------------------------------------------------ + bodies.clear(); fetched_urls.clear() + openai_responses[:] = [ + '{"name":"fetch","args":{"url":"https://example.com/spec"}}', + valid_proposer, + ] + obj, ans, errs = srv._request_structured( + srv.ALL_PROVIDERS["openai"], + [{"role": "system", "content": "PROPOSER role."}, + {"role": "user", "content": "Draft a position."}], + role_schema, + max_tokens=2048, deadline=time.monotonic() + 30, max_retries=1, + purpose="worker", + worker_tools=["fetch"], session_id="rst-happy", + ) + assert errs == [], errs + assert obj["role"] == "proposer", obj + assert fetched_urls == ["https://example.com/spec"], fetched_urls + assert ans.get("inner_tool_calls") == [{"hop": 1, "name": "fetch", "status": "ok"}], ans + + # ------------------------------------------------------------------ + # 2) Hop budget inside structured loop — 3 tool_calls then forced + # final emission; the refusal payload makes it into the worker's + # context but the final JSON still validates + # 
------------------------------------------------------------------ + bodies.clear(); fetched_urls.clear() + openai_responses[:] = [ + '{"name":"fetch","args":{"url":"https://example.com/a"}}', + '{"name":"fetch","args":{"url":"https://example.com/b"}}', + '{"name":"fetch","args":{"url":"https://example.com/c"}}', + valid_proposer, + ] + obj, ans, errs = srv._request_structured( + srv.ALL_PROVIDERS["openai"], + [{"role": "system", "content": "PROPOSER."}, + {"role": "user", "content": "Draft."}], + role_schema, + max_tokens=2048, deadline=time.monotonic() + 30, max_retries=1, + purpose="worker", + worker_tools=["fetch"], session_id="rst-budget", + ) + assert errs == [], errs + assert obj["role"] == "proposer" + statuses = [c["status"] for c in ans["inner_tool_calls"]] + assert statuses == ["ok", "ok", "hop_budget_exhausted"], statuses + # Only 2 fetches actually happened. + assert len(fetched_urls) == 2, fetched_urls + + # ------------------------------------------------------------------ + # 3) worker_tools empty -> identity behavior of _request_structured + # ------------------------------------------------------------------ + bodies.clear(); fetched_urls.clear() + openai_responses[:] = [valid_proposer] + obj, ans, errs = srv._request_structured( + srv.ALL_PROVIDERS["openai"], + [{"role": "system", "content": "PROPOSER."}, + {"role": "user", "content": "Draft."}], + role_schema, + max_tokens=2048, deadline=time.monotonic() + 30, max_retries=1, + purpose="worker", + worker_tools=[], session_id="rst-empty", + ) + assert errs == [], errs + assert obj["role"] == "proposer" + # No inner_tool_calls field when worker_tools was empty. + assert "inner_tool_calls" not in ans + # And the system message did NOT receive the tool-use hint (it has + # the schema instruction but not `` syntax). 
+ sys_content = next((m["content"] for m in bodies[0]["body"]["messages"] + if m["role"] == "system"), "") + assert "SCHEMA:" in sys_content + assert "" not in sys_content, sys_content + + # ------------------------------------------------------------------ + # 4) Schema validation retry preserved on tool-use path + # ------------------------------------------------------------------ + bodies.clear(); fetched_urls.clear() + openai_responses[:] = [ + '{"name":"fetch","args":{"url":"https://example.com/x"}}', + '{"role":"proposer"}', # missing required `summary` + `confidence` + valid_proposer, # retry-on-fail + ] + obj, ans, errs = srv._request_structured( + srv.ALL_PROVIDERS["openai"], + [{"role": "system", "content": "PROPOSER."}, + {"role": "user", "content": "Draft."}], + role_schema, + max_tokens=2048, deadline=time.monotonic() + 30, max_retries=1, + purpose="worker", + worker_tools=["fetch"], session_id="rst-retry", + ) + assert errs == [], errs + assert obj["role"] == "proposer" + # One inner tool call recorded; the retry doesn't count as a hop. + assert ans["inner_tool_calls"] == [{"hop": 1, "name": "fetch", "status": "ok"}] + + # ------------------------------------------------------------------ + # 5) End-to-end coordinate: proposer + critic fetch; synth does NOT + # ------------------------------------------------------------------ + bodies.clear(); fetched_urls.clear() + # Proposer (anthropic) — fetch then envelope + anthropic_responses[:] = [ + '{"name":"fetch","args":{"url":"https://example.com/spec"}}', + valid_proposer, + # Synth (also anthropic in this setup) — gets called LAST. Synth + # has worker_tools=[] effectively (excluded by coordinate), so it + # should NOT emit a tool_call. We give it a valid synth envelope. + valid_synth, + ] + # Critics: openai + xai. Each fetches once then emits a critic envelope. 
+ openai_responses[:] = [ + '{"name":"fetch","args":{"url":"https://example.com/critic-openai"}}', + valid_critic, + ] + xai_responses[:] = [ + '{"name":"fetch","args":{"url":"https://example.com/critic-xai"}}', + valid_critic, + ] + + res = srv.tool_coordinate({ + "topic": "Pick an auth strategy", + "providers": ["openai", "anthropic", "xai"], + "proposer": "anthropic", + "critics": ["openai", "xai"], + "synthesizer": "anthropic", + "worker_tools": ["fetch", "verify", "coordinate", "audit"], # last two should be rejected + "session_id": "coord-tools-1", + }) + # Metadata surfaced + assert res["worker_tools"] == { + "accepted": ["fetch", "verify"], + "rejected": ["coordinate", "audit"], + "hop_budget": 2, + "applies_to": ["proposer", "critic"], + }, res["worker_tools"] + # Proposer + both critics fetched (3 inner fetches total). Synth did NOT. + assert len(fetched_urls) == 3, fetched_urls + assert "https://example.com/spec" in fetched_urls + assert "https://example.com/critic-openai" in fetched_urls + assert "https://example.com/critic-xai" in fetched_urls + + # Synth ran cleanly — the response carries a valid synthesis_structured + assert res["synthesis_structured"] is not None, res + assert res["synthesis_structured"]["consensus"] == "Use opaque tokens." + + # Inner-call records show up on the per-role answers + prop_inner = res["proposal_answer"].get("inner_tool_calls") or [] + assert any(c.get("name") == "fetch" and c.get("status") == "ok" + for c in prop_inner), prop_inner + for crit_ans in res["critique_answers"]: + ic = crit_ans.get("inner_tool_calls") or [] + assert any(c.get("name") == "fetch" and c.get("status") == "ok" + for c in ic), ic + + # Inspect the synth's wire body — it should NOT contain the + # `` syntax hint that the proposer/critics got, because + # coordinate intentionally excludes synth from worker_tools. + # Identify by purpose tag on the body's messages (synth has the + # SYNTHESIZER role marker). 
+ synth_bodies = [] + for b in bodies: + sys_block = (b["body"].get("system") or "") + "\n".join( + (m.get("content") or "") for m in b["body"].get("messages") or [] + if isinstance(m, dict) and m.get("role") == "system" + ) + if "SYNTHESIZER" in sys_block: + synth_bodies.append(sys_block) + assert synth_bodies, "expected at least one synth-role dispatch" + assert all("" not in s for s in synth_bodies), \ + "synth system prompt should NOT carry the tool_call hint" + + # ------------------------------------------------------------------ + # 6) coordinate WITHOUT worker_tools = legacy single-shot behavior; + # no `worker_tools` metadata on the response + # ------------------------------------------------------------------ + bodies.clear(); fetched_urls.clear() + anthropic_responses[:] = [valid_proposer, valid_synth] + openai_responses[:] = [valid_critic] + xai_responses[:] = [valid_critic] + res = srv.tool_coordinate({ + "topic": "Plain coordinate, no tools", + "providers": ["openai", "anthropic", "xai"], + "proposer": "anthropic", + "critics": ["openai", "xai"], + "synthesizer": "anthropic", + "session_id": "coord-plain", + }) + assert "worker_tools" not in res, res + assert fetched_urls == [], fetched_urls + + srv.tool_fetch = real_tool_fetch + print("OK: test_coordinate_worker_tools") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/servers/python/crosscheck_server.py b/servers/python/crosscheck_server.py index 9d33d2c..6b57586 100755 --- a/servers/python/crosscheck_server.py +++ b/servers/python/crosscheck_server.py @@ -4090,6 +4090,18 @@ def tool_coordinate(args: dict) -> dict: if mem_block: topic_block = f"{mem_block}\n\n{topic_block}" + # Worker tool-use opt-in: filter to the hard allowlist. Tools are + # enabled on the PROPOSER and CRITIC roles only (these benefit from + # evidence gathering); the SYNTHESIZER is purely combinatorial and + # should not be making external calls of its own. 
+ requested_worker_tools = (args.get("worker_tools") + if isinstance(args.get("worker_tools"), list) else []) + accepted_worker_tools = [t for t in requested_worker_tools + if isinstance(t, str) and t in _WORKER_TOOL_ALLOWLIST] + rejected_worker_tools = [t for t in requested_worker_tools + if not (isinstance(t, str) and t in _WORKER_TOOL_ALLOWLIST)] + inner_session_id = session.get("session_id") if session else args.get("session_id") + # ---- Step 1: Proposer ---------------------------------------------------- prop_system = ( sys_msg + "\n\nYou are the PROPOSER. Draft an initial position with claims and " @@ -4103,6 +4115,8 @@ def tool_coordinate(args: dict) -> dict: proposer, prop_messages, _role_turn_schema(), max_tokens=per_call, deadline=deadline, max_retries=1, purpose="worker", + worker_tools=accepted_worker_tools, + session_id=inner_session_id, ) proposal_render = _format_role_turn("proposer", proposal_obj, fallback_text=proposal_ans.get("response", "") if proposal_ans else "") @@ -4121,6 +4135,8 @@ def _critique(p: Provider) -> tuple[Provider, dict | None, dict, list[str]]: p, msgs, _role_turn_schema(), max_tokens=per_call, deadline=deadline, max_retries=1, purpose="worker", + worker_tools=accepted_worker_tools, + session_id=inner_session_id, ) return p, obj, ans, errs @@ -4278,6 +4294,13 @@ def _critique(p: Provider) -> tuple[Provider, dict | None, dict, list[str]]: } if canary_leaks: result["canary_leaks"] = canary_leaks + if requested_worker_tools: + result["worker_tools"] = { + "accepted": accepted_worker_tools, + "rejected": rejected_worker_tools, + "hop_budget": _WORKER_TOOL_HOP_BUDGET, + "applies_to": ["proposer", "critic"], # synth is excluded + } _attach_usage_block(result, all_answers, session_id=session.get("session_id") if session else None, tool_name="coordinate") @@ -9417,10 +9440,27 @@ def _extract_json(text: str) -> Any | None: def _request_structured(p: "Provider", base_messages: list[dict], schema: dict, max_tokens: int, deadline: float, 
max_retries: int = 1, - purpose: str = "worker" + purpose: str = "worker", + *, + worker_tools: list[str] | None = None, + session_id: str | None = None, ) -> tuple[Any | None, dict, list[str]]: """Ask provider for JSON matching `schema`. Validate; retry once on failure - with the errors fed back in the prompt. Returns (parsed_or_None, raw_answer, errors).""" + with the errors fed back in the prompt. Returns (parsed_or_None, raw_answer, errors). + + When `worker_tools` is non-empty, the worker may emit `{...} + ` envelopes before its final structured emission. Tool + calls are interleaved with the structured-output loop: each tool call + counts against the per-turn hop budget; once the worker stops emitting + tool_call tags, the response is parsed against `schema` (with the + standard retry-once-on-validation-failure semantics).""" + allowed_tools = [t for t in (worker_tools or []) if t in _WORKER_TOOL_ALLOWLIST] + if allowed_tools: + return _request_structured_with_tools( + p, base_messages, schema, + max_tokens=max_tokens, deadline=deadline, max_retries=max_retries, + purpose=purpose, worker_tools=allowed_tools, session_id=session_id, + ) sys_idx = next((i for i, m in enumerate(base_messages) if m.get("role") == "system"), None) schema_text = json.dumps(schema, separators=(",", ":")) instr = ( @@ -9457,6 +9497,152 @@ def _request_structured(p: "Provider", base_messages: list[dict], schema: dict, return None, last_answer, last_errs +def _request_structured_with_tools(p: "Provider", base_messages: list[dict], + schema: dict, *, + max_tokens: int, deadline: float, + max_retries: int, purpose: str, + worker_tools: list[str], + session_id: str | None, + ) -> tuple[Any | None, dict, list[str]]: + """Tool-call-aware variant of `_request_structured`. 
Interleaves the + `_ask_one_with_tools` hop loop with schema-validated emission: tool + calls (counted against the hop budget) come first; the final emission + is parsed + validated; one retry on validation failure.""" + sys_idx = next((i for i, m in enumerate(base_messages) if m.get("role") == "system"), None) + schema_text = json.dumps(schema, separators=(",", ":")) + instr = ( + "\n\nReturn ONLY a single JSON object matching this schema. " + "No commentary, no markdown fences, no prose around it.\n" + f"SCHEMA:\n{schema_text}\n\n" + "If you need supporting evidence, you may emit ONE " + "`...` envelope BEFORE the schema emission; " + "the tool result will be returned to you and you may then either " + "emit a follow-up tool_call OR your final JSON object. Tool calls " + "must NEVER appear in the final response — only the JSON object." + ) + hint = _worker_tools_system_hint(worker_tools) + + msgs = [dict(m) for m in base_messages] + if sys_idx is not None: + msgs[sys_idx]["content"] = (msgs[sys_idx].get("content") or "") + instr + hint + else: + msgs.insert(0, {"role": "system", "content": (instr + hint).strip()}) + + aggregated: dict = {} + inner_calls: list[dict] = [] + hops = 0 + attempt = 0 + last_answer: dict = {} + last_errs: list[str] = [] + + while True: + ans = _ask_one(p, msgs, deadline, max_tokens, purpose=purpose) + last_answer = ans + aggregated = _merge_answer_usage(aggregated, ans) if aggregated else dict(ans) + + if "error" in ans or not isinstance(ans.get("response"), str): + if inner_calls: + aggregated["inner_tool_calls"] = inner_calls + return None, aggregated, [ + f"provider error: {ans.get('error_kind', 'other')}: {ans.get('error', '')}" + ] + + text = ans["response"] + call, parse_err = _extract_tool_call(text) + + if call is not None or parse_err is not None: + # Worker requested a tool. Enforce hop budget before executing. 
+ if call is None: # malformed body + inner_calls.append({"hop": hops + 1, "name": None, + "status": "parse_error", "error": parse_err}) + if hops >= _WORKER_TOOL_HOP_BUDGET: + aggregated["inner_tool_calls"] = inner_calls + return None, aggregated, [parse_err or "malformed tool_call"] + msgs = list(msgs) + [ + {"role": "assistant", "content": text}, + {"role": "user", + "content": _worker_tools_refusal( + "", parse_err or "bad tool_call", + hint="Emit valid JSON inside ..., " + "or your final schema-conforming JSON object.")}, + ] + hops += 1 + continue + + if hops >= _WORKER_TOOL_HOP_BUDGET: + inner_calls.append({"hop": hops + 1, + "name": call.get("name"), + "status": "hop_budget_exhausted"}) + msgs = list(msgs) + [ + {"role": "assistant", "content": text}, + {"role": "user", + "content": _worker_tools_refusal( + str(call.get("name") or ""), + f"tool-hop budget exceeded ({_WORKER_TOOL_HOP_BUDGET})", + hint="Emit your final JSON object NOW; no further tool calls.")}, + ] + ans2 = _ask_one(p, msgs, deadline, max_tokens, purpose=purpose) + aggregated = _merge_answer_usage(aggregated, ans2) + last_answer = ans2 + # Fall through to schema validation on the next iteration's + # response by setting `text` and continuing the parse path. + text = ans2.get("response", "") if isinstance(ans2, dict) else "" + # Don't retry on schema fail; we're already at the limit. 
+ obj = _extract_json(text) if text else None + errs = _validate(obj, schema) if obj else ["could not parse JSON from response"] + aggregated["inner_tool_calls"] = inner_calls + return (obj if not errs else None), aggregated, errs + + tool_name = call.get("name") + result_block = _worker_tools_dispatch(call, session_id=session_id) + refused = '"refused": true' in result_block + inner_calls.append({"hop": hops + 1, + "name": tool_name, + "status": "refused" if refused else "ok"}) + _emit_progress( + f"{p.name}: worker_tool '{tool_name}' (structured) " + f"hop={hops+1}/{_WORKER_TOOL_HOP_BUDGET} " + f"({'refused' if refused else 'ok'})", + provider=p.name, model=p.model, purpose=purpose, + worker_tool=tool_name, hop=hops + 1, + status="refused" if refused else "ok", + ) + msgs = list(msgs) + [ + {"role": "assistant", "content": text}, + {"role": "user", "content": result_block}, + ] + hops += 1 + continue + + # No tool_call in the response: this is the worker's final emission. + # Parse + validate against the schema; retry once on failure. + obj = _extract_json(text) + if obj is None: + last_errs = ["could not parse JSON from response"] + else: + errs = _validate(obj, schema) + if not errs: + if inner_calls: + aggregated["inner_tool_calls"] = inner_calls + return obj, aggregated, [] + last_errs = errs + + if attempt >= max_retries: + if inner_calls: + aggregated["inner_tool_calls"] = inner_calls + return None, aggregated, last_errs + # One re-prompt with validation feedback. + msgs = list(msgs) + [ + {"role": "assistant", "content": text}, + {"role": "user", + "content": "Your previous response failed validation:\n- " + + "\n- ".join(last_errs[:5]) + + "\nFix the issues and re-emit valid JSON only — " + "no more tool_call envelopes."}, + ] + attempt += 1 + + def _validate_input(name: str, schema: dict, args: dict) -> str | None: """Stdlib-only input check at the boundary. Returns None on success.""" if schema.get("type") != "object" or not isinstance(args, dict):