diff --git a/README.md b/README.md index 4dbc9db..7cb0776 100644 --- a/README.md +++ b/README.md @@ -123,6 +123,8 @@ contextlattice_checkpoint -h ./scripts/agent/memory-edge-backfill ./scripts/agent/memory-edge-backfill --include-inferred --min-confidence 0.90 ./scripts/agent/memory-edge-backfill --write +./scripts/agent/memory-edge-inferred-retrofill --all-projects +./scripts/agent/memory-edge-inferred-retrofill --all-projects --write --confirm-retrofill ALL_PROJECTS ``` ## Security and Privacy diff --git a/archive/internal-planning/engine-api.md b/archive/internal-planning/engine-api.md index c9c9302..9ef19bd 100644 --- a/archive/internal-planning/engine-api.md +++ b/archive/internal-planning/engine-api.md @@ -46,6 +46,15 @@ Key optional fields: - `inferred_min_shared_terms` (default `3`): minimum lexical overlap before scoring. - `inferred_max_token_postings` (default `64`): skips overly-common terms to bound fanout. +Operator-safe retrofill wrapper: + +```bash +./scripts/agent/memory-edge-inferred-retrofill --project context-lattice-private +./scripts/agent/memory-edge-inferred-retrofill --project context-lattice-private --write --confirm-retrofill context-lattice-private +``` + +The wrapper restricts the request to `inferred_related`, runs a dry-run preflight before any write, refuses truncated preflight results unless `--allow-truncated` is set, and repeats write mode once to verify idempotency. 
#!/usr/bin/env python3
# Patch framing that preceded this script in the flattened diff (kept verbatim
# so no content from the original span is lost):
#   + ## Runtime Flags - `USE_RUST_CODEC`
#   diff --git a/scripts/agent/memory-edge-inferred-retrofill b/scripts/agent/memory-edge-inferred-retrofill
#   new file mode 100755
#   index 0000000..4123101
#   --- /dev/null
#   +++ b/scripts/agent/memory-edge-inferred-retrofill
#   @@ -0,0 +1,190 @@
"""Guarded maintenance job for retroactive inferred memory edges."""
# NOTE: the module docstring above doubles as the argparse description
# (``description=__doc__``), so it is user-facing text; keep it short.
#
# Flow implemented by main():
#   1. Always POST a dry-run "preflight" to the backfill endpoint.
#   2. Stop if the preflight is not ok, or is truncated without
#      --allow-truncated.
#   3. Without --write, report the preflight and exit.
#   4. With --write, require --confirm-retrofill to match the scope token,
#      then POST the real write.
#   5. Unless --skip-idempotency-check, POST the write a second time and
#      require that it reports zero newly written edges.

from __future__ import annotations

import argparse
import json
import sys
from typing import Any

from _common import emit, request_json

# Single endpoint used for the preflight, the write and the idempotency pass.
_BACKFILL_ENDPOINT = "/v1/memory/edges/backfill"


def relation_stats(body: dict[str, Any], relation: str) -> dict[str, Any]:
    """Return the per-relation stats dict for *relation* from a response body.

    Looks up ``body["relations"][relation]``. Returns an empty dict when
    ``relations`` is missing or not a dict, or when the entry for *relation*
    is missing or not a dict, so callers can use ``.get`` unconditionally.
    """
    relations = body.get("relations")
    if not isinstance(relations, dict):
        return {}
    stat = relations.get(relation)
    return stat if isinstance(stat, dict) else {}


def as_int(value: Any) -> int:
    """Coerce *value* to ``int``, returning 0 when it cannot be converted.

    ``None``, non-numeric strings and NaN (``TypeError`` / ``ValueError``) as
    well as infinities (``OverflowError``) all map to 0 instead of raising,
    so a malformed counter in a response cannot crash the report.
    """
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: int(float("inf")); json.loads accepts ``Infinity``.
        return 0


def build_payload(args: argparse.Namespace, dry_run: bool) -> dict[str, Any]:
    """Build the JSON request body for one backfill call.

    The request is pinned to the single inferred relation named by
    ``args.inferred_relation`` with ``include_inferred`` forced on and the
    low-confidence audit forced off. Optional scope/filter keys (``project``,
    ``include_cold``, ``include_ephemeral``, ``include_test_memory``) are only
    added when the corresponding argument was supplied.

    Args:
        args: Parsed command-line arguments from ``parse_args``.
        dry_run: Value sent as the payload's ``dry_run`` flag.

    Returns:
        The payload dict to send to the backfill endpoint.
    """
    relation = args.inferred_relation
    payload: dict[str, Any] = {
        "dry_run": dry_run,
        "relations": [relation],
        "include_inferred": True,
        "include_low_confidence_audit": False,
        "min_confidence": args.min_confidence,
        "max_candidates": args.max_candidates,
        "max_history_lines": args.max_history_lines,
        "sample_limit": args.sample_limit,
        "inferred_relation": relation,
        "inferred_peer_limit": args.inferred_peer_limit,
        "inferred_scan_limit": args.inferred_scan_limit,
        "inferred_min_score": args.inferred_min_score,
        "inferred_min_shared_terms": args.inferred_min_shared_terms,
        "inferred_max_token_postings": args.inferred_max_token_postings,
    }
    if args.project:
        payload["project"] = args.project
    # Tri-state: None means neither --include-cold nor --exclude-cold was
    # given, in which case the key is omitted from the request entirely.
    if args.include_cold is not None:
        payload["include_cold"] = args.include_cold
    if args.include_ephemeral:
        payload["include_ephemeral"] = True
    if args.include_test_memory:
        payload["include_test_memory"] = True
    return payload


def summarize(stage: str, body: dict[str, Any], relation: str) -> dict[str, Any]:
    """Condense a backfill response into a flat summary for one stage.

    The ``generated``, ``eligible``, ``written`` and ``existing`` counters
    prefer the per-relation stats and fall back to the top-level value of the
    same name; all counters are coerced with ``as_int``.

    Args:
        stage: Label stored under ``"stage"`` (e.g. ``"write"``).
        body: Response body returned by the backfill endpoint.
        relation: Relation name whose per-relation stats are reported.

    Returns:
        A dict of scalar fields plus an ``errors`` list (empty unless the
        response carried a list under ``"errors"``).
    """
    stat = relation_stats(body, relation)
    errors = body.get("errors")
    return {
        "stage": stage,
        "ok": bool(body.get("ok", False)),
        "dry_run": bool(body.get("dry_run", False)),
        "project": body.get("project", ""),
        "relation": relation,
        "scanned_docs": as_int(body.get("scanned_docs")),
        "generated": as_int(stat.get("generated", body.get("generated"))),
        "eligible": as_int(stat.get("eligible", body.get("eligible"))),
        # NOTE(review): unlike its neighbours this counter is read only from
        # the top level of the response — confirm that is intentional.
        "would_write": as_int(body.get("would_write")),
        "written": as_int(stat.get("written", body.get("written"))),
        "existing": as_int(stat.get("existing", body.get("existing"))),
        "truncated": bool(body.get("truncated", False)),
        "min_confidence": body.get("min_confidence"),
        "inferred_min_score": body.get("inferred_min_score"),
        "inferred_peer_limit": body.get("inferred_peer_limit"),
        "inferred_scan_limit": body.get("inferred_scan_limit"),
        "inferred_max_token_postings": body.get("inferred_max_token_postings"),
        "errors": errors if isinstance(errors, list) else [],
    }


def require_scope(args: argparse.Namespace) -> None:
    """Abort unless an explicit scope was chosen.

    Raises:
        SystemExit: With a JSON error string when neither ``--project`` nor
            ``--all-projects`` was given. Because the argument is a string,
            Python prints it to stderr and exits with status 1.
    """
    if args.project or args.all_projects:
        return
    raise SystemExit(
        json.dumps(
            {
                "ok": False,
                "error": "project is required unless --all-projects is set",
            },
            sort_keys=True,
        )
    )


def confirm_token(args: argparse.Namespace) -> str:
    """Return the value ``--confirm-retrofill`` must equal for this scope.

    ``"ALL_PROJECTS"`` when ``--all-projects`` is set, otherwise the project
    name.
    """
    return "ALL_PROJECTS" if args.all_projects else args.project


def parse_args() -> argparse.Namespace:
    """Parse command-line arguments for the retrofill wrapper."""
    parser = argparse.ArgumentParser(description=__doc__)

    # --project and --all-projects are mutually exclusive; require_scope()
    # separately enforces that one of them is present.
    scope = parser.add_mutually_exclusive_group()
    scope.add_argument("--project", default="", help="Project to retrofill.")
    scope.add_argument(
        "--all-projects",
        action="store_true",
        help="Allow all projects; writes require --confirm-retrofill ALL_PROJECTS.",
    )

    parser.add_argument(
        "--write",
        action="store_true",
        help="Persist after a clean dry-run preflight.",
    )
    parser.add_argument(
        "--confirm-retrofill",
        default="",
        help="Required with --write. Must equal the project name, or ALL_PROJECTS with --all-projects.",
    )
    parser.add_argument(
        "--allow-truncated",
        action="store_true",
        help="Allow writes after a truncated preflight.",
    )
    parser.add_argument(
        "--skip-idempotency-check",
        action="store_true",
        help="Do not repeat write mode to verify zero new writes.",
    )

    # Both flags share one dest. default=None is stated on each so the
    # "neither flag given" state does not depend on argparse applying the
    # first-registered default for a shared dest.
    parser.add_argument("--include-cold", dest="include_cold", action="store_true", default=None)
    parser.add_argument("--exclude-cold", dest="include_cold", action="store_false", default=None)
    parser.add_argument("--include-ephemeral", action="store_true")
    parser.add_argument("--include-test-memory", action="store_true")

    parser.add_argument("--min-confidence", type=float, default=0.90)
    parser.add_argument("--max-candidates", type=int, default=50000)
    parser.add_argument("--max-history-lines", type=int, default=1)
    parser.add_argument("--sample-limit", type=int, default=20)
    parser.add_argument("--inferred-relation", default="inferred_related")
    parser.add_argument("--inferred-peer-limit", type=int, default=1)
    parser.add_argument("--inferred-scan-limit", type=int, default=5000)
    parser.add_argument("--inferred-min-score", type=float, default=0.90)
    parser.add_argument("--inferred-min-shared-terms", type=int, default=3)
    parser.add_argument("--inferred-max-token-postings", type=int, default=64)
    parser.add_argument("--timeout", type=float, default=180)
    parser.add_argument(
        "--json",
        action="store_true",
        help="Emit full response payloads instead of summaries.",
    )
    return parser.parse_args()


def _emit_preflight_report(
    args: argparse.Namespace,
    ok: bool,
    preflight: dict[str, Any],
    summaries: list[dict[str, Any]],
) -> None:
    """Emit the preflight-only report: full payload with --json, else summaries."""
    if args.json:
        emit({"ok": ok, "preflight": preflight}, pretty=True)
    else:
        emit({"ok": ok, "runs": summaries}, pretty=True)


def main() -> int:
    """Run the guarded retrofill and return the process exit code.

    Returns:
        0 when the dry run succeeded (no ``--write``) or the write succeeded
        and, if checked, the repeat pass wrote nothing new;
        1 when the preflight or write response is not ok, or the idempotency
        pass is not ok / reports new writes;
        2 when a truncated preflight is refused or the ``--confirm-retrofill``
        token does not match.
    """
    args = parse_args()
    require_scope(args)
    relation = args.inferred_relation

    # Stage 1: dry-run preflight. Nothing below may write before this passes.
    preflight = request_json("POST", _BACKFILL_ENDPOINT, build_payload(args, True), args.timeout)
    summaries = [summarize("dry_run_preflight", preflight, relation)]
    if not preflight.get("ok", False):
        _emit_preflight_report(args, False, preflight, summaries)
        return 1
    if preflight.get("truncated") and not args.allow_truncated:
        _emit_preflight_report(args, False, preflight, summaries)
        return 2
    if not args.write:
        _emit_preflight_report(args, True, preflight, summaries)
        return 0

    # Stage 2: explicit operator confirmation of the write scope.
    expected_token = confirm_token(args)
    if args.confirm_retrofill != expected_token:
        summaries.append(
            {
                "stage": "write_blocked",
                "ok": False,
                "error": "confirm-retrofill token mismatch",
                "expected": expected_token,
            }
        )
        # This branch reports summaries even with --json (original behavior).
        emit({"ok": False, "runs": summaries}, pretty=True)
        return 2

    # Stage 3: the real write.
    write = request_json("POST", _BACKFILL_ENDPOINT, build_payload(args, False), args.timeout)
    summaries.append(summarize("write", write, relation))
    ok = bool(write.get("ok", False))

    # Stage 4: repeat the identical write; a second pass must add nothing.
    repeat: dict[str, Any] | None = None
    if ok and not args.skip_idempotency_check:
        repeat = request_json("POST", _BACKFILL_ENDPOINT, build_payload(args, False), args.timeout)
        repeat_summary = summarize("idempotency_check", repeat, relation)
        summaries.append(repeat_summary)
        ok = bool(repeat.get("ok", False)) and repeat_summary["written"] == 0

    if args.json:
        payload: dict[str, Any] = {"ok": ok, "preflight": preflight, "write": write}
        if repeat is not None:
            payload["idempotency_check"] = repeat
        emit(payload, pretty=True)
    else:
        emit({"ok": ok, "runs": summaries}, pretty=True)
    return 0 if ok else 1


if __name__ == "__main__":
    raise SystemExit(main())