diff --git a/schema/tools.schema.json b/schema/tools.schema.json index cde73c6..806fcc6 100644 --- a/schema/tools.schema.json +++ b/schema/tools.schema.json @@ -1248,6 +1248,44 @@ "required": ["tool"] } }, + "config_pin": { + "description": "Hash-pin the canonical config files (crosscheck.config.json + config/pricing.json by default) and detect drift. Threat model: protect against accidental edits of cost tables or panel configs. Default behavior is DETECT-AND-WARN — never blocks tool calls unless `config_pinning.reject_drift` is true OR CROSSCHECK_REJECT_CONFIG_DRIFT=1. When reject mode is on and drift is present, every tool except `config_pin` itself returns a CONFIG_PIN_DRIFT_BLOCKED error until drift is accepted.", + "input": { + "type": "object", + "additionalProperties": false, + "properties": { + "action": { "type": "string", "enum": ["show", "set", "accept_drift", "clear"], + "description": "show: compute drift; set: record current as canonical; accept_drift: refresh the pin only when drift is present (semantic guardrail); clear: remove the pin file." } + }, + "required": ["action"] + }, + "output": { + "type": "object", + "properties": { + "tool": { "const": "config_pin" }, + "action": { "type": "string" }, + "pin_file": { "type": "string" }, + "pinned": { "type": "object", "additionalProperties": { "type": "string" } }, + "current": { "type": "object", "additionalProperties": { "type": "string" } }, + "drift": { "type": "array", "items": { "type": "string" } }, + "missing_from_pin": { "type": "array", "items": { "type": "string" }, + "description": "Paths present on disk but absent from the pin file." }, + "missing_files": { "type": "array", "items": { "type": "string" }, + "description": "Paths in the pin file that no longer exist on disk." }, + "has_pin_file": { "type": "boolean" }, + "pinned_at": { "type": "integer", "minimum": 0, + "description": "Unix-seconds when the pin file was last written." }, + "reject_drift_enabled": { "type": "boolean", + "description": "True when reject mode is active via CFG.config_pinning.reject_drift or CROSSCHECK_REJECT_CONFIG_DRIFT=1." }, + "would_block": { "type": "boolean", + "description": "True when the current state would cause non-config_pin tool calls to be refused." }, + "pins": { "type": "object", "additionalProperties": { "type": "string" } }, + "accepted_drift": { "type": "array", "items": { "type": "string" } }, + "existed": { "type": "boolean", "description": "For action=clear: whether the pin file existed." } + }, + "required": ["tool"] + } + }, "recall": { "description": "Full-text search across persisted transcripts via SQLite FTS5. Surfaces matching session_id / tool / snippet / score so a caller can pull prior context into a new session without re-running expensive panel calls. The index is populated on every `write_transcript` call; results are ranked by bm25 (lower = better) and a windowed snippet shows the match.", "input": { diff --git a/scripts/test_config_pin.py b/scripts/test_config_pin.py new file mode 100644 index 0000000..f5a3e1b --- /dev/null +++ b/scripts/test_config_pin.py @@ -0,0 +1,231 @@ +#!/usr/bin/env python3 +"""Tests for config-file pinning. + +Covers: + - show: reports has_pin_file=false when no pin exists, lists current hashes + - set: writes the pin file with current hashes + - show after set: drift=[], has_pin_file=true + - Modify a tracked file -> show reports drift + - accept_drift: requires drift to exist, then re-pins + - clear: removes the pin file + - reject_drift gate: when reject_drift=true and drift exists, + the handler's tool dispatch refuses non-config_pin calls + - config_pin itself is NOT blocked even under reject mode + - Error taxonomy: bad action, accept_drift with no drift, accept_drift + with no pin file +""" + +from __future__ import annotations + +import json +import os +import sys +import tempfile +from pathlib import Path + + +def main() -> int: + here = Path(__file__).resolve().parents[1] + sys.path.insert(0, str(here / "servers" / "python")) + + tmp = Path(tempfile.mkdtemp()) + # Tracked files live INSIDE tmp so the test doesn't touch the real repo. + cfg_file = tmp / "crosscheck.config.json" + pricing_file = tmp / "config" / "pricing.json" + pricing_file.parent.mkdir(parents=True, exist_ok=True) + cfg_file.write_text(json.dumps({ + "providers": ["anthropic", "openai"], "moderator": "anthropic", + "max_rounds": 3, "token_cap": 8000, "max_time_seconds": 120, + })) + pricing_file.write_text(json.dumps({ + "openai": {"gpt-test": {"prompt_per_1k": 0.0001, "completion_per_1k": 0.0003, "cached_per_1k": 0.00005}}, + "anthropic": {"claude-test": {"prompt_per_1k": 0.003, "completion_per_1k": 0.015, "cached_per_1k": 0.0003}}, + })) + os.environ["CROSSCHECK_PRICING_PATH"] = str(pricing_file) + # Make sure the reject-drift env var is clean across test runs. + os.environ.pop("CROSSCHECK_REJECT_CONFIG_DRIFT", None) + + import crosscheck_server as srv + + # Re-root so the pin helpers point at our tmp tree. + srv.ROOT = tmp + srv.CFG = dict(srv.CFG) + srv.CFG["session_db"] = str(tmp / "sessions.db") + srv.CFG["transcript_dir"] = str(tmp / "transcripts") + srv.CFG["cache"] = {"enabled": False} + srv.CFG["node_cache"] = {"enabled": False} + srv.CFG["config_pinning"] = { + "paths": ["crosscheck.config.json", "config/pricing.json"], + "pin_file": ".crosscheck/config_pins.json", + "reject_drift": False, + } + srv.TRANSCRIPT_DIR = Path(srv.CFG["transcript_dir"]) + srv._DB_INIT_DONE = False + srv._FTS5_AVAILABLE = None + srv._PRICING_CACHE = None + srv.PRICING_PATH = pricing_file + srv._CONFIG_PIN_STARTUP_DONE = False + + # ------------------------------------------------------------------ + # 1) show: no pin file yet + # ------------------------------------------------------------------ + res = srv.tool_config_pin({"action": "show"}) + assert res["has_pin_file"] is False, res + assert res["pinned"] == {}, res + # current includes both tracked files with sha256: prefix + assert "crosscheck.config.json" in res["current"], res + assert "config/pricing.json" in res["current"], res + assert all(h.startswith("sha256:") for h in res["current"].values()), res + assert res["drift"] == [] and res["missing_files"] == [] + assert res["would_block"] is False + + # ------------------------------------------------------------------ + # 2) set: record the canonical pin + # ------------------------------------------------------------------ + res = srv.tool_config_pin({"action": "set"}) + assert res["action"] == "set" + assert "crosscheck.config.json" in res["pins"] + pin_path = tmp / ".crosscheck" / "config_pins.json" + assert pin_path.exists(), pin_path + doc = json.loads(pin_path.read_text()) + assert doc["version"] == 1 + assert "pins" in doc and len(doc["pins"]) == 2 + + # ------------------------------------------------------------------ + # 3) show after set: drift=[], has_pin_file=true + # ------------------------------------------------------------------ + res = srv.tool_config_pin({"action": "show"}) + assert res["has_pin_file"] is True + assert res["drift"] == [], res + assert res["would_block"] is False + + # ------------------------------------------------------------------ + # 4) Modify a tracked file -> drift detected + # ------------------------------------------------------------------ + pricing_file.write_text(json.dumps({ + # Same shape, different values (could be silent cost inflation). + "openai": {"gpt-test": {"prompt_per_1k": 999.0, "completion_per_1k": 999.0, "cached_per_1k": 0.00005}}, + "anthropic": {"claude-test": {"prompt_per_1k": 0.003, "completion_per_1k": 0.015, "cached_per_1k": 0.0003}}, + })) + res = srv.tool_config_pin({"action": "show"}) + assert res["drift"] == ["config/pricing.json"], res + + # ------------------------------------------------------------------ + # 5) accept_drift refreshes the pin + # ------------------------------------------------------------------ + res = srv.tool_config_pin({"action": "accept_drift"}) + assert res["action"] == "accept_drift" + assert res["accepted_drift"] == ["config/pricing.json"], res + # After acceptance, drift is gone. + res = srv.tool_config_pin({"action": "show"}) + assert res["drift"] == [], res + + # ------------------------------------------------------------------ + # 6) accept_drift with NO drift returns CONFIG_PIN_NO_DRIFT + # ------------------------------------------------------------------ + res = srv.tool_config_pin({"action": "accept_drift"}) + assert res.get("error_code") == "CONFIG_PIN_NO_DRIFT", res + + # ------------------------------------------------------------------ + # 7) clear removes the pin file + # ------------------------------------------------------------------ + res = srv.tool_config_pin({"action": "clear"}) + assert res["existed"] is True + assert not pin_path.exists() + # accept_drift with NO pin file returns CONFIG_PIN_NO_PIN_FILE + res = srv.tool_config_pin({"action": "accept_drift"}) + assert res.get("error_code") == "CONFIG_PIN_NO_PIN_FILE", res + + # ------------------------------------------------------------------ + # 8) Bad action error taxonomy + # ------------------------------------------------------------------ + res = srv.tool_config_pin({"action": "nope"}) + assert res.get("error_code") == "CONFIG_PIN_BAD_ACTION", res + + # ------------------------------------------------------------------ + # 9) Reject mode gate: drift blocks non-config_pin tool calls + # ------------------------------------------------------------------ + # Re-pin clean tree, then enable reject mode and introduce drift. + srv.tool_config_pin({"action": "set"}) + srv.CFG["config_pinning"] = dict(srv.CFG["config_pinning"]) + srv.CFG["config_pinning"]["reject_drift"] = True + + # No drift yet -> would_block=False + assert srv._config_pin_should_block() is False + + # Drift the pricing file again + pricing_file.write_text(json.dumps({ + "openai": {"gpt-test": {"prompt_per_1k": 0.0002, "completion_per_1k": 0.0003, "cached_per_1k": 0.00005}}, + "anthropic": {"claude-test": {"prompt_per_1k": 0.003, "completion_per_1k": 0.015, "cached_per_1k": 0.0003}}, + })) + assert srv._config_pin_should_block() is True + + # Dispatch a tool call via handle(); should be blocked. + blocked = srv.handle({"jsonrpc": "2.0", "id": 1, "method": "tools/call", + "params": {"name": "verify", + "arguments": {"checks": [{"kind": "contains", + "id": "x", + "target_text": "hi", + "value": "hi"}]}}}) + body = json.loads(blocked["result"]["content"][0]["text"]) + assert body.get("error_code") == "CONFIG_PIN_DRIFT_BLOCKED", body + assert "config/pricing.json" in body["drift"] + + # config_pin itself is NOT blocked + allowed = srv.handle({"jsonrpc": "2.0", "id": 2, "method": "tools/call", + "params": {"name": "config_pin", + "arguments": {"action": "show"}}}) + body = json.loads(allowed["result"]["content"][0]["text"]) + assert body.get("error_code") is None, body + assert body["tool"] == "config_pin" + assert body["would_block"] is True + + # Accept the drift via the handler -> tool calls unblock + allowed = srv.handle({"jsonrpc": "2.0", "id": 3, "method": "tools/call", + "params": {"name": "config_pin", + "arguments": {"action": "accept_drift"}}}) + body = json.loads(allowed["result"]["content"][0]["text"]) + assert body["action"] == "accept_drift", body + + # Now verify is no longer blocked + ok = srv.handle({"jsonrpc": "2.0", "id": 4, "method": "tools/call", + "params": {"name": "verify", + "arguments": {"checks": [{"kind": "contains", + "id": "x", + "target_text": "hi", + "value": "hi"}]}}}) + body = json.loads(ok["result"]["content"][0]["text"]) + assert body["tool"] == "verify" and body["all_passed"] is True, body + + # ------------------------------------------------------------------ + # 10) Env-var bypass: CROSSCHECK_REJECT_CONFIG_DRIFT=1 enables reject + # mode even when CFG.config_pinning.reject_drift is false. + # ------------------------------------------------------------------ + srv.CFG["config_pinning"]["reject_drift"] = False + assert srv._config_pin_should_block() is False + # Re-introduce drift, then flip env var. + pricing_file.write_text(json.dumps({ + "openai": {"gpt-test": {"prompt_per_1k": 0.999, "completion_per_1k": 0.999, "cached_per_1k": 0.0001}}, + })) + os.environ["CROSSCHECK_REJECT_CONFIG_DRIFT"] = "1" + assert srv._config_pin_should_block() is True + os.environ.pop("CROSSCHECK_REJECT_CONFIG_DRIFT", None) + assert srv._config_pin_should_block() is False + + # ------------------------------------------------------------------ + # 11) Cross-platform CRLF normalization: pinning a file then writing + # the same content with CRLF endings doesn't trip drift. + # ------------------------------------------------------------------ + srv.tool_config_pin({"action": "set"}) + # Rewrite pricing file with CRLF endings, same logical content. + original = pricing_file.read_text() + pricing_file.write_bytes(original.replace("\n", "\r\n").encode("utf-8")) + res = srv.tool_config_pin({"action": "show"}) + assert res["drift"] == [], res + + print("OK: test_config_pin") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/servers/python/crosscheck_server.py b/servers/python/crosscheck_server.py index a0ffa86..9d33d2c 100755 --- a/servers/python/crosscheck_server.py +++ b/servers/python/crosscheck_server.py @@ -71,6 +71,149 @@ def _resolve_transcript_dir(cfg: dict[str, Any]) -> Path: TRANSCRIPT_DIR = _resolve_transcript_dir(CFG) +# ------------------------------------------------------------ +# Config pinning +# +# Hash-pin the canonical config files (crosscheck.config.json + the +# pricing table) on first observation; refuse to serve tool calls if +# the hashes drift away from the pinned values UNLESS the operator +# explicitly accepts the drift via the `config_pin` tool or via the +# CROSSCHECK_REJECT_CONFIG_DRIFT bypass. +# +# Threat model is operator-set: this protects against accidental edits +# of cost tables or panel configs (e.g. someone setting a model's cost +# to 0 to hide spend, or adding an unintended provider to the active +# set). Premature for current adversarial threat models — defaults to +# DETECT-AND-WARN, never blocks unless explicitly enabled. +# ------------------------------------------------------------ +def _config_pin_path() -> Path: + raw = (CFG.get("config_pinning") or {}).get("pin_file") \ + or ".crosscheck/config_pins.json" + p = Path(str(raw)) + return p if p.is_absolute() else (ROOT / p) + + +def _config_pin_targets() -> list[Path]: + """Files included in the pin set. Operator can extend via + CFG.config_pinning.paths (list of repo-relative or absolute paths); + missing files are silently dropped (a non-existent panel.json isn't + a drift signal).""" + raw = (CFG.get("config_pinning") or {}).get("paths") + if isinstance(raw, list) and raw: + candidates = [Path(str(p)) for p in raw] + else: + candidates = [Path("crosscheck.config.json"), Path("config/pricing.json")] + out: list[Path] = [] + for p in candidates: + absp = p if p.is_absolute() else (ROOT / p) + if absp.exists() and absp.is_file(): + out.append(absp) + return out + + +def _config_pin_hash_file(path: Path) -> str: + """SHA256 over file contents, normalized to LF line endings so a + cross-platform checkout doesn't trip the drift detector.""" + try: + data = path.read_bytes().replace(b"\r\n", b"\n") + except OSError: + return "" + return "sha256:" + hashlib.sha256(data).hexdigest() + + +def _config_pin_rel(path: Path) -> str: + """Stable identifier inside the pin file — repo-relative when possible.""" + try: + return str(path.relative_to(ROOT)) + except ValueError: + return str(path) + + +def _config_pin_load() -> dict: + p = _config_pin_path() + if not p.exists(): + return {} + try: + doc = json.loads(p.read_text()) + if isinstance(doc, dict): + return doc + except Exception: + pass + return {} + + +def _config_pin_save(doc: dict) -> Path: + p = _config_pin_path() + p.parent.mkdir(parents=True, exist_ok=True) + _atomic_write_json(p, doc) + return p + + +def _config_pin_current() -> dict[str, str]: + """Map of repo-relative path -> current sha256.""" + return {_config_pin_rel(p): _config_pin_hash_file(p) + for p in _config_pin_targets()} + + +def _config_pin_drift() -> dict: + """Compute (pinned, current, drift, missing_from_pin, missing_files). + Returns the structured envelope that both `config_pin` and the + startup gate consume.""" + doc = _config_pin_load() + pinned: dict[str, str] = (doc.get("pins") or {}) if isinstance(doc, dict) else {} + current = _config_pin_current() + drift = [path for path, h in pinned.items() + if current.get(path) and current[path] != h] + missing_from_pin = [p for p in current.keys() if p not in pinned] + missing_files = [p for p in pinned.keys() if p not in current] + return {"pinned": pinned, + "current": current, + "drift": sorted(drift), + "missing_from_pin": sorted(missing_from_pin), + "missing_files": sorted(missing_files), + "pin_file": _config_pin_rel(_config_pin_path()), + "pinned_at": int(doc.get("pinned_at") or 0) + if isinstance(doc, dict) else 0, + "has_pin_file": _config_pin_path().exists()} + + +def _config_pin_should_block() -> bool: + """Reject mode: returns True when (a) a pin file exists, (b) drift + is present, AND (c) reject_drift is enabled via CFG or env.""" + if os.environ.get("CROSSCHECK_REJECT_CONFIG_DRIFT") == "1": + reject = True + else: + reject = bool((CFG.get("config_pinning") or {}).get("reject_drift", False)) + if not reject: + return False + drift = _config_pin_drift() + if not drift["has_pin_file"]: + return False + return bool(drift["drift"]) + + +_CONFIG_PIN_STARTUP_DONE = False + + +def _config_pin_startup_check() -> None: + """Lazy one-shot emission of the pin status. Runs on first tool call. + Never blocks — `_config_pin_should_block` is the gate that callers + consult separately.""" + global _CONFIG_PIN_STARTUP_DONE + if _CONFIG_PIN_STARTUP_DONE: + return + _CONFIG_PIN_STARTUP_DONE = True + try: + drift = _config_pin_drift() + _emit_event("config_pin_check", + has_pin_file=drift["has_pin_file"], + drift=drift["drift"], + missing_from_pin=drift["missing_from_pin"], + missing_files=drift["missing_files"]) + except Exception: + pass + + # ------------------------------------------------------------ # Redaction (PII / secrets — applied to traces and transcripts) # ------------------------------------------------------------ @@ -8939,6 +9082,110 @@ def tool_session_memory(args: dict) -> dict: "session_id": session_id, "deleted": n} +def tool_config_pin(args: dict) -> dict: + """CRUD over the config-pinning ledger. + + Actions: + show — compute current vs. pinned hashes; surface any drift + set — record current hashes as the canonical pin (first + time setup, OR overwrite the previous pin) + accept_drift — same as `set` but only succeeds when drift is + currently present (semantic guardrail to avoid + accidentally re-pinning a clean tree) + clear — remove the pin file entirely (next call sees + has_pin_file=false) + """ + action = args.get("action") + if action not in ("show", "set", "accept_drift", "clear"): + return {"tool": "config_pin", + **_error("CONFIG_PIN_BAD_ACTION", + f"unknown action {action!r}", + hint="action must be one of: show, set, accept_drift, clear.")} + + drift = _config_pin_drift() + + if action == "show": + return {"tool": "config_pin", "action": "show", **drift, + "reject_drift_enabled": ( + os.environ.get("CROSSCHECK_REJECT_CONFIG_DRIFT") == "1" + or bool((CFG.get("config_pinning") or {}).get("reject_drift", False)) + ), + "would_block": _config_pin_should_block()} + + if action == "set": + doc = {"version": 1, + "pinned_at": int(time.time()), + "pins": _config_pin_current()} + path = _config_pin_save(doc) + _emit_event("config_pin_set", pin_file=_config_pin_rel(path), + count=len(doc["pins"])) + return {"tool": "config_pin", "action": "set", + "pin_file": _config_pin_rel(path), + "pins": doc["pins"], "pinned_at": doc["pinned_at"]} + + if action == "accept_drift": + if not drift["has_pin_file"]: + return {"tool": "config_pin", + **_error("CONFIG_PIN_NO_PIN_FILE", + "no pin file exists yet — nothing to accept", + hint="Use action='set' to record the current " + "hashes as the canonical pin.")} + if not drift["drift"]: + return {"tool": "config_pin", + **_error("CONFIG_PIN_NO_DRIFT", + "current files match the existing pin; " + "nothing to accept", + hint="If you intended to refresh the pin anyway, " + "use action='set'.")} + doc = {"version": 1, + "pinned_at": int(time.time()), + "pins": _config_pin_current(), + "previous_pinned_at": drift["pinned_at"], + "accepted_drift": drift["drift"]} + path = _config_pin_save(doc) + _emit_event("config_pin_accept_drift", + pin_file=_config_pin_rel(path), + accepted_paths=drift["drift"]) + return {"tool": "config_pin", "action": "accept_drift", + "pin_file": _config_pin_rel(path), + "accepted_drift": drift["drift"], + "pins": doc["pins"], "pinned_at": doc["pinned_at"]} + + # action == "clear" + path = _config_pin_path() + existed = path.exists() + if existed: + try: + path.unlink() + except OSError as e: + return {"tool": "config_pin", + **_error("CONFIG_PIN_CLEAR_FAILED", str(e), + hint="Check filesystem permissions on the pin file.")} + _emit_event("config_pin_clear", pin_file=_config_pin_rel(path), + existed=existed) + return {"tool": "config_pin", "action": "clear", + "pin_file": _config_pin_rel(path), "existed": existed} + + +def _config_pin_block_response(tool_name: str) -> dict: + """Structured envelope returned in place of the tool's normal output + when a config-drift block is active.""" + drift = _config_pin_drift() + return {"tool": tool_name, + **_error("CONFIG_PIN_DRIFT_BLOCKED", + f"config drift detected; refusing tool call. " + f"drift in: {drift['drift']}", + hint="Inspect with `config_pin(action='show')`. " + "After confirming the change is intentional, " + "run `config_pin(action='accept_drift')` to " + "re-pin, OR clear CROSSCHECK_REJECT_CONFIG_DRIFT " + "and config_pinning.reject_drift to disable " + "the gate.", + transient=False), + "drift": drift["drift"], + "pin_file": drift["pin_file"]} + + def _latest_transcript_for_session(session_id: str) -> str | None: """Find the most recent transcript JSON whose `session.session_id` matches. Best-effort, returns None on failure.""" @@ -8985,6 +9232,7 @@ def _latest_transcript_for_session(session_id: str) -> str | None: "verify": tool_verify, "recall": tool_recall, "session_memory": tool_session_memory, + "config_pin": tool_config_pin, "orchestrate": tool_orchestrate, "audit": tool_audit, "create": tool_create, @@ -9281,6 +9529,15 @@ def handle(req: dict) -> dict | None: err = _validate_input(name, tool["inputSchema"], args) if err: return rpc_error(id_, -32602, err) + # Lazy one-shot config-pin emission (visibility — never blocks here). + _config_pin_startup_check() + # Drift gate: when reject-drift is enabled and drift is present, + # refuse every tool call EXCEPT `config_pin` itself — operators + # need a way to inspect and accept the drift. + if name != "config_pin" and _config_pin_should_block(): + blocked = _config_pin_block_response(name) + return rpc_result(id_, {"content": [{"type": "text", + "text": json.dumps(blocked, indent=2)}]}) # MCP clients may pass a progressToken under _meta to opt in to # `notifications/progress` updates. Bind it to the current thread so # _emit_progress() can stream live timing/cost as the tool runs.