diff --git a/CHANGELOG.md b/CHANGELOG.md index a0de0f6..cab014b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,45 @@ versions per [PEP 440](https://peps.python.org/pep-0440/) / _Post-1.0.0 work lands here; legis versions independently from the Weft 1.0 launch on._ +## [1.3.0] — 2026-06-26 + +Suite-standard dot-dir hygiene, plus the legis-resident halves of six cross-repo +Weft seam conformance oracles. + +### Added + +- **Nested `.weft/legis/.gitignore` shipped at install (suite standard, + filigree-4ed8152630).** `legis install` (and `legis install --gitignore`) now + ships a nested `.weft/legis/.gitignore` alongside the existing project-root + `.weft/legis/` rule, so legis's machine-written runtime state stays out of + every commit even in a consuming repo that drops the root rule — closing + legis's part of the cross-suite "every tool ships a complete nested + `.gitignore` for its own dot-dir, with a durable-vs-ephemeral header" + standard. Unlike filigree's durable `filigree.db`, legis is the sole writer of + this subtree and commits **nothing** durable: the ignore enumerates the + governance/posture/pulls/checks/binding SQLite DBs (`legis-*.db`) and their + WAL/SHM/journal sidecars, names the operator-elevation secrets + (`operator.age`, `operator_session.json`) so a reviewer sees by name what is + never committed, and covers the atomic-write staging temps (`*.tmp`, + `.operator.age.*`) that `mkstemp` orphans in the dir on a failed write. The writer is marker-guarded and append-not-clobber (a + user-authored `.weft/legis/.gitignore` is preserved). `legis doctor` gains an + `install.dir_gitignore` check that flags an existing dot-dir missing the + nested ignore and ships it on `--repair` (an absent dot-dir is OK — created + lazily, nothing to protect yet). + +### Testing — cross-repo Weft seam conformance oracles + +- **Legis-resident halves of six cross-repo Weft seams**, each driving legis's + REAL code over a byte-identical golden with a Layer-1 byte-pin (fail-closed in + the default suite) plus a skip-clean Layer-2 source recheck: SEI + (loomweave→legis), git-renames (legis→loomweave), signoff-binding + (legis→filigree), loomweave-HMAC-wire (legis→loomweave, live-gated), the + warpline preflight read (legis consumer), and the per-SEI `attestation_get` + read (legis producer). Two outstanding peer obligations are recorded rather + than papered over — warpline ships no flat HTTP producer for the preflight + shape, and warpline's `LegisClient.governance_for_sei` is unwired — with the + Layer-2 rechecks armed to fire when each peer lands its half. + ## [1.2.0] — 2026-06-25 Warpline federation interfaces: an advisory preflight consumer and a diff --git a/pyproject.toml b/pyproject.toml index de807cf..2d127b1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "legis" -version = "1.2.0" +version = "1.3.0" description = "Legis — the git/CI + governance layer of the Weft suite" readme = "README.md" license = "MIT" diff --git a/src/legis/cli.py b/src/legis/cli.py index cebbc82..3c7a642 100644 --- a/src/legis/cli.py +++ b/src/legis/cli.py @@ -611,6 +611,7 @@ def _run_install(args) -> int: OperatorKeyCustodyError, choose_install_backend, ensure_gitignore, + ensure_legis_dir_gitignore, inject_instructions, install_claude_code_hooks, install_codex_skills, @@ -658,6 +659,11 @@ def _do_posture() -> tuple[bool, str]: (install_all or args.codex_skills, "Codex skill", lambda: install_codex_skills(project_root)), (install_all or args.hooks, "Claude Code hook", lambda: install_claude_code_hooks(project_root)), (install_all or args.gitignore, ".gitignore", lambda: ensure_gitignore(project_root)), + ( + install_all or args.gitignore, + ".weft/legis/.gitignore", + lambda: ensure_legis_dir_gitignore(project_root), + ), (install_all or args.mcp, ".mcp.json", lambda: register_mcp_json(project_root, args.agent_id)), (install_all or args.posture, "posture ledger", _do_posture), ] diff --git a/src/legis/doctor.py b/src/legis/doctor.py index e3b16cc..b54a77b 100644 --- a/src/legis/doctor.py +++ b/src/legis/doctor.py @@ -279,6 +279,43 @@ def check_gitignore(root: Path, *, repair: bool) -> DoctorCheck: ) +def _nested_gitignore_shipped(legis_dir: Path) -> bool: + """True iff ``.weft/legis/.gitignore`` carries the legis-managed marker.""" + nested = legis_dir / ".gitignore" + try: + return nested.is_file() and _install.LEGIS_DIR_GITIGNORE_MARKER in nested.read_text(encoding="utf-8") + except (OSError, UnicodeDecodeError): + return False + + +def check_dir_gitignore(root: Path, *, repair: bool) -> DoctorCheck: + """Check that legis's dot-dir ships its nested ``.gitignore`` (suite standard). + + An absent ``.weft/legis/`` is OK — it is created lazily and holds nothing to + protect yet (mirrors :func:`check_store_dir`). Once the dir exists, the + nested ignore must be present so legis runtime state never commits even if a + consuming repo drops the root ``.weft/`` rule (filigree-4ed8152630). + """ + cid = "install.dir_gitignore" + legis_dir = _store_dir_for(root) + if _nested_gitignore_shipped(legis_dir): + return DoctorCheck(cid, "ok", repairable=True) + if repair: + # Ship it (creating .weft/legis/ if needed) so the protection exists + # before any DB is opened — robust against the dir being created lazily + # by a later check (e.g. an audit-chain DB open) in the same repair pass. + ok, msg = _install.ensure_legis_dir_gitignore(root) + if ok and _nested_gitignore_shipped(legis_dir): + return DoctorCheck(cid, "ok", fixed=True, repairable=True) + return DoctorCheck(cid, "error", message=msg, repairable=True) + if not legis_dir.is_dir(): + # Created lazily; nothing to protect yet (mirrors check_store_dir). + return DoctorCheck(cid, "ok", repairable=True) + return DoctorCheck( + cid, "error", message=".weft/legis/.gitignore missing (run: legis install)", repairable=True + ) + + # --------------------------------------------------------------------------- # Task 7: config & store checks # --------------------------------------------------------------------------- @@ -937,6 +974,7 @@ def collect_checks(root: Path, *, repair: bool) -> list[DoctorCheck]: checks.append(check_skill_pack(root, ".agents", repair=repair)) checks.append(check_hook(root, repair=repair)) checks.append(check_gitignore(root, repair=repair)) + checks.append(check_dir_gitignore(root, repair=repair)) checks.append(check_mcp_json(root, repair=repair)) checks.append(check_filigree_binding_scope(root)) checks.append(check_weft_toml(root)) diff --git a/src/legis/install.py b/src/legis/install.py index 31d8377..6aba42c 100644 --- a/src/legis/install.py +++ b/src/legis/install.py @@ -944,6 +944,100 @@ def ensure_gitignore(project_root: Path) -> tuple[bool, str]: return True, "Created .gitignore with legis config rules" +# --------------------------------------------------------------------------- +# Nested .weft/legis/.gitignore +# --------------------------------------------------------------------------- + +# Idempotency marker — the header line of the shipped nested ignore. A file +# already carrying it is left untouched; a user-authored .gitignore is appended +# to (never clobbered). +LEGIS_DIR_GITIGNORE_MARKER = "managed-by: legis (machine-written runtime state)" + +# The shipped nested ignore for legis's own dot-dir. The project-root +# `ensure_gitignore` already adds a `.weft/legis/` rule; this nested file is the +# suite-standard safety net (filigree-4ed8152630): it keeps legis runtime state +# out of every commit even if a consuming repo drops that root rule, and names +# the runtime files so a reviewer sees by name what is never committed. +# +# Unlike filigree (whose filigree.db is durable, committed payload), legis is the +# sole writer of this subtree and commits NOTHING durable here — every file is +# regenerated/local or secret-shaped. Enumerate-with-glob (not a bare `*`) to +# match the sibling tools, leave the nested `.gitignore` itself tracked, and let +# a hypothetical future durable file stay committable (the safe direction). +WEFT_LEGIS_GITIGNORE = f"""\ +# .weft/legis/.gitignore — {LEGIS_DIR_GITIGNORE_MARKER} +# +# legis is the sole writer of this subtree and commits NOTHING durable here. The +# project-root .gitignore ignores .weft/ by default; this nested file keeps +# legis's runtime state out of every commit even if a consuming repo drops that +# root rule (the suite standard, filigree-4ed8152630). +# +# Durable (committed when this dir is tracked): none. +# Ephemeral / secret (never committed): everything below. + +# Governance / posture / pulls / checks / binding SQLite DBs (regenerated/local) +legis-*.db + +# SQLite write-ahead-log sidecars and rollback journals +*.db-wal +*.db-shm +*.db-journal + +# Operator-elevation surfaces: encrypted operator key + ephemeral elevation +# session metadata (both secret-shaped — never commit) +operator.age +operator_session.json + +# Atomic-write staging temps — mkstemp/temp siblings orphaned in this dir on a +# failed write (legis._atomic_write_text, the posture session/head-anchor +# writers, and the operator-key writer all stage here; the operator-key temp is +# secret-shaped) +*.tmp +.operator.age.* +""" + + +def _ensure_nested_gitignore(target_dir: Path, body: str, label: str) -> tuple[bool, str]: + """Idempotently ship a nested ``.gitignore`` (*body*) into *target_dir*. + + A file already carrying :data:`LEGIS_DIR_GITIGNORE_MARKER` is left untouched; + a user-authored ``.gitignore`` is appended to rather than clobbered. + """ + try: + nested = project_path(target_dir, ".gitignore") + except UnsafeInstallPathError as exc: + return False, str(exc) + + if nested.exists(): + content = nested.read_text(encoding="utf-8") + if LEGIS_DIR_GITIGNORE_MARKER in content: + return True, f"{label} already present" + if not content.endswith("\n"): + content += "\n" + content += "\n" + body + _atomic_write_text(nested, content) + return True, f"Added legis ephemeral rules to {label}" + _atomic_write_text(nested, body) + return True, f"Created {label}" + + +def ensure_legis_dir_gitignore(project_root: Path) -> tuple[bool, str]: + """Ship ``.weft/legis/.gitignore`` so legis runtime state never commits. + + The project-root ``.gitignore`` ignores ``.weft/legis/`` by default (see + :func:`ensure_gitignore`); this nested file is the suite-standard safety net + (filigree-4ed8152630) for projects that drop that root rule. It excludes the + governance/posture/pulls/checks/binding SQLite DBs, their WAL/SHM/journal + sidecars, and the operator-elevation secrets (``operator.age`` / + ``operator_session.json``) — legis commits nothing durable here. + """ + try: + legis_dir = ensure_project_dir(project_root, ".weft", "legis") + except UnsafeInstallPathError as exc: + return False, str(exc) + return _ensure_nested_gitignore(legis_dir, WEFT_LEGIS_GITIGNORE, ".weft/legis/.gitignore") + + # --------------------------------------------------------------------------- # .mcp.json (agent MCP server registration) # --------------------------------------------------------------------------- diff --git a/tests/conformance/fixtures/PROVENANCE.md b/tests/conformance/fixtures/PROVENANCE.md index 018a891..ec69037 100644 --- a/tests/conformance/fixtures/PROVENANCE.md +++ b/tests/conformance/fixtures/PROVENANCE.md @@ -8,3 +8,25 @@ fixture from: It is vendored so Legis CI can run the SEI consumer oracle without requiring the Loomweave checkout. `tests/conformance/test_sei_oracle.py` compares this copy against the sibling authority fixture when the checkout is present. + +# legis -> warpline per-SEI attestation golden + +`legis-warpline-attestation-get.golden.json` is NOT vendored from a sibling — it +is FROZEN from legis's OWN real producer wire. legis is the PRODUCER of per-SEI +governance attestations that warpline reads (an `attestation_get`-style read). + +The producer wire is the legis MCP tool `attestation_get` +(`mcp.py::_tool_attestation_get` -> `legis.service.governance.read_sei_attestations`); +there is no HTTP attestation route. The golden was generated by driving that real +wire: a genuine signed `signoff_cleared` (`SignoffGate.request`/`sign_off`) AND a +genuine signed `operator_override` (`ProtectedGate.operator_override`), both keyed +on one opaque SEI, written into a real `AuditStore`, then read back via +`call_tool("attestation_get")`. Determinism comes from a `FixedClock` +(`recorded_at`) and a fresh append-only store (seqs 1/2/3); the response EXCLUDES +the HMAC signature, so the golden is HMAC-key-independent. + +It is keyed on the SEI (rename-stable) — the property warpline relies on. +`tests/conformance/test_warpline_attestation_oracle.py` rechecks the live +serializer + MCP wire against it (producer-source recheck) and byte-pins it +(Layer-1, default suite). To re-freeze after an intended producer change, re-run +that real wire and update `GOLDEN_BLOB_SHA` in the oracle. diff --git a/tests/conformance/fixtures/legis-warpline-attestation-get.golden.json b/tests/conformance/fixtures/legis-warpline-attestation-get.golden.json new file mode 100644 index 0000000..ad894cc --- /dev/null +++ b/tests/conformance/fixtures/legis-warpline-attestation-get.golden.json @@ -0,0 +1,19 @@ +{ + "attestations": [ + { + "content_hash": "blake3:signoff-cleared", + "kind": "signoff_cleared", + "recorded_at": "2026-06-02T12:00:00+00:00", + "seq": 2, + "signoff_seq": 1 + }, + { + "content_hash": "blake3:operator-override", + "kind": "operator_override", + "recorded_at": "2026-06-02T12:00:00+00:00", + "seq": 3 + } + ], + "sei": "loomweave:eid:0000000000000000000000000000aaaa", + "status": "checked" +} diff --git a/tests/conformance/test_loomweave_hmac_wire_conformance.py b/tests/conformance/test_loomweave_hmac_wire_conformance.py new file mode 100644 index 0000000..52faff5 --- /dev/null +++ b/tests/conformance/test_loomweave_hmac_wire_conformance.py @@ -0,0 +1,315 @@ +"""Live cross-impl Weft HMAC wire conformance: legis signer <-> loomweave verifier. + +This is the genuinely uncovered half of the Loomweave SEI read-API transport seam. + +What is ALREADY covered (and therefore NOT re-pinned here): + + * The SEI *semantics* (resolve / sei / lineage / capability response shapes, + carry-vs-orphan behaviour) are frozen by ``tests/conformance/test_sei_oracle.py`` + against an in-process ``FakeLoomweave`` and the vendored §8 fixture. Those drive + no HTTP and no HMAC. + * legis's OWN HMAC formula is already pinned by + ``tests/identity/test_loomweave_client.py::test_sign_loomweave_request_matches_loomweave_hmac_contract`` + and ``tests/test_weft_signing.py`` — both recompute the canonical message in + Python and assert ``sign_loomweave_request`` reproduces it. That is a legis-side + *drift detector*; it proves legis is internally consistent, NOT that legis agrees + with loomweave's real verifier. + +What is NOT covered anywhere, and is the seam this file freezes: that legis's REAL +``sign_loomweave_request`` produces a signature loomweave's REAL Rust verifier +(``component_hmac_hex`` / ``canonical_hmac_message`` in +``crates/loomweave-cli/src/http_read/auth.rs``) ACCEPTS, byte-for-byte. The verifier +helpers are ``pub(crate)`` — they cannot be called in-process from Python, and +re-implementing the format string in Python (a third copy) would be exactly the +tautology the conformance program forbids. So the only non-circular proof is to run +the real loomweave binary and let its verifier adjudicate a real legis signature. + +Why a single accept/reject pair proves the WHOLE canonical message, not just the +HMAC primitive: the verifier hashes ``METHOD\npath?query\nsha256_hex(body)\nts\nnonce`` +and HMACs it under the shared secret. If legis diverged on the method, the +path-and-query projection, the body canonicalization (compact, sorted, ascii-escaped +``weft_body_bytes``), the lowercase-hex SHA-256 of the body, the timestamp rendering, +the nonce, OR the HMAC itself, the reconstructed message would differ and the compare +would fail closed -> 401. A 200 (auth passed) is therefore byte-exact agreement across +the entire formula; a tampered signature -> 401 is the negative control. + +The positive assertion is on the AUTH outcome (request admitted past the HMAC guard), +NOT on resolve semantics: an unknown locator legitimately returns ``{"alive": false}`` +with HTTP 200, and re-pinning that body would re-enter the SEI oracle's territory. + +Gating: this spins up a real ``loomweave serve`` and is opt-in. It runs only when +``LEGIS_LIVE_LOOMWEAVE=1`` AND a loomweave binary is discoverable; otherwise it +skips clean (no marker — the suite registers none, and ``filterwarnings=["error"]`` +would turn an unknown marker into a collection error). Operators enable it with +``LEGIS_LIVE_LOOMWEAVE=1`` (optionally ``LEGIS_LOOMWEAVE_BIN=/path/to/loomweave``). +""" + +from __future__ import annotations + +import json +import os +import shutil +import socket +import subprocess +import time +import urllib.error +import urllib.request +import uuid +from collections.abc import Iterator +from pathlib import Path + +import pytest + +from legis.identity.loomweave_client import ( + HttpLoomweaveIdentity, + sign_loomweave_request, +) + +# --------------------------------------------------------------------------- +# Gating: opt-in env + a discoverable loomweave binary. +# --------------------------------------------------------------------------- + +_SECRET = "weft-hmac-wire-conformance-secret" +_IDENTITY_ENV = "WEFT_LIVE_WIRE_IDENTITY_SECRET" +_READY_TIMEOUT_S = 20.0 + + +def _discover_loomweave_bin() -> str | None: + explicit = os.environ.get("LEGIS_LOOMWEAVE_BIN") + if explicit and Path(explicit).is_file() and os.access(explicit, os.X_OK): + return explicit + candidates = [ + Path(__file__).resolve().parents[3] / "loomweave" / "target" / "release" / "loomweave", + Path(__file__).resolve().parents[3] / "loomweave" / "target" / "debug" / "loomweave", + ] + for cand in candidates: + if cand.is_file() and os.access(cand, os.X_OK): + return str(cand) + found = shutil.which("loomweave") + return found + + +_LIVE_ENABLED = os.environ.get("LEGIS_LIVE_LOOMWEAVE") == "1" +_LOOMWEAVE_BIN = _discover_loomweave_bin() + +# Gates only the two LIVE tests (those that spin up the real binary). The Layer-2 +# source recheck below has no live/binary dependency and runs in the default suite +# (it carries its own skip-clean when the loomweave source is not present). +_requires_live_loomweave = pytest.mark.skipif( + not (_LIVE_ENABLED and _LOOMWEAVE_BIN), + reason=( + "live loomweave HMAC wire conformance is opt-in: set LEGIS_LIVE_LOOMWEAVE=1 " + "and provide a loomweave binary (LEGIS_LOOMWEAVE_BIN or a built " + "../loomweave/target/{release,debug}/loomweave)" + ), +) + + +def _free_loopback_port() -> int: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + sock.bind(("127.0.0.1", 0)) + return int(sock.getsockname()[1]) + finally: + sock.close() + + +def _http_status(url: str, *, method: str, body: bytes | None, + headers: dict[str, str]) -> int: + """Send a raw request and return the HTTP status (no legis decode layer).""" + req = urllib.request.Request(url, data=body, method=method) + if body is not None: + req.add_header("Content-Type", "application/json") + for name, value in headers.items(): + req.add_header(name, value) + try: + with urllib.request.urlopen(req, timeout=5.0) as resp: # noqa: S310 (loopback test server) + resp.read() + return int(resp.status) + except urllib.error.HTTPError as exc: + exc.read() + exc.close() + return int(exc.code) + + +@pytest.fixture +def live_loomweave(tmp_path: Path) -> Iterator[str]: + """Stand up a real ``loomweave serve`` with HMAC identity on loopback. + + Yields the base URL. Tears down by terminating ONLY this fixture's own child + process (the host runs other sessions' ``loomweave serve`` instances — a + pattern-kill would destroy their work) and closing every owned resource so + ``filterwarnings=["error"]`` does not trip on a leaked pipe. + """ + assert _LOOMWEAVE_BIN is not None # narrowed by the module skipif + project = tmp_path / "proj" + project.mkdir() + + # `loomweave install` creates .weft/loomweave/loomweave.db (the serve target). + subprocess.run( + [_LOOMWEAVE_BIN, "install", "--path", str(project)], + check=True, capture_output=True, text=True, + ) + + port = _free_loopback_port() + bind = f"127.0.0.1:{port}" + base_url = f"http://{bind}" + # The HMAC secret is read from the env var NAMED by `identity_token_env` + # (loomweave http_read.rs resolution), exactly the serve.rs HMAC recipe. + (project / "loomweave.yaml").write_text( + "version: 1\n" + "serve:\n" + " http:\n" + " enabled: true\n" + f' bind: "{bind}"\n' + f' identity_token_env: "{_IDENTITY_ENV}"\n', + encoding="utf-8", + ) + + env = {**os.environ, _IDENTITY_ENV: _SECRET} + # `serve` runs an MCP stdio loop; a closed stdin (DEVNULL) makes it exit and + # tears the HTTP thread down with it, so we hold an open stdin pipe. + proc = subprocess.Popen( + [_LOOMWEAVE_BIN, "serve", "--path", str(project)], + stdin=subprocess.PIPE, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + env=env, + ) + try: + # Readiness: poll the UNPROTECTED capabilities route until it answers. + deadline = time.monotonic() + _READY_TIMEOUT_S + ready = False + while time.monotonic() < deadline: + if proc.poll() is not None: + raise RuntimeError( + f"loomweave serve exited early with code {proc.returncode}" + ) + try: + status = _http_status( + f"{base_url}/api/v1/_capabilities", + method="GET", body=None, headers={}, + ) + except urllib.error.URLError: + time.sleep(0.1) + continue + if status == 200: + ready = True + break + time.sleep(0.1) + if not ready: + raise RuntimeError("loomweave HTTP read API did not become ready in time") + yield base_url + finally: + if proc.stdin is not None: + proc.stdin.close() + proc.terminate() + try: + proc.wait(timeout=10.0) + except subprocess.TimeoutExpired: + proc.kill() + proc.wait(timeout=10.0) + + +@_requires_live_loomweave +def test_real_legis_signature_is_accepted_by_real_loomweave_verifier( + live_loomweave: str, +) -> None: + """POSITIVE: legis's REAL client, signing with the REAL ``sign_loomweave_request`` + under the shared secret, is admitted past loomweave's REAL HMAC guard on a + PROTECTED route (``POST /api/v1/identity/resolve``). + + legis's ``_urllib_fetch`` raises ``LoomweaveError`` on any non-2xx (a 401 from a + rejected signature included), so a clean dict return is proof the request passed + the HMAC verifier. We assert on auth admission, not resolve semantics: an unknown + locator's ``{"alive": false}`` is a perfectly valid admitted response. + """ + client = HttpLoomweaveIdentity(live_loomweave, hmac_key=_SECRET) + # An arbitrary locator that does not exist in the freshly-installed (un-analyzed) + # project: the route still runs auth first, then answers alive=false at 200. + resolved = client.resolve_locator("python:function:nonexistent.module.fn") + # The load-bearing fact is that no LoomweaveError was raised: ``_urllib_fetch`` + # raises on any non-2xx (a 401 from a rejected signature included), so a clean + # dict return is proof the request was admitted past the HMAC verifier. We do NOT + # assert on the resolve verdict itself (alive/sei/...) — the SEI oracle owns those + # semantics, and coupling the auth proof to them would re-pin covered behaviour. + assert isinstance(resolved, dict) + + +@_requires_live_loomweave +def test_tampered_legis_signature_is_rejected_by_real_loomweave_verifier( + live_loomweave: str, +) -> None: + """NEGATIVE control: take a REAL legis signature and flip one hex character of the + HMAC; the REAL verifier must reject it with 401 UNAUTHENTICATED. + + This proves the positive result is not vacuous (the route really is guarded and + really checks the signature), and that the agreement is byte-exact: a single-bit + perturbation of the otherwise-correct signature fails closed. + """ + url = f"{live_loomweave}/api/v1/identity/resolve" + body = {"locator": "python:function:nonexistent.module.fn"} + body_bytes = json.dumps(body, sort_keys=True, separators=(",", ":")).encode("utf-8") + # A fresh timestamp + nonce so the rejection is attributable to the bad signature, + # not the freshness window or replay cache. + headers = sign_loomweave_request( + _SECRET.encode("utf-8"), + "POST", + url, + body, + timestamp=int(time.time()), + nonce=uuid.uuid4().hex, + ) + component = headers["X-Weft-Component"] + last = component[-1] + headers["X-Weft-Component"] = component[:-1] + ("0" if last != "0" else "1") + + status = _http_status(url, method="POST", body=body_bytes, headers=headers) + assert status == 401, f"tampered signature should be rejected with 401, got {status}" + + +def test_loomweave_canonical_hmac_template_is_unchanged() -> None: + """Layer-2 (skip-clean) source recheck: tie the agreement to loomweave's REAL + verifier source, not just to a running binary. + + Reads loomweave's ``http_read/auth.rs`` and asserts ``canonical_hmac_message`` + still builds ``METHOD\\npath?query\\nsha256_hex(body)\\nts\\nnonce`` in that order. + If loomweave changes the canonical message, this reds with a clear pointer even + when the live binary is not being run. Skips clean when the source is not present. + """ + auth_rs = ( + Path(__file__).resolve().parents[3] + / "loomweave" / "crates" / "loomweave-cli" / "src" / "http_read" / "auth.rs" + ) + if not auth_rs.is_file(): + pytest.skip("loomweave auth.rs source not present; skip the formula recheck") + src = auth_rs.read_text(encoding="utf-8") + assert "fn canonical_hmac_message" in src, ( + "loomweave canonical_hmac_message verifier helper was renamed or removed" + ) + # Anchor on the format! literal itself, then read its argument list (everything + # up to the closing paren of the format! call). This scopes the order check to + # the message construction and excludes the function signature's param order. + fmt_literal = '"{}\\n{}\\n{}\\n{}\\n{}"' + assert fmt_literal in src, ( + "loomweave canonical_hmac_message format string changed; the legis signer's " + "5-field message layout may no longer agree with the verifier" + ) + # Scope to the canonical_hmac_message function body: from the format! literal up + # to the next top-level `fn ` (the inner expression has nested parens, so slicing + # to "the first )" would truncate `Sha256::digest(body)`). + after_literal = src.split(fmt_literal, 1)[1] + fmt_args = after_literal.split("\nfn ", 1)[0].split("\npub(crate) fn ", 1)[0] + # The body field is the lowercase hex of the SHA-256 of the raw body bytes. + assert "hex_lower(&Sha256::digest(body))" in fmt_args, ( + "loomweave no longer hashes the body as lowercase-hex SHA-256 inside the " + "canonical message; the legis signer's body_hash step may have diverged" + ) + # Argument order inside the format! call: method, path_and_query, + # , timestamp, nonce. + order = ["method", "path_and_query", "Sha256::digest(body)", "timestamp", "nonce"] + positions = [fmt_args.index(tok) for tok in order] + assert positions == sorted(positions), ( + "loomweave canonical_hmac_message argument order changed; the legis signer's " + f"field order may no longer agree with the verifier (positions={positions})" + ) diff --git a/tests/conformance/test_sei_oracle_byte_pin.py b/tests/conformance/test_sei_oracle_byte_pin.py new file mode 100644 index 0000000..fbb38c9 --- /dev/null +++ b/tests/conformance/test_sei_oracle_byte_pin.py @@ -0,0 +1,37 @@ +"""Weft SEI §8 conformance oracle — Layer-1 fail-closed byte-pin. + +The vendored ``sei-conformance-oracle.json`` is byte-identical to Loomweave's +authoritative fixture. ``test_sei_oracle.py`` already carries a Layer-2 source +recheck (``test_vendored_oracle_matches_loomweave_source``), but that recheck +*skips clean* when no Loomweave checkout is present — so on its own it cannot +catch a corrupted or edited vendored fixture in the default CI suite. + +This module is the complementary Layer-1 pin: it recomputes the git blob sha1 +of the vendored fixture in-process and asserts it against a hard-coded constant. +It is UNMARKED, takes no environment dependency, and therefore runs in the +DEFAULT suite and fails CLOSED on any byte drift of the vendored fixture. +""" +from __future__ import annotations + +import hashlib +from pathlib import Path + +# git blob sha1 of legis's vendored tests/conformance/fixtures/sei-conformance-oracle.json. +# Byte-identical to Loomweave's authoritative docs/federation/fixtures fixture. +VENDORED_BLOB_SHA = "0ea577025d94c028a0f682b7d29765079455718c" + +ORACLE_PATH = Path(__file__).parent / "fixtures" / "sei-conformance-oracle.json" + + +def _git_blob_sha1(data: bytes) -> str: + # git's blob object hash: sha1 over "blob \0" + content. + return hashlib.sha1(b"blob %d\0" % len(data) + data).hexdigest() + + +def test_vendored_oracle_byte_pin(): + data = ORACLE_PATH.read_bytes() + assert _git_blob_sha1(data) == VENDORED_BLOB_SHA, ( + "Vendored SEI conformance oracle has drifted from its pinned bytes; " + "re-vendor from Loomweave's docs/federation/fixtures/sei-conformance-oracle.json " + "and update VENDORED_BLOB_SHA only after confirming the change is intended." + ) diff --git a/tests/conformance/test_warpline_attestation_oracle.py b/tests/conformance/test_warpline_attestation_oracle.py new file mode 100644 index 0000000..c450152 --- /dev/null +++ b/tests/conformance/test_warpline_attestation_oracle.py @@ -0,0 +1,261 @@ +"""Weft seam conformance oracle — legis (PRODUCER) -> warpline (CONSUMER). + +SEAM: legis produces per-SEI governance ATTESTATIONS that warpline reads to +decide whether a verification can be skipped (an ``attestation_get``-style read). +The PRODUCER wire is the legis MCP tool ``attestation_get`` (mcp.py +``_tool_attestation_get`` -> ``legis.service.governance.read_sei_attestations``). +There is NO HTTP attestation route (api/app.py serves none); the MCP tool is the +served surface. + +This module freezes a shared GOLDEN of legis's per-SEI attestation response and +rechecks the REAL serializer + the REAL MCP wire against it, so a legis-side +rename of an attestation field (``kind``/``content_hash``/``recorded_at``/ +``seq``/``signoff_seq``/``status``) — or a change to the ``checked``/ +``unavailable`` discriminant — reds here. + +RENAME-STABILITY (the property warpline relies on): the attestation is keyed on +the opaque SEI (``entity_key.value`` inside ``signing_fields["entity"]``), NOT on +a locator. After a rename/move the SEI is unchanged, so the attestation still +resolves under the same key — that is exactly why warpline keys its governance +read on ``sei`` (contract.md "Keying"; federation.LegisClient.governance_for_sei). + +GOLDEN PROVENANCE (non-circular): the golden was frozen ONCE from the real wire +(SignoffGate + ProtectedGate writing genuine signed records into a real +AuditStore, then call_tool("attestation_get")). The rechecks below load the +golden from the FILE and compare LIVE serializer/wire output to it — they never +regenerate it. Frozen bytes do not move when the serializer changes, so a field +rename in the producer is caught. See fixtures/PROVENANCE.md. + +Layered freshness defence (mirrors test_sei_oracle*.py): + * Layer-1 byte-pin (``test_attestation_golden_byte_pin``): UNMARKED, default + suite, recomputes the git blob sha1 of the golden file and fails CLOSED on + any byte drift — even when the producer source is unchanged. + * Producer-source recheck (``test_*_reproduces_golden``): drives the real + legis serializer/wire to reproduce the golden's shape AND values. + * Layer-2 consumer recheck (``test_warpline_consumer_field_names_*``): + skip-clean against warpline's source/contract — see the HONEST-GAP note. +""" +from __future__ import annotations + +import hashlib +import json +import os +from functools import lru_cache +from pathlib import Path + +import pytest + +from legis.clock import FixedClock +from legis.enforcement.engine import EnforcementEngine +from legis.enforcement.protected import ProtectedGate, TrailVerifier +from legis.enforcement.signoff import SignoffGate +from legis.enforcement.verdict import JudgeOpinion, Verdict +from legis.identity.entity_key import EntityKey +from legis.mcp import McpRuntime, call_tool +from legis.service.governance import read_sei_attestations +from legis.store.audit_store import AuditStore + +GOLDEN_PATH = ( + Path(__file__).parent / "fixtures" / "legis-warpline-attestation-get.golden.json" +) + +# git blob sha1 of the vendored golden. Layer-1 fail-closed pin (UNMARKED). +GOLDEN_BLOB_SHA = "ad894cc0d93cb9e8f5631bfccacbdeacbbae4994" + +# Fixed inputs the golden was frozen with — reused by the source recheck so the +# LIVE wire reproduces the SAME bytes. A FixedClock makes recorded_at stable; +# a fresh append-only store makes the seqs (1 PENDING, 2 SIGNED_OFF, 3 OVERRIDE) +# deterministic; the golden EXCLUDES the HMAC signature, so it is key-independent. +_CLOCK_ISO = "2026-06-02T12:00:00+00:00" +_KEY = b"weft-seam-conformance-key" +_POLICY = "protected.attestation" +# Opaque SEI (rename-stable key). Both a signoff_cleared and an operator_override +# attestation are keyed on this one SEI so a single response pins both kinds. +_SEI = "loomweave:eid:0000000000000000000000000000aaaa" + + +@lru_cache(maxsize=1) +def _load_golden() -> dict: + return json.loads(GOLDEN_PATH.read_text(encoding="utf-8")) + + +def _git_blob_sha1(data: bytes) -> str: + return hashlib.sha1(b"blob %d\0" % len(data) + data).hexdigest() + + +class _AdvisoryJudge: + """Protected-cell judge is advisory; operator_override bypasses it entirely.""" + + def evaluate(self, record): # noqa: ANN001 + return JudgeOpinion(Verdict.BLOCKED, "judge@1", "advisory only") + + +def _build_attested_store(tmp_path) -> AuditStore: + """Write a genuine signoff_cleared + operator_override attestation, both keyed + on ``_SEI``, into a real AuditStore through the real legis write path. + + This is the SAME path the golden was frozen from. It exercises: + * SignoffGate.request/sign_off -> a real signed SIGNED_OFF record whose + PENDING join is integrity-bound (read_sei_attestations FORGE-B). + * ProtectedGate.operator_override -> a real signed OVERRIDDEN_BY_OPERATOR + record (read_sei_attestations FORGE-A). + """ + store = AuditStore(f"sqlite:///{tmp_path / 'gov.db'}") + clock = FixedClock(_CLOCK_ISO) + + signoff = SignoffGate(store, clock, signer=True, key=_KEY) + req = signoff.request( + policy=_POLICY, + entity_key=EntityKey(value=_SEI, identity_stable=True), + rationale="review", + agent_id="agent-1", + extensions={"loomweave": {"content_hash": "blake3:signoff-cleared"}}, + ) + signoff.sign_off(request_seq=req.seq, operator_id="op-1", rationale="ok") + + protected = ProtectedGate( + store, + clock, + judge=_AdvisoryJudge(), + key=_KEY, + protected_policies=frozenset({_POLICY}), + ) + protected.operator_override( + policy=_POLICY, + entity_key=EntityKey(value=_SEI, identity_stable=True), + rationale="release exception approved by security lead", + operator_id="op-sec-lead", + file_fingerprint="sha256:abc", + ast_path="Module/Call[eval]", + extensions={"loomweave": {"content_hash": "blake3:operator-override"}}, + ) + return store + + +def _wired_runtime(store: AuditStore) -> McpRuntime: + """A runtime with BOTH a protected gate and a trail verifier wired, so the + attestation_get fail-closed pre-gate passes and the verified trail is read.""" + engine = EnforcementEngine(store, FixedClock(_CLOCK_ISO)) + gate = ProtectedGate( + store, + FixedClock(_CLOCK_ISO), + judge=_AdvisoryJudge(), + key=_KEY, + protected_policies=frozenset({_POLICY}), + ) + return McpRuntime( + agent_id="agent-launch", + initialized=True, + engine=engine, + protected_gate=gate, + trail_verifier=TrailVerifier(_KEY, frozenset({_POLICY})), + ) + + +# --- Layer-1: byte-pin (UNMARKED, default suite, fails CLOSED on byte drift) --- +def test_attestation_golden_byte_pin(): + data = GOLDEN_PATH.read_bytes() + assert _git_blob_sha1(data) == GOLDEN_BLOB_SHA, ( + "legis<->warpline attestation golden drifted from its pinned bytes; " + "re-freeze it from the real attestation_get wire and update " + "GOLDEN_BLOB_SHA only after confirming the change is intended (and that " + "warpline's consumer still reads the new shape)." + ) + + +# --- Producer-source recheck (non-circular): real serializer reproduces golden - +def test_serializer_reproduces_golden(tmp_path): + # Field-name pin: drive the REAL read_sei_attestations over the REAL verified + # records. A rename of any attestation field (kind/content_hash/recorded_at/ + # seq/signoff_seq/status/sei) reds here against the frozen golden. + store = _build_attested_store(tmp_path) + live = read_sei_attestations(store.read_all(), _SEI) + assert live == _load_golden() + + +def test_mcp_wire_reproduces_golden(tmp_path): + # Wire pin: the FULL served surface warpline reads — call_tool -> the + # structuredContent dict. Equals the serializer dict by construction, and + # additionally exercises the fail-closed verified-trail pre-gate. + store = _build_attested_store(tmp_path) + runtime = _wired_runtime(store) + result = call_tool(runtime, "attestation_get", {"sei": _SEI}) + assert not result.get("isError") + assert result["structuredContent"] == _load_golden() + + +def test_both_attestation_kinds_are_pinned(): + # Guard against a one-kind golden silently leaving the other branch's `kind` + # value un-pinned: the frozen golden MUST carry both producer branches. + kinds = {a["kind"] for a in _load_golden()["attestations"]} + assert kinds == {"signoff_cleared", "operator_override"} + + +def test_attestation_golden_is_keyed_on_sei(): + # RENAME-STABILITY: the golden's `sei` is the opaque loomweave SEI, never a + # locator. This is the property warpline relies on (it keys its governance + # read on `sei`), so the seam survives a rename/move on the legis side. + golden = _load_golden() + assert golden["status"] == "checked" + assert golden["sei"] == _SEI + assert golden["sei"].startswith("loomweave:eid:") + + +def test_unavailable_discriminant_is_distinct_from_empty_checked(tmp_path): + # The consumer MUST distinguish 'unavailable' (trail not verifiable -> reverify) + # from a 'checked' empty list (honestly never attested). A wired-but-absent + # gate yields the unavailable discriminant the golden's `status` field anchors. + store = _build_attested_store(tmp_path) + engine = EnforcementEngine(store, FixedClock(_CLOCK_ISO)) + unwired = McpRuntime(agent_id="x", initialized=True, engine=engine) + sc = call_tool(unwired, "attestation_get", {"sei": _SEI})["structuredContent"] + assert sc["status"] == "unavailable" + assert sc["attestations"] == [] + assert sc["status"] != _load_golden()["status"] + + +# --- Layer-2: consumer recheck against warpline (HONEST-GAP, skip by default) --- +def _warpline_repo() -> Path | None: + # OPT-IN ONLY: resolve to a warpline checkout *exclusively* via WARPLINE_REPO. + # We deliberately do NOT auto-discover the sibling `/home/john/warpline`, + # because warpline's consumer half is mid-flight (the verification-freshness + # rework lives in `federation.py`); auto-running source-string assertions + # against an in-flight branch would couple the default legis suite to warpline + # churn. So the default suite SKIPS this honest-gap with a clear reason; an + # operator who wants the recheck points WARPLINE_REPO at a checkout explicitly. + env = os.environ.get("WARPLINE_REPO") + if not env: + return None + repo = Path(env) + return repo if (repo / "src" / "warpline").exists() else None + + +def test_warpline_consumer_field_names_honest_gap(): + # HONEST-GAP (consumer half not yet wired). warpline main defines the seam + # PROTOCOL — federation.LegisClient.governance_for_sei(sei) -> list[dict] — + # but the transport is NOT wired: _consult_legis reports the read "disabled" + # with a recruiting fix that NAMES "the legis MCP governance read". There are + # therefore NO attestation field-name literals (kind/content_hash/recorded_at/ + # seq/signoff_seq) in warpline to assert against yet, so a field-granularity + # Layer-2 pin is not feasible without fabricating one. + # + # SKIP BY DEFAULT (opt-in via WARPLINE_REPO): when a checkout is supplied we + # pin the one load-bearing property that IS present — the consumer seam is + # keyed on the SEI (rename-stable), the property the producer golden + # guarantees — and that its recruiting fix points back at THIS producer's MCP + # read. When warpline wires governance_for_sei over attestation_get and starts + # reading the `kind`/`content_hash` fields, replace this with a real + # field-name recheck against warpline's consumer source to close the seam + # two-sided. + repo = _warpline_repo() + if repo is None: + pytest.skip( + "warpline consumer recheck is opt-in (set WARPLINE_REPO); the consumer " + "half is unwired/in-flight, so this is an honest-gap, not a satisfied pin" + ) + federation = (repo / "src" / "warpline" / "federation.py").read_text(encoding="utf-8") + # The seam is keyed on the SEI (rename-stable) — the producer-side guarantee. + assert "def governance_for_sei(self, sei: str)" in federation + # The recruiting fix points the consumer at the legis MCP governance read — + # i.e. THIS producer's attestation_get wire. + assert "legis MCP governance read" in federation diff --git a/tests/contract/weft/test_git_rename_wire_conformance.py b/tests/contract/weft/test_git_rename_wire_conformance.py new file mode 100644 index 0000000..03d68c0 --- /dev/null +++ b/tests/contract/weft/test_git_rename_wire_conformance.py @@ -0,0 +1,149 @@ +"""Shared Weft conformance test: the legis -> Loomweave git-rename wire contract. + +This is the PRODUCER half of a single cross-member golden, +``vectors/git_renames.v1.json``. The byte-identical file is vendored into +Loomweave at ``crates/loomweave-cli/tests/fixtures/weft/git_renames.v1.json``; +its in-module oracle drives the REAL ``parse_legis_rename_json`` over the same +bytes. This file proves the SAME projection from legis's side, two ways: + +1. **Live recipe** — fabricate a real rename in a temp git repo, call the REAL + ``GET /git/renames`` via ``TestClient`` (exactly like the existing + ``tests/contract/test_git_renames_contract.py``), and assert its projected + ``(old_path, new_path)`` pairs *contain* the golden's real rename. Membership, + not strict equality: ``commit_sha``/blobs are nondeterministic per fabrication + and the empty-``new_path`` skip item is synthetic (git never emits it), so a + live re-run cannot reproduce the frozen array byte-for-byte — only the + projected real-rename pair is stable. That projection is what the consumer + actually reads; everything else it ignores. + +2. **Layer-1 byte-pin** — recompute the git blob sha1 of the *frozen* golden file + in-process and assert it equals ``VENDORED_BLOB_SHA``. This is the + canonicalization-drift detector: if anyone re-serializes the golden (key + order, whitespace, trailing newline) on either side, the bytes change, the + sha1 stops reproducing, and CI fails before the drift can reach production. + Loomweave's consumer oracle pins the SAME constant against the SAME bytes, so + the two repos are locked to one wire shape. + +Why this exists (Weft incident 2026-06-10, root cause #2 / G16, +clarion-73dff1d2d1): the ``/git/renames`` shape and Loomweave's +``parse_legis_rename_json`` were hand-aligned with no shared test. A producer key +rename (``old_path`` -> anything else) or an envelope migration (``[...]`` -> +``{"renames":[...]}``) zeroes the rename signal with no error, orphaning +renamed-with-edit entities under fresh SEIs. A contract fix without its vector +just re-creates the drift; this golden + the two oracles is how the fix is real. +""" + +from __future__ import annotations + +import hashlib +import subprocess +from pathlib import Path + +from fastapi.testclient import TestClient + +from legis.api.app import create_app + +VECTOR_PATH = Path(__file__).parent / "vectors" / "git_renames.v1.json" + +# The git blob sha1 of the frozen golden bytes: +# sha1(b"blob \0" + bytes). NOT git — recomputed in-process below and by +# Loomweave's consumer oracle over the byte-identical vendored copy. Re-pin only +# by re-freezing the golden from the real endpoint and updating BOTH repos. +VENDORED_BLOB_SHA = "74f69ee81b030a31f9fdd37dc1c49543fbe389c2" + + +def _git(repo: Path, *args: str) -> None: + subprocess.run(["git", "-C", str(repo), *args], check=True, + capture_output=True, text=True) + + +def _project_pairs(items: list[dict]) -> list[tuple[str, str]]: + """Re-implements Loomweave's parse_legis_rename_json projection: array -> + (old_path, new_path) pairs, skipping any item whose path is missing, + non-string, or empty. Kept inline so this producer oracle is self-contained + and mirrors the consumer's contract exactly.""" + out: list[tuple[str, str]] = [] + for it in items: + old, new = it.get("old_path"), it.get("new_path") + if isinstance(old, str) and isinstance(new, str) and old and new: + out.append((old, new)) + return out + + +def test_vendored_golden_blob_sha_is_byte_pinned() -> None: + """Layer-1: the frozen golden's bytes reproduce VENDORED_BLOB_SHA. Tamper the + file and this fails before any wire drift can ship. Loomweave pins the same + constant against the byte-identical vendored copy.""" + data = VECTOR_PATH.read_bytes() + recomputed = hashlib.sha1(b"blob %d\0" % len(data) + data).hexdigest() + assert recomputed == VENDORED_BLOB_SHA, ( + f"golden bytes changed: {recomputed} != pinned {VENDORED_BLOB_SHA}. " + "Re-freeze from the real endpoint and update BOTH repos' constant." + ) + + +def test_golden_is_a_flat_array_with_the_three_required_shapes() -> None: + """The golden is a RAW /git/renames array (not the wrapped vector envelope), + because parse_legis_rename_json calls value.as_array(); an object would parse + to empty. It must carry: >=1 real rename, an item with extra ignored fields, + and an empty-new_path item the consumer skips.""" + import json + + array = json.loads(VECTOR_PATH.read_text(encoding="utf-8")) + assert isinstance(array, list), "consumer requires a flat top-level array" + pairs = _project_pairs(array) + # The real rename survives projection. + assert ("auth.py", "authn.py") in pairs + # The empty-new_path item is dropped (synthetic skip case). + assert any(it.get("new_path") == "" for it in array), "skip case present" + assert ("ghost.py", "") not in pairs and not any( + p[0] == "ghost.py" for p in pairs + ), "empty new_path must not project to a pair" + # The real item carries the extra fields the consumer ignores. + real = next(it for it in array if it.get("new_path") == "authn.py") + for ignored in ("commit_sha", "similarity", "old_blob", "new_blob"): + assert ignored in real, f"real item should carry ignored field {ignored}" + + +def test_real_endpoint_projects_the_golden_real_rename(tmp_path: Path) -> None: + """Drive the REAL GET /git/renames over a fabricated rename and assert its + projected pairs CONTAIN the golden's real rename. The live array can carry + only the real rename (the skip item is synthetic), so membership — not strict + equality — is the right contract; it is exactly what the consumer reads.""" + repo = tmp_path / "repo" + repo.mkdir() + _git(repo, "init", "-q") + _git(repo, "config", "user.email", "t@t") + _git(repo, "config", "user.name", "t") + (repo / "auth.py").write_text("def login():\n return 1\n" * 5) + _git(repo, "add", "-A") + _git(repo, "commit", "-q", "-m", "base") + base = subprocess.run( + ["git", "-C", str(repo), "rev-parse", "HEAD"], + capture_output=True, text=True, + ).stdout.strip() + (repo / "authn.py").write_text((repo / "auth.py").read_text()) + (repo / "auth.py").unlink() + _git(repo, "add", "-A") + _git(repo, "commit", "-q", "-m", "rename auth -> authn") + + client = TestClient(create_app(repo_path=str(repo))) + resp = client.get("/git/renames", params={"rev_range": f"{base}..HEAD"}) + assert resp.status_code == 200, resp.text + items = resp.json() + assert isinstance(items, list), "Loomweave requires an array" + live_pairs = _project_pairs(items) + + import json + + golden = json.loads(VECTOR_PATH.read_text(encoding="utf-8")) + # The golden's projected pairs == the live endpoint's projected pairs. The + # synthetic empty-new_path skip item drops out of BOTH projections, so this is + # exact equality, not membership — it also catches the live endpoint emitting + # any rename the golden does not (an over-carry the consumer would silently + # accept). The pair is stable regardless of git's reported similarity %. + assert _project_pairs(golden) == [("auth.py", "authn.py")] + assert live_pairs == _project_pairs(golden), ( + f"real endpoint projection diverged from the golden; " + f"live={live_pairs} golden={_project_pairs(golden)}" + ) diff --git a/tests/contract/weft/test_signoff_binding_wire_conformance.py b/tests/contract/weft/test_signoff_binding_wire_conformance.py new file mode 100644 index 0000000..dcf0116 --- /dev/null +++ b/tests/contract/weft/test_signoff_binding_wire_conformance.py @@ -0,0 +1,228 @@ +"""Shared Weft conformance test: the legis -> Filigree governed sign-off binding wire. + +This is the PRODUCER half of a single cross-member golden, +``vectors/signoff_binding.v1.json``. The byte-identical file is vendored into +Filigree at ``filigree/tests/fixtures/contracts/legis-signoff-binding-request.json``; +its consumer oracle drives the REAL ``POST /api/issue/{id}/entity-associations`` +route over the same bytes. legis is the AUTHORITY for this request body. + +What this wire IS, and why it is a distinct freezable seam (not subsumed by the +existing entity-association oracle): + + * legis binds a cleared governed sign-off to a Filigree issue by POSTing to the + classic entity-association route (``HttpFiligreeClient.attach`` -> + ``POST /api/issue/{issue_id}/entity-associations``). The BASE entity-association + body is ``{entity_id, content_hash, actor}`` — that shape is already exercised + by the loomweave<->filigree reverse-lookup oracle. The GOVERNED body carries + two MORE fields, ``signoff_seq`` and ``signature``, which are the legis-specific + extension (Filigree v25/B1). Those are what filigree's POST handler parses and + persists into dedicated columns; the existing entity-associations oracle freezes + only the GET *response* body and never touches ``signoff_seq``/``signature``. So + this governed REQUEST body is a genuinely uncovered, distinct wire — and the + golden MUST carry both extension fields or it would re-freeze the already-covered + base shape. + + * The ``signature`` is ``sign({issue_id, entity_id, content_hash, signoff_seq}, key)`` + (``legis.enforcement.signing.sign`` -> deterministic HMAC-SHA256 over + ``canonical_json`` of the field dict, tagged ``hmac-sha256:v2:``). It folds in NO + clock, nonce, or salt, so with a FIXED key it reproduces byte-for-byte — which is + what makes a byte-pin meaningful. ``issue_id`` is committed by the signature but + rides the URL PATH, not the body, so it is absent from the frozen body bytes. + + * The body bytes on the wire are ``weft_signing.weft_body_bytes(body)`` — + ``json.dumps(body, sort_keys=True, separators=(",", ":"))`` with the default + ``ensure_ascii=True``. The route is transport-open (G11): legis emits no + ``X-Weft-*`` headers; the app-level ``signature`` travels in the JSON body and + filigree stores it verbatim WITHOUT verifying (it holds no key). + +This file proves the same projection from legis's side, two ways: + +1. **Producer-source recheck (non-circular).** Drive the REAL + ``bind_signoff_to_issue`` over the REAL ``HttpFiligreeClient`` with a FIXED key and + the golden's own signing inputs, monkeypatching only the socket open + (``_open_no_redirect``) to capture the exact bytes the transport would POST, and + assert they equal the canonical bytes of the golden's recorded ``request_body``. + The full producer path runs (real ``attach`` body assembly -> real ``_urllib_fetch`` + -> real ``_json_body_bytes`` canonicalization -> real ``sign()`` signature), none of + it copied from the golden, so a rename of any body key in the client, a change to + the signed-field set, the ``:v2:`` tag, or the canonicalization reds here. + +2. **Layer-1 byte-pin.** Recompute the git-blob sha1 of the *frozen* golden file + in-process and assert it equals ``VENDORED_BLOB_SHA``. Filigree's consumer oracle + pins the SAME constant against the byte-identical vendored copy, locking both + repos to one wire shape. + +These oracles are fully in-process (the real client with only its socket-open +monkeypatched, no network), so they run in the default suite with no new pytest marker. +""" + +from __future__ import annotations + +import hashlib +import json +from pathlib import Path +from typing import Any, Literal + +import legis.filigree.client as client_mod +from legis.enforcement.signing import sign +from legis.filigree.client import HttpFiligreeClient +from legis.governance.signoff_binding import bind_signoff_to_issue +from legis.identity.entity_key import EntityKey +from legis.weft_signing import weft_body_bytes + +VECTOR_PATH = Path(__file__).parent / "vectors" / "signoff_binding.v1.json" + +# The git blob sha1 of the frozen golden bytes: +# sha1(b"blob \0" + bytes). Recomputed in-process below and by Filigree's +# consumer oracle over the byte-identical vendored copy. Re-pin only by +# re-freezing the golden from the real signer and updating BOTH repos' constant. +VENDORED_BLOB_SHA = "8796aeb5b8d7d067c82af17e361aa45fe5007b4e" + + +def _blob_sha(data: bytes) -> str: + """git's blob object id for ``data``: ``sha1(b"blob \\0" + data)``.""" + return hashlib.sha1(b"blob %d\0" % len(data) + data).hexdigest() # noqa: S324 (content addressing, not security) + + +def _load_golden() -> dict[str, Any]: + golden: dict[str, Any] = json.loads(VECTOR_PATH.read_text(encoding="utf-8")) + return golden + + +class _CapturingResponse: + """A minimal urllib response stand-in: enough for ``_decode_json_response``.""" + + headers = {"Content-Type": "application/json"} + + def read(self, _n: int) -> bytes: + return b'{"issue_id":"x","loomweave_entity_id":"x","content_hash_at_attach":"h","attached_at":"t","attached_by":"legis"}' + + def __enter__(self) -> "_CapturingResponse": + return self + + def __exit__(self, *exc: object) -> Literal[False]: + return False + + +def _capture_real_client_wire_bytes( + monkeypatch: Any, +) -> tuple[HttpFiligreeClient, dict[str, Any]]: + """Build the REAL ``HttpFiligreeClient`` and capture the exact bytes its transport + would POST, by monkeypatching ``_open_no_redirect`` to grab ``req.data``. + + This drives the genuine producer path end to end — real ``attach`` assembles the + body, real ``_urllib_fetch`` builds the request, real ``_json_body_bytes`` + (``weft_body_bytes``) canonicalizes it — so a rename of any body key in the real + client (``content_hash`` -> ``contentHash``, etc.) reds here. The same byte-capture + idiom as ``tests/filigree/test_client.py::test_wire_body_is_stable_compact_json``. + """ + captured: dict[str, Any] = {} + + def fake_open_no_redirect(req: Any) -> _CapturingResponse: + captured["data"] = req.data + captured["url"] = req.full_url + captured["method"] = req.get_method() + captured["headers"] = dict(req.header_items()) + return _CapturingResponse() + + monkeypatch.setattr(client_mod, "_open_no_redirect", fake_open_no_redirect) + client = HttpFiligreeClient("https://filigree.test") + return client, captured + + +def test_vendored_golden_blob_sha_is_byte_pinned() -> None: + """Layer-1: the frozen golden's bytes reproduce VENDORED_BLOB_SHA. + + Tamper the file (any byte: a field value, key order, whitespace, the trailing + newline) and this reds before any wire drift can ship. Filigree's consumer + oracle pins the same constant against the byte-identical vendored copy. + """ + data = VECTOR_PATH.read_bytes() + recomputed = _blob_sha(data) + assert recomputed == VENDORED_BLOB_SHA, ( + f"golden bytes changed: {recomputed} != pinned {VENDORED_BLOB_SHA}. " + "Re-freeze from the real signer and update BOTH repos' constant." + ) + + +def test_golden_carries_the_distinct_governed_extension_fields() -> None: + """The golden body must carry ``signoff_seq`` + ``signature`` (the legis-specific + extension). Without them this would re-freeze the base entity-association shape, + which the loomweave<->filigree reverse-lookup oracle already covers — a tautology. + """ + body = _load_golden()["request_body"] + assert set(body) == {"entity_id", "content_hash", "actor", "signoff_seq", "signature"} + assert isinstance(body["signoff_seq"], int) and not isinstance(body["signoff_seq"], bool) + assert body["signature"].startswith("hmac-sha256:v2:") + + +def test_real_bind_emits_the_golden_request_body_bytes(monkeypatch: Any) -> None: + """Producer-source recheck (non-circular): drive the REAL + ``bind_signoff_to_issue`` over the REAL ``HttpFiligreeClient`` with a FIXED key and + the golden's signing inputs, and assert the bytes legis's transport actually puts + on the wire equal the canonical bytes of the golden's recorded ``request_body``. + + The full producer path runs: real ``attach`` assembles the body, real + ``_urllib_fetch`` builds the request, real ``_json_body_bytes`` canonicalizes it, + and real ``sign()`` (inside ``bind_signoff_to_issue``) cuts the signature — none of + it copied from the golden. So a drift in the client's body-key names, the signed + field set, the ``:v2:`` tag, or the canonicalization reds here. ``issue_id`` is + committed by the signature but rides the URL path, so it is absent from the body + bytes (it appears in the captured request URL instead). + """ + golden = _load_golden() + inputs = golden["signing_inputs"] + key = bytes.fromhex(inputs["key_hex"]) + signed = inputs["signed_fields"] + + client, captured = _capture_real_client_wire_bytes(monkeypatch) + out = bind_signoff_to_issue( + client, + issue_id=inputs["issue_id"], + entity_key=EntityKey.from_sei(signed["entity_id"]), + content_hash=signed["content_hash"], + signoff_seq=signed["signoff_seq"], + key=key, + ) + + # The signature the real signer produced, independently of the golden's copy. + expected_sig = sign( + { + "issue_id": inputs["issue_id"], + "entity_id": signed["entity_id"], + "content_hash": signed["content_hash"], + "signoff_seq": signed["signoff_seq"], + }, + key, + ) + assert out["binding_signature"] == expected_sig + assert expected_sig == golden["signing_inputs"]["signature"], ( + "real sign() no longer reproduces the golden signature — the signed-field " + "set or canonicalization drifted, or the golden is stale." + ) + + # The load-bearing assertion: the REAL transport's emitted wire bytes equal the + # canonical bytes of the golden's recorded body. ``captured["data"]`` is exactly + # what ``HttpFiligreeClient`` would have sent over the socket. + assert captured["data"] == weft_body_bytes(golden["request_body"]), ( + "legis's real client no longer emits the frozen governed sign-off request " + "body bytes; re-freeze the golden from the real client and update BOTH repos." + ) + + assert captured["data"] == weft_body_bytes( + { + "entity_id": signed["entity_id"], + "content_hash": signed["content_hash"], + "actor": "legis", + "signoff_seq": signed["signoff_seq"], + "signature": expected_sig, + } + ) + + # The request went to the issue-scoped entity-association route, with issue_id in + # the PATH (not the body) — the half of the signed tuple the body omits. Proven by + # the real transport's captured URL + method, not re-derived. + assert captured["method"] == "POST" + assert captured["url"] == ( + f"https://filigree.test/api/issue/{inputs['issue_id']}/entity-associations" + ) diff --git a/tests/contract/weft/vectors/README.md b/tests/contract/weft/vectors/README.md index 7a81a3b..46bd38b 100644 --- a/tests/contract/weft/vectors/README.md +++ b/tests/contract/weft/vectors/README.md @@ -18,6 +18,7 @@ consumer's CI**. A contract fix without its vector just re-creates the drift. |---|---|---|---| | `wardline_scan_artifact.v1.json` | `weft/wardline-scan-artifact` | Wardline (`core/legis.py`) | legis (`wardline/ingest.py`) | | `wardline_dirty_scan_artifact.v1.json` | `weft/wardline-dirty-scan-artifact` | Wardline (`core/legis.py`) | legis (`wardline/ingest.py`) | +| `git_renames.v1.json` | `weft/legis-git-renames` | legis (`GET /git/renames`) | Loomweave (`loomweave-cli/src/sei_git.rs::parse_legis_rename_json`) | ## How each side loads it @@ -63,3 +64,50 @@ CI fails on that side. When the contract changes, bump the `version`, regenerate - `invalid[]` — `{name, description, artifact, reject_match}`. Each must raise a `WardlinePayloadError` whose message matches `reject_match` — never read as zero defects under a green status. + +## Git-rename golden (`git_renames.v1.json`) + +This golden is a **raw `GET /git/renames` response** — a flat top-level JSON array +of `RenameEvidence` objects, NOT the wrapped `{contract, version, valid[]}` +envelope the Wardline vectors use. It cannot be wrapped: Loomweave's consumer +(`parse_legis_rename_json`) calls `serde_json::Value::as_array()` and treats any +object as a `NonArrayEnvelope` → zero renames. Provenance and anchors therefore +live here in the README, not inside the file. + +The two oracles: + +- **legis (producer)** — `tests/contract/weft/test_git_rename_wire_conformance.py` + drives the REAL `GET /git/renames` over a fabricated rename and asserts its + projected `(old_path, new_path)` pairs *contain* the golden's real rename + (membership, because `commit_sha`/blobs are nondeterministic per fabrication + and the skip item is synthetic). It also recomputes the golden's git blob sha1 + in-process and pins it to `VENDORED_BLOB_SHA`. +- **Loomweave (consumer)** — an in-module `#[test]` in + `crates/loomweave-cli/src/sei_git.rs` (`vendored_golden_*`) drives the REAL + `parse_legis_rename_json` over the byte-identical vendored copy at + `crates/loomweave-cli/tests/fixtures/weft/git_renames.v1.json` (loaded with + `include_bytes!`), asserts the projected pairs, and pins the SAME blob sha1. + The test lives next to the parser, not under `tests/`, because the parser is + private to a binary crate with no `lib.rs` and is unreachable from integration + tests. + +### Honest freeze provenance + +The array's items are NOT all live-captured — only the real rename is: + +- **Item 1 (captured live):** `auth.py → authn.py`, frozen verbatim from legis's + actual `GET /git/renames` output (FastAPI `TestClient` over a fabricated git + rename), then `commit_sha`/`old_blob`/`new_blob` redacted to fixed + placeholders so the FROZEN file is reproducible (the consumer ignores all + three; the producer oracle compares projected pairs, never `commit_sha`). This + item simultaneously satisfies "≥1 real rename" and "extra ignored fields" — it + carries `commit_sha`, `similarity`, `old_blob`, `new_blob`, all dropped by the + consumer. +- **Item 2 (synthetic, appended by hand):** an empty-`new_path` item + (`ghost.py → ""`). Git/legis never emit an empty `new_path`, so it cannot be + captured live; it is the skip case the consumer must drop, appended explicitly. + +If legis's real output and Loomweave's real parser ever DISAGREE on these bytes, +that disagreement is the seam's value — re-freeze from the real endpoint and fix +the parser, never hand-mint the golden to hide it. Both oracles run UNMARKED (by +default); re-pin the sha1 only by re-freezing and updating BOTH repos. diff --git a/tests/contract/weft/vectors/git_renames.v1.json b/tests/contract/weft/vectors/git_renames.v1.json new file mode 100644 index 0000000..74f69ee --- /dev/null +++ b/tests/contract/weft/vectors/git_renames.v1.json @@ -0,0 +1,18 @@ +[ + { + "commit_sha": "0000000000000000000000000000000000000000", + "new_blob": "", + "new_path": "authn.py", + "old_blob": "", + "old_path": "auth.py", + "similarity": 100 + }, + { + "commit_sha": "0000000000000000000000000000000000000000", + "new_blob": "", + "new_path": "", + "old_blob": "", + "old_path": "ghost.py", + "similarity": 0 + } +] diff --git a/tests/contract/weft/vectors/signoff_binding.v1.json b/tests/contract/weft/vectors/signoff_binding.v1.json new file mode 100644 index 0000000..8796aeb --- /dev/null +++ b/tests/contract/weft/vectors/signoff_binding.v1.json @@ -0,0 +1,26 @@ +{ + "contract": "legis.signoff_binding.v1", + "description": "legis->filigree governed sign-off binding POST request body. legis is the producer (HttpFiligreeClient.attach -> POST /api/issue/{issue_id}/entity-associations); filigree is the consumer (dashboard_routes/entities.py, v25/B1) which parses and persists signature + signoff_seq. The signature is sign({issue_id, entity_id, content_hash, signoff_seq}, key) (legis.enforcement.signing, deterministic HMAC-SHA256 over canonical_json). issue_id rides the URL path, not the body.", + "endpoint": { + "method": "POST", + "path": "/api/issue/LEGIS-SIGNOFF-1/entity-associations" + }, + "request_body": { + "actor": "legis", + "content_hash": "blake3:fixedcontenthashfixedcontenthashfixedcontenthashfixedcontent", + "entity_id": "loomweave:eid:0123456789abcdef0123456789abcdef", + "signature": "hmac-sha256:v2:2d43280fad6b80fc879d4569e2a8b3c86f94c2538616d578a8d0fc479e5d0962", + "signoff_seq": 7 + }, + "signing_inputs": { + "issue_id": "LEGIS-SIGNOFF-1", + "key_hex": "6c656769732d7369676e6f66662d776972652d76310000000000000000000000", + "signature": "hmac-sha256:v2:2d43280fad6b80fc879d4569e2a8b3c86f94c2538616d578a8d0fc479e5d0962", + "signed_fields": { + "content_hash": "blake3:fixedcontenthashfixedcontenthashfixedcontenthashfixedcontent", + "entity_id": "loomweave:eid:0123456789abcdef0123456789abcdef", + "issue_id": "LEGIS-SIGNOFF-1", + "signoff_seq": 7 + } + } +} diff --git a/tests/test_cli_install.py b/tests/test_cli_install.py index 6e6572f..546ed53 100644 --- a/tests/test_cli_install.py +++ b/tests/test_cli_install.py @@ -33,6 +33,18 @@ def test_install_selective_gitignore_only(tmp_path, monkeypatch): assert not (tmp_path / ".claude").exists() +def test_install_gitignore_ships_nested_dir_ignore(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + rc = main(["install", "--gitignore"]) + assert rc == 0 + # root rule... + assert ".weft/legis/" in (tmp_path / ".gitignore").read_text() + # ...and the nested suite-standard ignore (filigree-4ed8152630) + nested = tmp_path / ".weft" / "legis" / ".gitignore" + assert nested.is_file() + assert install.LEGIS_DIR_GITIGNORE_MARKER in nested.read_text() + + def test_install_claude_md_only(tmp_path, monkeypatch): monkeypatch.chdir(tmp_path) rc = main(["install", "--claude-md"]) diff --git a/tests/test_doctor.py b/tests/test_doctor.py index 27017cf..979a526 100644 --- a/tests/test_doctor.py +++ b/tests/test_doctor.py @@ -221,9 +221,9 @@ def test_cli_doctor_fix_dest_is_fix(): assert parser.parse_args(["doctor"]).fix is False -def test_doctor_json_carries_repairable_per_check_and_true_for_six(tmp_path, capsys): - # repairable is always present per check, and True exactly for the six - # repair-honoring check functions (which emit eight check ids, since the +def test_doctor_json_carries_repairable_per_check_and_true_for_seven(tmp_path, capsys): + # repairable is always present per check, and True exactly for the seven + # repair-honoring check functions (which emit nine check ids, since the # instruction-block and skill-pack checks each run for two targets). run_doctor(tmp_path, repair=False, fmt="json") payload = json.loads(capsys.readouterr().out) @@ -238,6 +238,7 @@ def test_doctor_json_carries_repairable_per_check_and_true_for_six(tmp_path, cap "install.agents_skill", "install.hook", "install.gitignore", + "install.dir_gitignore", "install.mcp_json", "store.dir", } @@ -485,6 +486,38 @@ def test_gitignore_missing_root_reports_error_instead_of_raising(tmp_path): assert str(missing) in (repaired.message or "") +def test_dir_gitignore_absent_dir_is_ok(tmp_path): + # No .weft/legis/ yet — created lazily; nothing to protect, so OK + # (mirrors check_store_dir's "absent is ok"). + from legis.doctor import check_dir_gitignore + + assert check_dir_gitignore(tmp_path, repair=False).status == "ok" + + +def test_dir_gitignore_present_dir_missing_nested_is_error_then_repaired(tmp_path): + from legis.doctor import check_dir_gitignore + + (tmp_path / ".weft" / "legis").mkdir(parents=True) + c = check_dir_gitignore(tmp_path, repair=False) + assert c.status == "error" and c.repairable is True + fixed = check_dir_gitignore(tmp_path, repair=True) + assert fixed.status == "ok" and fixed.fixed is True + nested = tmp_path / ".weft" / "legis" / ".gitignore" + assert legis_install.LEGIS_DIR_GITIGNORE_MARKER in nested.read_text() + + +def test_dir_gitignore_present_nested_is_ok(tmp_path): + from legis.doctor import check_dir_gitignore + + legis_install.ensure_legis_dir_gitignore(tmp_path) + assert check_dir_gitignore(tmp_path, repair=False).status == "ok" + + +def test_collect_checks_includes_dir_gitignore(tmp_path): + ids = {c.id for c in collect_checks(tmp_path, repair=False)} + assert "install.dir_gitignore" in ids + + def test_skill_pack_absent_is_error(tmp_path): assert check_skill_pack(tmp_path, ".claude", repair=False).status == "error" diff --git a/tests/test_install.py b/tests/test_install.py index 22fab0c..3e47d55 100644 --- a/tests/test_install.py +++ b/tests/test_install.py @@ -5,7 +5,9 @@ import json import logging import os +import shutil import stat +import subprocess import sys import pytest @@ -708,6 +710,77 @@ def test_ensure_gitignore_idempotent(tmp_path): assert (tmp_path / ".gitignore").read_text() == first +# --------------------------------------------------------------------------- +# Nested .weft/legis/.gitignore (suite standard: filigree-4ed8152630) +# --------------------------------------------------------------------------- + + +def _git(repo, *args): + return subprocess.run(["git", *args], cwd=repo, capture_output=True, text=True, check=False) + + +def test_ensure_legis_dir_gitignore_creates_nested_file(tmp_path): + ok, _msg = install.ensure_legis_dir_gitignore(tmp_path) + assert ok + nested = tmp_path / ".weft" / "legis" / ".gitignore" + assert nested.is_file() + content = nested.read_text() + # durable-vs-ephemeral header + named runtime files (reviewer visibility) + assert install.LEGIS_DIR_GITIGNORE_MARKER in content + assert "legis-*.db" in content + assert "operator.age" in content + assert "operator_session.json" in content + # atomic-write staging temps (mkstemp orphans) — incl. the secret-shaped one + assert "*.tmp" in content + assert ".operator.age.*" in content + + +def test_ensure_legis_dir_gitignore_idempotent(tmp_path): + install.ensure_legis_dir_gitignore(tmp_path) + nested = tmp_path / ".weft" / "legis" / ".gitignore" + first = nested.read_text() + ok, msg = install.ensure_legis_dir_gitignore(tmp_path) + assert ok + assert "already" in msg + assert nested.read_text() == first + + +def test_ensure_legis_dir_gitignore_appends_to_user_file_without_clobber(tmp_path): + legis_dir = tmp_path / ".weft" / "legis" + legis_dir.mkdir(parents=True) + (legis_dir / ".gitignore").write_text("# my own rule\nscratch/\n") + ok, msg = install.ensure_legis_dir_gitignore(tmp_path) + assert ok + assert "Added" in msg + content = (legis_dir / ".gitignore").read_text() + assert "scratch/" in content # user content preserved + assert install.LEGIS_DIR_GITIGNORE_MARKER in content # legis block appended + + +def test_nested_gitignore_hides_runtime_state_without_root_weft_rule(tmp_path): + """The standard's actual guarantee: the nested file hides legis runtime + state even when the project-root .gitignore has NO .weft/ rule, so the + nested file is provably the thing doing the work.""" + if shutil.which("git") is None: + pytest.skip("git not available") + repo = tmp_path + _git(repo, "init") + # Root .gitignore deliberately has NO .weft/ rule — any ignore of legis + # runtime files can therefore only come from the nested file. + (repo / ".gitignore").write_text("node_modules/\n") + install.ensure_legis_dir_gitignore(repo) + legis_dir = repo / ".weft" / "legis" + (legis_dir / "legis-pulls.db").write_bytes(b"") + (legis_dir / "operator.age").write_bytes(b"") + (legis_dir / "operator_session.json5678.tmp").write_bytes(b"") # mkstemp orphan + # DB + secret + staging temp are ignored by the nested file... + assert _git(repo, "check-ignore", ".weft/legis/legis-pulls.db").returncode == 0 + assert _git(repo, "check-ignore", ".weft/legis/operator.age").returncode == 0 + assert _git(repo, "check-ignore", ".weft/legis/operator_session.json5678.tmp").returncode == 0 + # ...but the nested .gitignore itself stays tracked (not ignored). + assert _git(repo, "check-ignore", ".weft/legis/.gitignore").returncode == 1 + + # --------------------------------------------------------------------------- # Command resolution and safe-path edges # --------------------------------------------------------------------------- diff --git a/tests/warpline_preflight/fixtures/PROVENANCE.md b/tests/warpline_preflight/fixtures/PROVENANCE.md new file mode 100644 index 0000000..1d590ec --- /dev/null +++ b/tests/warpline_preflight/fixtures/PROVENANCE.md @@ -0,0 +1,43 @@ +# Warpline preflight golden — provenance + +`warpline-preflight-golden.json` freezes the two response shapes legis's real +warpline preflight consumer parses: + + * `GET /api/impact-radius` -> `{"affected": [{"sei": ...}, ...], "count": N}` + * `GET /api/reverify-worklist` -> `{"entries": [{"sei": ...}, ...], "count": N}` + +These are the shapes `legis.warpline_preflight.client.HttpWarplineClient` +(`impact_radius` / `reverify_worklist`) and `legis.service.preflight.read_warpline_preflight` +expect today. + +## NOT vendored from warpline — frozen to the legis-expected contract + +This golden is **frozen to the shape legis's client expects**, NOT vendored +byte-identical from a warpline source fixture. As of this writing warpline ships +**no producer** for these flat REST shapes: + + * warpline has **no HTTP server**; its surface is MCP/CLI only. + * warpline's `impact_radius` / `reverify_worklist` commands return the rich + envelope schemas `warpline.impact_radius.v1` / `warpline.reverify_worklist.v1` + (`{schema, ok, query, data: {... affected / items ...}, enrichment, ...}`), + where the affected set is nested under `data` and the reverify list is + `data.items` — NOT the top-level flat `{"affected"/"entries", "count"}` legis + parses. There is no top-level `count`. + * warpline's only legis-facing seam (`federation.py`) runs the OPPOSITE + direction: warpline CONSULTS legis governance as a federation peer. + +## WARPLINE PRODUCER-SIDE OBLIGATION + +For this seam to reach a shared, byte-identical golden, warpline must ship a +producer (an HTTP `GET /api/impact-radius` + `GET /api/reverify-worklist`, or an +equivalent flat-shape projection) emitting exactly: + + impact-radius : {"affected": [{"sei": "loomweave:eid:<32hex>", ...}], "count": N} + reverify-worklist: {"entries": [{"sei": "loomweave:eid:<32hex>", ...}], "count": N} + +and vendor a contract fixture for it under +`warpline/tests/fixtures/contracts/warpline/`. The Layer-2 recheck in +`test_warpline_preflight_oracle.py` (`test_golden_matches_warpline_source`) +points at that path and `pytest.skip`s until it exists; it activates +automatically and will then enforce byte-equality (re-vendor + update the +byte-pin once the producer ships). diff --git a/tests/warpline_preflight/fixtures/warpline-preflight-golden.json b/tests/warpline_preflight/fixtures/warpline-preflight-golden.json new file mode 100644 index 0000000..44bb515 --- /dev/null +++ b/tests/warpline_preflight/fixtures/warpline-preflight-golden.json @@ -0,0 +1,28 @@ +{ + "impact_radius": { + "affected": [ + { + "sei": "loomweave:eid:0123456789abcdef0123456789abcdef", + "locator": "python:function:src/pkg/mod.py::fn", + "depth": 1 + }, + { + "sei": "loomweave:eid:fedcba9876543210fedcba9876543210", + "locator": "python:function:src/pkg/other.py::helper", + "depth": 2 + } + ], + "count": 2 + }, + "reverify_worklist": { + "entries": [ + { + "sei": "loomweave:eid:0123456789abcdef0123456789abcdef", + "locator": "python:function:src/pkg/mod.py::fn", + "priority": "high", + "reason": "changed" + } + ], + "count": 1 + } +} diff --git a/tests/warpline_preflight/test_warpline_preflight_oracle.py b/tests/warpline_preflight/test_warpline_preflight_oracle.py new file mode 100644 index 0000000..986dab7 --- /dev/null +++ b/tests/warpline_preflight/test_warpline_preflight_oracle.py @@ -0,0 +1,194 @@ +"""Weft warpline-preflight conformance oracle — Legis as consumer. + +Legis reads warpline's ADVISORY preflight surface via +``legis.warpline_preflight.client.HttpWarplineClient`` and +``legis.service.preflight.read_warpline_preflight``. This oracle freezes the two +response shapes legis parses and drives legis's REAL parse path over the frozen +bytes, so a shape change fails CI until legis updates the consumer. + +Three layers, mirroring ``tests/conformance/test_sei_oracle*``: + + * Layer-1 byte-pin (``test_golden_byte_pin``): UNMARKED, default-suite, + recomputes the git blob sha1 in-process and fails CLOSED on any byte drift. + * Non-circular consumer oracle (``test_*_drives_real_legis_parse``): the frozen + golden BYTES flow through legis's real ``_decode_json_response`` (only the HTTP + transport is stubbed, never the parse logic) and through the real + ``read_warpline_preflight``; assertions are on HARDCODED SEIs/counts, never a + re-parse of the golden. + * Layer-2 source recheck (``test_golden_matches_warpline_source``): compares the + frozen golden to warpline's source contract fixture; SKIPS CLEAN when absent + and names the producer obligation. + +PROVENANCE: this golden is frozen to the shape legis's client expects, NOT +vendored from warpline. Warpline ships no producer for these flat REST shapes +today (no HTTP server; its surface is the rich ``warpline.impact_radius.v1`` / +``warpline.reverify_worklist.v1`` envelope). See ``fixtures/PROVENANCE.md``. +""" +from __future__ import annotations + +import hashlib +import json +import os +from pathlib import Path + +import pytest + +from legis.service.preflight import read_warpline_preflight +from legis.warpline_preflight.client import ( + HttpWarplineClient, + _decode_json_response, +) + +GOLDEN_PATH = Path(__file__).parent / "fixtures" / "warpline-preflight-golden.json" + +# git blob sha1 of tests/warpline_preflight/fixtures/warpline-preflight-golden.json. +# Frozen to the shape legis's warpline preflight client expects (NOT vendored +# from warpline — warpline ships no producer for these flat REST shapes). Update +# only after confirming the legis consumer contract intentionally changed; if +# warpline later ships a producer fixture, re-vendor byte-identical and re-pin. +GOLDEN_BLOB_SHA = "44bb515d528fdaca5b12703a896f55cd96c2483b" + + +# --------------------------------------------------------------------------- +# Layer-1: fail-closed byte-pin (UNMARKED — runs in the default suite). +# --------------------------------------------------------------------------- +def _git_blob_sha1(data: bytes) -> str: + return hashlib.sha1(b"blob %d\0" % len(data) + data).hexdigest() + + +def test_golden_byte_pin(): + data = GOLDEN_PATH.read_bytes() + assert _git_blob_sha1(data) == GOLDEN_BLOB_SHA, ( + "warpline-preflight golden has drifted from its pinned bytes; update " + "GOLDEN_BLOB_SHA only after confirming the legis consumer contract change " + "is intended (and re-check the warpline producer obligation in PROVENANCE.md)." + ) + + +# --------------------------------------------------------------------------- +# Non-circular consumer oracle: golden BYTES -> legis's real decode -> real +# read_warpline_preflight. Only the HTTP transport is stubbed. +# --------------------------------------------------------------------------- +class _GoldenResp: + """A minimal urllib-response stand-in carrying the golden bytes for one route.""" + + def __init__(self, raw: bytes) -> None: + self.headers = {"Content-Type": "application/json"} + self._raw = raw + + def read(self, n: int) -> bytes: + return self._raw[:n] + + +def _golden() -> dict: + # Used ONLY to slice the two route bodies out of the single golden file so + # each can be re-serialized to bytes and pushed through legis's real decode. + # Assertions below never read from this — they are hardcoded. + return json.loads(GOLDEN_PATH.read_bytes()) + + +def _bytes_fetch(): + """An injectable Fetch that serves the golden route bytes through legis's REAL + ``_decode_json_response`` (parse logic is NOT stubbed — only transport is).""" + golden = _golden() + bodies = { + "/api/impact-radius": json.dumps(golden["impact_radius"]).encode("utf-8"), + "/api/reverify-worklist": json.dumps(golden["reverify_worklist"]).encode("utf-8"), + } + + def fetch(method, url, body): + assert method == "GET" and body is None + for route, raw in bodies.items(): + if route in url: + return _decode_json_response(_GoldenResp(raw), f"{method} {url}") + raise AssertionError(f"unexpected warpline route in oracle: {url}") + + return fetch + + +def _client() -> HttpWarplineClient: + return HttpWarplineClient("http://localhost:9100", fetch=_bytes_fetch()) + + +def test_impact_radius_drives_real_legis_parse(): + out = _client().impact_radius("base-sha", "head-sha") + assert out["count"] == 2 + assert [a["sei"] for a in out["affected"]] == [ + "loomweave:eid:0123456789abcdef0123456789abcdef", + "loomweave:eid:fedcba9876543210fedcba9876543210", + ] + + +def test_reverify_worklist_drives_real_legis_parse(): + out = _client().reverify_worklist("base-sha", "head-sha") + assert out["count"] == 1 + assert [e["sei"] for e in out["entries"]] == [ + "loomweave:eid:0123456789abcdef0123456789abcdef" + ] + + +def test_read_warpline_preflight_over_golden_is_checked_with_real_shapes(): + # The full service read: discriminated 'checked' with both sub-responses, + # parsed through legis's real client + decode over the frozen golden bytes. + result = read_warpline_preflight(_client(), "base-sha", "head-sha") + assert result["status"] == "checked" + assert result["impact_radius"]["count"] == 2 + assert [a["sei"] for a in result["impact_radius"]["affected"]] == [ + "loomweave:eid:0123456789abcdef0123456789abcdef", + "loomweave:eid:fedcba9876543210fedcba9876543210", + ] + assert result["reverify_worklist"]["count"] == 1 + assert [e["sei"] for e in result["reverify_worklist"]["entries"]] == [ + "loomweave:eid:0123456789abcdef0123456789abcdef" + ] + + +# --------------------------------------------------------------------------- +# Layer-2: drift recheck vs warpline's source contract fixture (skip-clean). +# +# Warpline ships NO producer for these flat REST shapes today (no HTTP server; +# its real surface is the rich warpline.impact_radius.v1 / .reverify_worklist.v1 +# envelope, where the affected set is nested under data.affected / data.items and +# there is no top-level count). So this recheck SKIPS CLEAN and names the +# producer obligation. It activates byte-equality enforcement automatically the +# day warpline ships a flat-shape contract fixture at the path below. +# --------------------------------------------------------------------------- +def _warpline_source_fixture() -> Path | None: + candidates: list[Path] = [] + if env := os.environ.get("WARPLINE_REPO"): + candidates.append( + Path(env) + / "tests" + / "fixtures" + / "contracts" + / "warpline" + / "preflight-rest-golden.json" + ) + candidates.append( + Path(__file__).resolve().parents[3] + / "warpline" + / "tests" + / "fixtures" + / "contracts" + / "warpline" + / "preflight-rest-golden.json" + ) + return next((path for path in candidates if path.exists()), None) + + +def test_golden_matches_warpline_source(): + source = _warpline_source_fixture() + if source is None: + pytest.skip( + "warpline ships no flat-REST preflight contract fixture " + "(preflight-rest-golden.json) — its surface is the rich " + "warpline.impact_radius.v1 / .reverify_worklist.v1 envelope, not the " + "flat {affected/entries, count} shape legis consumes. PRODUCER " + "OBLIGATION: warpline must ship GET /api/impact-radius + " + "GET /api/reverify-worklist (or an equivalent flat projection) and " + "vendor that fixture; set WARPLINE_REPO or place a sibling warpline " + "checkout to enable this drift check. See fixtures/PROVENANCE.md." + ) + assert json.loads(GOLDEN_PATH.read_bytes()) == json.loads( + source.read_text(encoding="utf-8") + ), "legis warpline-preflight golden drifted from warpline's source contract fixture"