From 96ff86e37963ed2e632424db4c005a35bda11583 Mon Sep 17 00:00:00 2001 From: Frank Speiser Date: Tue, 26 May 2026 11:27:57 -0400 Subject: [PATCH] Pre-validate inner tool args at the worker-tool gateway MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the PR #21 follow-up. Today the gateway only enforces the allowlist; bad args fell through to the inner tool which returned its own ad-hoc error envelope. Workers got inconsistent refusal shapes across rejection reasons (allowlist vs. schema vs. hop budget vs. cost cap) and the inner tool burned code paths handling garbage. The fix: - Look up the inner tool's `inputSchema` from `TOOLS[name]` and run the existing boundary validator `_validate_input` BEFORE dispatch. - On validation failure, return a refusal payload carrying a structured `schema_error` field with the validator message — the worker can read it programmatically and self-correct on the next hop (within the remaining hop budget). - Emit a `worker_inner_validation_fail` ndjson event so operators can see when a worker is repeatedly misusing inner tools. Validation order is: allowlist -> input schema -> session_id injection -> dispatch. session_id is added AFTER the schema check so a missing session_id arg never trips the validator (it's always optional on the inner tools). API change: - `_worker_tools_refusal` grows an optional `schema_error` keyword argument; the payload gains a `schema_error` field when set. - New helper `_worker_tool_input_schema(name)` for the lookup (defensive against module-load order: returns empty dict if TOOLS isn't bound yet). Tests (scripts/test_worker_tool_arg_validation.py): - `fetch` with no `url`: refusal with schema_error mentioning the missing field - `fetch.url` wrong type: refusal mentioning type + field - `verify` missing `checks`: refusal - `verify.checks` wrong type: refusal mentioning array - Unknown extra arg on `fetch` (additionalProperties=false): refusal - VALID `fetch` args dispatch normally with session_id injected - VALID `verify` args dispatch normally - Allowlist refusal still works AND has no `schema_error` field (it short-circuits before the schema check) - End-to-end recovery: worker emits invalid call, gets refusal with schema_error, recovers on next hop with valid args; only the valid call reaches the inner tool Full suite (37 scripts) passes. Co-Authored-By: Claude Opus 4.7 --- scripts/test_worker_tool_arg_validation.py | 230 +++++++++++++++++++++ servers/python/crosscheck_server.py | 49 ++++- 2 files changed, 275 insertions(+), 4 deletions(-) create mode 100644 scripts/test_worker_tool_arg_validation.py diff --git a/scripts/test_worker_tool_arg_validation.py b/scripts/test_worker_tool_arg_validation.py new file mode 100644 index 0000000..abae25a --- /dev/null +++ b/scripts/test_worker_tool_arg_validation.py @@ -0,0 +1,230 @@ +#!/usr/bin/env python3 +"""Tests for gateway-level input-schema validation of worker inner-tool +calls. + +Today the gateway only checks the allowlist; bad args fall through to +the inner tool which returns its own ad-hoc error envelope. The fix: +validate args against the inner tool's `inputSchema` at the gateway +BEFORE dispatch, and surface a structured refusal (`schema_error` +field on the refusal payload) so the worker can self-correct. +""" + +from __future__ import annotations + +import json +import os +import sys +import tempfile +from pathlib import Path + + +def main() -> int: + here = Path(__file__).resolve().parents[1] + sys.path.insert(0, str(here / "servers" / "python")) + + tmp = Path(tempfile.mkdtemp()) + pricing = tmp / "pricing.json" + pricing.write_text(json.dumps({ + "openai": {"gpt-test": {"prompt_per_1k": 0.0001, "completion_per_1k": 0.0003, "cached_per_1k": 0.00005}}, + "anthropic": {"claude-test": {"prompt_per_1k": 0.003, "completion_per_1k": 0.015, "cached_per_1k": 0.0003}}, + })) + os.environ["CROSSCHECK_PRICING_PATH"] = str(pricing) + os.environ.pop("CROSSCHECK_REJECT_CONFIG_DRIFT", None) + + import crosscheck_server as srv + + srv.CFG = dict(srv.CFG) + srv.CFG["session_db"] = str(tmp / "sessions.db") + srv.CFG["transcript_dir"] = str(tmp / "transcripts") + srv.CFG["cache"] = {"enabled": False} + srv.CFG["node_cache"] = {"enabled": False} + srv.CFG["prompt_adapters"] = {"enabled": False} + srv.CFG["fetch"] = {"url_allowlist": ["https://example.com/"]} + srv.CFG["config_pinning"] = {"reject_drift": False} + srv.TRANSCRIPT_DIR = Path(srv.CFG["transcript_dir"]) + srv._DB_INIT_DONE = False + srv._FTS5_AVAILABLE = None + srv._PRICING_CACHE = None + srv.PRICING_PATH = pricing + srv._CONFIG_PIN_STARTUP_DONE = True + + srv.ENV = dict(srv.ENV) + srv.ENV["OPENAI_API_KEY"] = "stub"; srv.ENV["OPENAI_MODEL"] = "gpt-test" + srv.ENV["ANTHROPIC_API_KEY"] = "stub"; srv.ENV["ANTHROPIC_MODEL"] = "claude-test" + srv.ALL_PROVIDERS = srv.build_providers() + srv.CFG["providers"] = ["openai", "anthropic"] + + # Track whether the inner tools were actually called: we should NEVER + # reach them with invalid args, because the gateway short-circuits. + fetch_calls: list[dict] = [] + verify_calls: list[dict] = [] + real_fetch = srv.tool_fetch + real_verify = srv.tool_verify + + def fake_fetch(args): + fetch_calls.append(dict(args)) + return {"tool": "fetch", "url": args.get("url"), + "status": "ok", "content_excerpt": "ok"} + + def fake_verify(args): + verify_calls.append(dict(args)) + return {"tool": "verify", "all_passed": True, + "checks_run": len(args.get("checks") or []), + "results": [], "summary": "ok"} + + srv.tool_fetch = fake_fetch + srv.tool_verify = fake_verify + + # ------------------------------------------------------------------ + # 1) Missing required field on `fetch` (no `url`) -> refusal + # ------------------------------------------------------------------ + res = srv._worker_tools_dispatch({"name": "fetch", "args": {}}, + session_id="sess-1") + assert '"refused": true' in res, res + payload = json.loads(res.split("")[1].split("")[0].strip()) + assert payload["tool"] == "fetch" + assert payload.get("schema_error"), payload + assert "url" in payload["schema_error"].lower(), payload["schema_error"] + assert fetch_calls == [], "inner fetch must NOT be called when validation fails" + + # ------------------------------------------------------------------ + # 2) Wrong type on `fetch.url` (integer not string) -> refusal + # ------------------------------------------------------------------ + res = srv._worker_tools_dispatch({"name": "fetch", "args": {"url": 12345}}, + session_id="sess-2") + assert '"refused": true' in res + payload = json.loads(res.split("")[1].split("")[0].strip()) + assert payload.get("schema_error"), payload + assert "url" in payload["schema_error"].lower() and "string" in payload["schema_error"].lower(), payload["schema_error"] + assert fetch_calls == [] + + # ------------------------------------------------------------------ + # 3) `verify` missing required `checks` -> refusal + # ------------------------------------------------------------------ + res = srv._worker_tools_dispatch({"name": "verify", "args": {}}, + session_id="sess-3") + assert '"refused": true' in res + payload = json.loads(res.split("")[1].split("")[0].strip()) + assert payload["tool"] == "verify" + assert "checks" in payload.get("schema_error", "").lower(), payload + assert verify_calls == [] + + # ------------------------------------------------------------------ + # 4) `verify.checks` not a list -> refusal + # ------------------------------------------------------------------ + res = srv._worker_tools_dispatch( + {"name": "verify", "args": {"checks": "not-a-list"}}, + session_id="sess-4") + assert '"refused": true' in res + payload = json.loads(res.split("")[1].split("")[0].strip()) + assert "checks" in payload.get("schema_error", "").lower(), payload + assert "array" in payload["schema_error"].lower(), payload["schema_error"] + assert verify_calls == [] + + # ------------------------------------------------------------------ + # 5) Unknown argument on `fetch` -> refusal (additionalProperties=false) + # ------------------------------------------------------------------ + res = srv._worker_tools_dispatch( + {"name": "fetch", "args": {"url": "https://example.com/", "evil_arg": 1}}, + session_id="sess-5") + # Refusal expected because the fetch schema sets additionalProperties=false. + # If the schema doesn't (some shapes don't), we relax to "either pass or refuse". + schema = srv._worker_tool_input_schema("fetch") + if schema.get("additionalProperties") is False: + assert '"refused": true' in res, res + assert fetch_calls == [] # never reached the inner tool + else: + # Schema is permissive on extras — gateway should let it through. + assert '"refused": true' not in res + + # ------------------------------------------------------------------ + # 6) VALID args on fetch still dispatch correctly (regression) + # ------------------------------------------------------------------ + fetch_calls.clear() + res = srv._worker_tools_dispatch( + {"name": "fetch", "args": {"url": "https://example.com/spec"}}, + session_id="sess-ok") + assert '"refused": true' not in res, res + assert len(fetch_calls) == 1 + assert fetch_calls[0]["url"] == "https://example.com/spec" + # session_id was injected + assert fetch_calls[0].get("session_id") == "sess-ok" + + # ------------------------------------------------------------------ + # 7) VALID args on verify still dispatch (regression) + # ------------------------------------------------------------------ + verify_calls.clear() + res = srv._worker_tools_dispatch( + {"name": "verify", + "args": {"checks": [{"kind": "contains", + "id": "x", + "target_text": "hi", + "value": "hi"}]}}, + session_id="sess-ok-2") + assert '"refused": true' not in res, res + assert len(verify_calls) == 1 + + # ------------------------------------------------------------------ + # 8) Allowlist denial still works (regression — checked BEFORE schema) + # ------------------------------------------------------------------ + res = srv._worker_tools_dispatch( + {"name": "coordinate", "args": {"topic": "evil"}}, + session_id="sess-deny") + payload = json.loads(res.split("")[1].split("")[0].strip()) + assert payload["refused"] is True + # No schema_error on an allowlist refusal — the gateway never even + # got to the schema check. + assert "schema_error" not in payload, payload + + # ------------------------------------------------------------------ + # 9) End-to-end: worker emits invalid call, gets refusal with + # schema_error, recovers on next hop + # ------------------------------------------------------------------ + openai_responses: list[str] = [] + bodies: list[dict] = [] + def fake_post(url, h, b, **kw): + bodies.append({"url": url, "body": b}) + if "openai.com" in url: + text = openai_responses.pop(0) if openai_responses else "DONE." + return ({"choices": [{"message": {"content": text}}], + "usage": {"prompt_tokens": 30, "completion_tokens": 10}}, 1) + return ({}, 1) + srv._http_post_resilient = fake_post + + fetch_calls.clear() + openai_responses[:] = [ + # First emission: invalid (missing url) + '{"name":"fetch","args":{}}', + # Second emission: valid + '{"name":"fetch","args":{"url":"https://example.com/x"}}', + "Recovered with the data.", + ] + ans = srv._ask_one_with_tools( + srv.ALL_PROVIDERS["openai"], + [{"role": "user", "content": "Try again."}], + deadline=__import__("time").monotonic() + 30, + max_tokens=2048, purpose="worker", + worker_tools=["fetch"], session_id="recover", + ) + statuses = [c["status"] for c in ans["inner_tool_calls"]] + # Both calls counted as "refused" / "ok" but only one real fetch fired. + assert statuses == ["refused", "ok"], statuses + assert len(fetch_calls) == 1, fetch_calls + assert "Recovered" in ans["response"] + # Verify the second-turn re-prompt embedded the schema_error so the + # worker had something concrete to fix. + user_msgs = [] + for b in bodies: + user_msgs.extend(m.get("content", "") for m in b["body"]["messages"] + if m["role"] == "user") + refusal_msgs = [m for m in user_msgs if "schema_error" in m] + assert refusal_msgs, "expected the schema_error refusal to be re-prompted" + + srv.tool_fetch = real_fetch + srv.tool_verify = real_verify + print("OK: test_worker_tool_arg_validation") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/servers/python/crosscheck_server.py b/servers/python/crosscheck_server.py index b7ed356..9cec871 100755 --- a/servers/python/crosscheck_server.py +++ b/servers/python/crosscheck_server.py @@ -2818,19 +2818,40 @@ def _wrap_tool_result(name: str, content: str) -> str: ) -def _worker_tools_refusal(name: str, reason: str, hint: str = "") -> str: +def _worker_tools_refusal(name: str, reason: str, hint: str = "", + *, schema_error: str | None = None) -> str: """Structured refusal payload — re-prompted to the worker as a tool_result - so it can incorporate the failure into its next emission.""" - payload = {"refused": True, "tool": name, "reason": reason} + so it can incorporate the failure into its next emission. When the + refusal stems from gateway-level schema validation, `schema_error` + carries the underlying validator message so the worker can correct + the args on the next hop.""" + payload: dict = {"refused": True, "tool": name, "reason": reason} if hint: payload["operator_hint"] = hint + if schema_error: + payload["schema_error"] = schema_error return _wrap_tool_result(name or "", json.dumps(payload)) +def _worker_tool_input_schema(name: str) -> dict: + """Look up the JSON-schema for the named inner tool's input. Returns an + empty dict (no validation) when TOOLS hasn't been loaded yet — defensive, + `TOOLS` is initialised at module-load before any tool call can happen.""" + try: + return (TOOLS.get(name) or {}).get("inputSchema") or {} + except NameError: + return {} + + def _worker_tools_dispatch(call: dict, *, session_id: str | None) -> str: """Execute one inner tool call. Returns the wrapped string ready for re-prompt. Refusals are also wrapped so the worker sees a - coherent shape regardless of outcome.""" + coherent shape regardless of outcome. + + Validation order: allowlist check, then input-schema check, then + session_id injection, then inner-tool dispatch. The schema check at + the gateway gives the worker a clean structured refusal instead of + whatever ad-hoc error envelope the inner tool happens to produce.""" name = str(call.get("name", "")).strip() args = call.get("args") if isinstance(call.get("args"), dict) else {} if name not in _WORKER_TOOL_ALLOWLIST: @@ -2839,6 +2860,26 @@ def _worker_tools_dispatch(call: dict, *, session_id: str | None) -> str: hint=f"Allowed inner tools: {sorted(_WORKER_TOOL_ALLOWLIST)}", ) + # Pre-validate args against the inner tool's input schema BEFORE we + # touch the tool itself. session_id is injected later (after this + # check) and is always optional on the inner tools, so a missing + # session_id never trips the validator here. + schema = _worker_tool_input_schema(name) + if schema: + validate_err = _validate_input(name, schema, args) + if validate_err: + _emit_event("worker_inner_validation_fail", + tool=name, session_id=session_id, + args_keys=sorted(args.keys()) if isinstance(args, dict) else [], + schema_error=validate_err) + return _worker_tools_refusal( + name, + f"input args failed schema validation for tool {name!r}", + hint=("Inspect the tool's inputSchema and fix the args; " + "required fields, types, and enums must match."), + schema_error=validate_err, + ) + # Ensure inner calls roll up under the same session_id (for cost, # egress budget, and breakers). inner_args = dict(args)