Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ contextlattice_checkpoint -h
./scripts/agent/memory-edge-backfill
./scripts/agent/memory-edge-backfill --include-inferred --min-confidence 0.90
./scripts/agent/memory-edge-backfill --write
./scripts/agent/memory-edge-inferred-retrofill --all-projects
./scripts/agent/memory-edge-inferred-retrofill --all-projects --write --confirm-retrofill ALL_PROJECTS
```

## Security and Privacy
Expand Down
9 changes: 9 additions & 0 deletions archive/internal-planning/engine-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,15 @@ Key optional fields:
- `inferred_min_shared_terms` (default `3`): minimum lexical overlap before scoring.
- `inferred_max_token_postings` (default `64`): skips overly-common terms to bound fanout.

Operator-safe retrofill wrapper:

```bash
./scripts/agent/memory-edge-inferred-retrofill --project context-lattice-private
./scripts/agent/memory-edge-inferred-retrofill --project context-lattice-private --write --confirm-retrofill context-lattice-private
```

The wrapper restricts the request to `inferred_related`, runs a dry-run preflight before any write, refuses truncated preflight results unless `--allow-truncated` is set, and repeats write mode once to verify idempotency.

## Runtime Flags

- `USE_RUST_CODEC`
Expand Down
190 changes: 190 additions & 0 deletions scripts/agent/memory-edge-inferred-retrofill
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
#!/usr/bin/env python3
"""Guarded maintenance job for retroactive inferred memory edges."""

from __future__ import annotations

import argparse
import json
import sys
from typing import Any

from _common import emit, request_json


def relation_stats(body: dict[str, Any], relation: str) -> dict[str, Any]:
relations = body.get("relations")
if not isinstance(relations, dict):
return {}
stat = relations.get(relation)
return stat if isinstance(stat, dict) else {}


def as_int(value: Any) -> int:
try:
return int(value)
except (TypeError, ValueError):
return 0


def build_payload(args: argparse.Namespace, dry_run: bool) -> dict[str, Any]:
relation = args.inferred_relation
payload: dict[str, Any] = {
"dry_run": dry_run,
"relations": [relation],
"include_inferred": True,
"include_low_confidence_audit": False,
"min_confidence": args.min_confidence,
"max_candidates": args.max_candidates,
"max_history_lines": args.max_history_lines,
"sample_limit": args.sample_limit,
"inferred_relation": relation,
"inferred_peer_limit": args.inferred_peer_limit,
"inferred_scan_limit": args.inferred_scan_limit,
"inferred_min_score": args.inferred_min_score,
"inferred_min_shared_terms": args.inferred_min_shared_terms,
"inferred_max_token_postings": args.inferred_max_token_postings,
}
if args.project:
payload["project"] = args.project
if args.include_cold is not None:
payload["include_cold"] = args.include_cold
if args.include_ephemeral:
payload["include_ephemeral"] = True
if args.include_test_memory:
payload["include_test_memory"] = True
return payload


def summarize(stage: str, body: dict[str, Any], relation: str) -> dict[str, Any]:
stat = relation_stats(body, relation)
return {
"stage": stage,
"ok": bool(body.get("ok", False)),
"dry_run": bool(body.get("dry_run", False)),
"project": body.get("project", ""),
"relation": relation,
"scanned_docs": as_int(body.get("scanned_docs")),
"generated": as_int(stat.get("generated", body.get("generated"))),
"eligible": as_int(stat.get("eligible", body.get("eligible"))),
"would_write": as_int(body.get("would_write")),
"written": as_int(stat.get("written", body.get("written"))),
"existing": as_int(stat.get("existing", body.get("existing"))),
"truncated": bool(body.get("truncated", False)),
"min_confidence": body.get("min_confidence"),
"inferred_min_score": body.get("inferred_min_score"),
"inferred_peer_limit": body.get("inferred_peer_limit"),
"inferred_scan_limit": body.get("inferred_scan_limit"),
"inferred_max_token_postings": body.get("inferred_max_token_postings"),
"errors": body.get("errors") if isinstance(body.get("errors"), list) else [],
}


def require_scope(args: argparse.Namespace) -> None:
if args.project or args.all_projects:
return
raise SystemExit(
json.dumps(
{
"ok": False,
"error": "project is required unless --all-projects is set",
},
sort_keys=True,
)
)


def confirm_token(args: argparse.Namespace) -> str:
return "ALL_PROJECTS" if args.all_projects else args.project


def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description=__doc__)
scope = parser.add_mutually_exclusive_group()
scope.add_argument("--project", default="", help="Project to retrofill.")
scope.add_argument("--all-projects", action="store_true", help="Allow all projects; writes require --confirm-retrofill ALL_PROJECTS.")
parser.add_argument("--write", action="store_true", help="Persist after a clean dry-run preflight.")
parser.add_argument(
"--confirm-retrofill",
default="",
help="Required with --write. Must equal the project name, or ALL_PROJECTS with --all-projects.",
)
parser.add_argument("--allow-truncated", action="store_true", help="Allow writes after a truncated preflight.")
parser.add_argument("--skip-idempotency-check", action="store_true", help="Do not repeat write mode to verify zero new writes.")
parser.add_argument("--include-cold", dest="include_cold", action="store_true", default=None)
parser.add_argument("--exclude-cold", dest="include_cold", action="store_false")
parser.add_argument("--include-ephemeral", action="store_true")
parser.add_argument("--include-test-memory", action="store_true")
parser.add_argument("--min-confidence", type=float, default=0.90)
parser.add_argument("--max-candidates", type=int, default=50000)
parser.add_argument("--max-history-lines", type=int, default=1)
parser.add_argument("--sample-limit", type=int, default=20)
parser.add_argument("--inferred-relation", default="inferred_related")
parser.add_argument("--inferred-peer-limit", type=int, default=1)
parser.add_argument("--inferred-scan-limit", type=int, default=5000)
parser.add_argument("--inferred-min-score", type=float, default=0.90)
parser.add_argument("--inferred-min-shared-terms", type=int, default=3)
parser.add_argument("--inferred-max-token-postings", type=int, default=64)
parser.add_argument("--timeout", type=float, default=180)
parser.add_argument("--json", action="store_true", help="Emit full response payloads instead of summaries.")
return parser.parse_args()


def main() -> int:
args = parse_args()
require_scope(args)
relation = args.inferred_relation

preflight = request_json("POST", "/v1/memory/edges/backfill", build_payload(args, True), args.timeout)
summaries = [summarize("dry_run_preflight", preflight, relation)]
if not preflight.get("ok", False):
if args.json:
emit({"ok": False, "preflight": preflight}, pretty=True)
else:
emit({"ok": False, "runs": summaries}, pretty=True)
return 1
if preflight.get("truncated") and not args.allow_truncated:
if args.json:
emit({"ok": False, "preflight": preflight}, pretty=True)
else:
emit({"ok": False, "runs": summaries}, pretty=True)
return 2
if not args.write:
if args.json:
emit({"ok": True, "preflight": preflight}, pretty=True)
else:
emit({"ok": True, "runs": summaries}, pretty=True)
return 0
if args.confirm_retrofill != confirm_token(args):
summaries.append(
{
"stage": "write_blocked",
"ok": False,
"error": "confirm-retrofill token mismatch",
"expected": confirm_token(args),
}
)
emit({"ok": False, "runs": summaries}, pretty=True)
return 2

write = request_json("POST", "/v1/memory/edges/backfill", build_payload(args, False), args.timeout)
summaries.append(summarize("write", write, relation))
ok = bool(write.get("ok", False))
repeat: dict[str, Any] | None = None
if not args.skip_idempotency_check and ok:
repeat = request_json("POST", "/v1/memory/edges/backfill", build_payload(args, False), args.timeout)
repeat_summary = summarize("idempotency_check", repeat, relation)
summaries.append(repeat_summary)
ok = bool(repeat.get("ok", False)) and as_int(repeat_summary["written"]) == 0

if args.json:
payload: dict[str, Any] = {"ok": ok, "preflight": preflight, "write": write}
if repeat is not None:
payload["idempotency_check"] = repeat
emit(payload, pretty=True)
else:
emit({"ok": ok, "runs": summaries}, pretty=True)
return 0 if ok else 1


if __name__ == "__main__":
raise SystemExit(main())
Loading