diff --git a/.gitignore b/.gitignore index 74a1f6d..1c40836 100644 --- a/.gitignore +++ b/.gitignore @@ -36,4 +36,11 @@ npm-debug.log* public/manifest.json # Claude Code -.claude/*.local.json \ No newline at end of file +.claude/*.local.json +.claude/scheduled_tasks.lock +.claude/worktrees/ + +# GEPA / prompt optimization +.venv-gepa/ +prompts/analyze.optimized*.md +gepa-runs/ \ No newline at end of file diff --git a/prompt_optim/COMMANDS.md b/prompt_optim/COMMANDS.md new file mode 100644 index 0000000..e7a2870 --- /dev/null +++ b/prompt_optim/COMMANDS.md @@ -0,0 +1,196 @@ +# prompt_optim — sample commands + +Ordered shortest-to-longest. All paths are relative — **run from the worktree root**. + +## Setup notes + +- Worktree's `.seam/` is a copy of the main repo's `.seam/` (gitignored). + State changes (telemetry, generic-speakers tweaks) live in this copy only. +- `.venv-gepa/` holds the GEPA install. Created via: + ```bash + python3 -m venv .venv-gepa + .venv-gepa/bin/pip install -r prompt_optim/requirements.txt + ``` +- The metric and adapter pick up `.seam/` automatically. Override with + `SEAM_DATA_DIR=/path/to/.seam` if you want to point at a different copy. + +--- + +## 1) Pure-Python tests (no LLM calls; instant) + +### Score one analysis + +```bash +.venv-gepa/bin/python3 -m prompt_optim.metric \ + .seam/recordings/2026-02-24_cancun-and-project-management \ + .seam/analysis/2026-02-24_cancun-and-project-management/analysis.json +``` + +### Distribution of metric scores across the whole dataset + +Useful for spotting the worst-scoring analyses (where the prompt is failing +hardest) before you invest LLM calls. + +```bash +.venv-gepa/bin/python3 -c ' +import json +from pathlib import Path +from prompt_optim.metric import score_analysis, _load_people_names +people = _load_people_names() +scores = [] +for rd in sorted(Path(".seam/recordings").iterdir()): + aj = Path(".seam/analysis") / rd.name / "analysis.json" + if not (rd / "recording.json").exists() or not aj.exists(): continue + try: + rec = json.loads((rd / "recording.json").read_text()) + an = json.loads(aj.read_text()) + except: continue + scores.append((score_analysis(an, rec, people).total, rd.name)) +scores.sort() +import statistics +just = [s for s,_ in scores] +print(f"n={len(scores)} min={min(just):.3f} p25={statistics.quantiles(just,n=4)[0]:.3f} median={statistics.median(just):.3f} p75={statistics.quantiles(just,n=4)[2]:.3f} max={max(just):.3f}") +print("\nworst 5:"); [print(f" {s:.3f} {n}") for s,n in scores[:5]] +print("\nbest 5:"); [print(f" {s:.3f} {n}") for s,n in scores[-5:]] +' +``` + +--- + +## 2) Wrapper smoke (1 LM call; ~20s, uses haiku quota) + +Confirms `claude` CLI subprocess invocation, JSON envelope parsing, and +Max-subscription auth. + +```bash +.venv-gepa/bin/python3 -m prompt_optim.claude_cli_lm +``` + +Expect: a response containing the literal word `OK`. + +--- + +## 3) Adapter smoke (1 sonnet call; ~1-2 min) + +Single-example end-to-end run of the adapter without the GEPA loop. + +```bash +.venv-gepa/bin/python3 -m prompt_optim._smoke_adapter +``` + +Expect: a score in the 0.7–0.9 range with subscores per component. + +--- + +## 4) Tiny GEPA loop (haiku, ~3-5 min, ~10-15 calls) + +Confirms the whole pipeline including reflection + mutation + acceptance +test. Won't produce a meaningfully better prompt at this budget — it's +wiring confirmation. + +```bash +.venv-gepa/bin/python3 -m prompt_optim.optimize \ + --budget 6 --task-model haiku --reflection-model haiku +``` + +Output: `prompts/analyze.optimized.md` (likely identical to seed at this +budget — meaning no mutation passed the acceptance test, which is expected). + +--- + +## 5) Production-strength run (sonnet+opus, hours-to-days) + +Plan for 1–3 days wall clock against Max caps. The state file at +`.seam/prompt-optim-state.json` persists telemetry across the run, and the +rate-limit retry loop in `claude_cli_lm.py` makes long sleeps recoverable — +but expect the run to spread across multiple sessions if you have a tight +weekly cap. + +```bash +.venv-gepa/bin/python3 -m prompt_optim.optimize \ + --budget 150 \ + --task-model sonnet \ + --reflection-model opus \ + --consistency \ + --train 2026-02-23_team-sync-update \ + 2026-03-05_thyroid-medication-adjustment \ + 2026-03-17_vision-and-execution-strategy \ + 2026-02-24_cancun-and-project-management +``` + +Watch telemetry while the run progresses (separate terminal): + +```bash +watch -n 30 'cat .seam/prompt-optim-state.json' +``` + +After the run finishes, **do not** auto-promote +`prompts/analyze.optimized.md` over `prompts/analyze.md`. Re-test in +production tool-writing mode first — see the "Reviewing & promoting" section +in [README.md](README.md). + +--- + +## Helpers + +### Inspect telemetry + +```bash +cat .seam/prompt-optim-state.json | python3 -m json.tool +``` + +### Reset telemetry + +Zeroes the counters but leaves recordings/analyses untouched. + +```bash +rm .seam/prompt-optim-state.json +``` + +### Diff seed vs. optimized prompt + +```bash +diff prompts/analyze.md prompts/analyze.optimized.md +``` + +```bash +code --diff prompts/analyze.md prompts/analyze.optimized.md +``` + +### Run against a different `.seam/` + +```bash +SEAM_DATA_DIR=/Users/cwoodson/src/personal/seam/.seam \ + .venv-gepa/bin/python3 -m prompt_optim.optimize --budget 6 ... +``` + +### Force a specific recording set as trainset + +The `--train` flag accepts directory names from `.seam/recordings/`: + +```bash +.venv-gepa/bin/python3 -m prompt_optim.optimize \ + --budget 20 \ + --train 2026-02-23_team-sync-update 2026-03-17_vision-and-execution-strategy +``` + +--- + +## Reading the output + +`optimize.py` prints at the end: + +``` +[optimize] seed score: 0.7302 -> best score: 0.8154 (12 candidates evaluated) +[optimize] telemetry: 87 calls, rate-limit hits: 2, input tok: 124, output tok: 91,234, cost proxy: $4.73 +[optimize] sonnet: 64 calls, in=82 out=78,109 cost_proxy=$2.91 +[optimize] opus: 23 calls, in=42 out=13,125 cost_proxy=$1.82 +``` + +- **Seed → best score** tells you whether GEPA actually improved the prompt. +- **`rate_limit_hits`** is how many times the wrapper saw a usage-cap error + and slept; > 0 just means you hit the cap during the run, not that + anything failed. +- **`cost_proxy`** is **not a real charge** under Max-subscription auth — + the CLI surfaces the API-equivalent cost number, but no billing event + occurred. Treat it as a relative indicator of subscription burn. diff --git a/prompt_optim/README.md b/prompt_optim/README.md new file mode 100644 index 0000000..f764eeb --- /dev/null +++ b/prompt_optim/README.md @@ -0,0 +1,175 @@ +# prompt_optim — GEPA-based optimization for `prompts/analyze.md` + +Optimizes the recording-analysis prompt using [GEPA](https://github.com/gepa-ai/gepa) +without paying Anthropic API tokens. All model calls go through the `claude` +CLI, which uses your Max subscription's OAuth keychain auth. + +## Why this isn't billed against the API + +The `claude` CLI authenticates via the same OAuth/keychain login you use for +interactive sessions. Calls made via `claude -p` consume your Max +subscription's quota — not API credits. The wrapper at +[`claude_cli_lm.py`](claude_cli_lm.py) shells out to `claude -p +--output-format json` and unsets `ANTHROPIC_API_KEY` defensively to keep +billing on the subscription. + +## Files + +| File | Purpose | +| ------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | +| `claude_cli_lm.py` | Subprocess wrapper around `claude -p`. Serial semaphore (Max is one auth bucket), exponential backoff on usage-limit errors, persistent telemetry at `.seam/prompt-optim-state.json` (calls, tokens, cost-proxy, by-model). | +| `metric.py` | Reference-free deterministic metric. See [Metric components](#metric-components) below. | +| `adapter.py` | Custom `GEPAAdapter`. Wraps the candidate prompt in the same delimiters used by `scripts/pocket-run.sh`, asks Claude to emit raw JSON to stdout (vs. the production prompt which writes via the Write tool), parses the response, scores it, and assembles reflective trajectories. Optionally re-runs the first batch example a second time to score consistency. | +| `optimize.py` | CLI driver. `python -m prompt_optim.optimize --budget 20`. | +| `_smoke_adapter.py` | Single-example end-to-end smoke test of the adapter without the GEPA loop. | + +## Metric components + +All components are deterministic, derived from `recording.json`, +`people.json`, and `.seam/generic-speakers.txt` (if present). No +gold-standard analyses, no LLM-as-judge. + +### Hard gate + +| Component | Behavior | +| --------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| `schema` | Output JSON must conform to the schema in `prompts/analyze.md`. If it fails, the metric short-circuits to 0 — none of the others matter when the structure is broken. | + +### Quality components (weighted sum if the gate passes) + +| Component | Weight | What it measures | +| ------------------------- | ------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| `speaker_grounding` | 0.15 | Every name in `speaker_map.values()` is in `people.json` and not generic (per `scripts/seed_people.py:GENERIC_LABELS` + `.seam/generic-speakers.txt`). | +| `participant_consistency` | 0.05 | `participants[]` ⊆ `people.json` ∪ `speaker_map.values()`, no generics. Catches invented participants. | +| `attribution_grounding` | 0.15 | Aggregate grounding across `speaker_map`, `decisions[].by`, `key_quotes[].speaker`, `action_items[].owner`. | +| `quote_grounding` | 0.15 | Each `key_quotes[].text` must appear as a substring (whitespace-fuzzed) in the transcript. The single biggest failure mode of the current prompt — average across the dataset is ~0.29. | +| `coverage_spread` | 0.05 | Quote timestamps should hit all 3 thirds of the recording. | +| `takeaway_quality` | 0.20 | Average of two sub-checks: pairwise Jaccard < 0.6 between takeaway content-word sets (non-redundancy) + each takeaway shares ≥ 2 content words with the transcript (grounding). Mostly a regression guard today — the current prompt is good here. | +| `mind_map_quality` | 0.10 | Average of: edge-endpoint integrity, single connected component, root branching factor in 3..7, ≥ 2 distinct node types. | +| `output_economy` | 0.10 | Sigmoid penalty on `len(json.dumps(analysis))`. Calibrated against the existing 178-analysis distribution: 14 kB (current median) → 0.5, 11 kB → 0.73, 16 kB → 0.31. | +| `consistency` | 0.05 | **Gated on `--consistency`.** Re-runs the first batch example a second time and scores Jaccard over takeaway content-words and `speaker_map` value sets. When disabled, its 0.05 is redistributed pro-rata across the others. | + +### Why this set + +- **No LLM-as-judge** in v1: doubles subscription burn; judge and task share blind spots. +- **No reference-similarity to existing analyses**: those were produced by the prompt we're optimizing, so they're not gold standard. Optimizing toward them caps the result at "looks like the current output." +- **Reuses `is_generic` from `scripts/seed_people.py`** so the metric and the production speaker-staging pipeline agree on what counts as a generic role label. Tweak `.seam/generic-speakers.txt` once and both pick it up. + +## Quick start + +One-time setup (creates a venv outside the project tree pollutes nothing): + +```bash +python3 -m venv .venv-gepa +.venv-gepa/bin/pip install -r prompt_optim/requirements.txt +``` + +Tiny smoke test of the CLI wrapper: + +```bash +.venv-gepa/bin/python3 -m prompt_optim.claude_cli_lm +# expect: response containing "OK" +``` + +Smoke test the adapter (one Sonnet call, ~1–2 min): + +```bash +SEAM_DATA_DIR=/path/to/.seam .venv-gepa/bin/python3 -m prompt_optim._smoke_adapter +# expect: score ~0.9 with subscores per component +``` + +Run a tiny optimization (budget=20, ~30–90 min on Max sub): + +```bash +SEAM_DATA_DIR=/path/to/.seam .venv-gepa/bin/python3 -m prompt_optim.optimize --budget 20 +# writes prompts/analyze.optimized.md +``` + +## Production-quality run + +Once you've confirmed the wiring works: + +```bash +SEAM_DATA_DIR=/path/to/.seam .venv-gepa/bin/python3 -m prompt_optim.optimize \ + --budget 150 \ + --task-model sonnet \ + --reflection-model opus \ + --consistency \ + --train 2026-02-23_team-sync-update \ + 2026-03-05_thyroid-medication-adjustment \ + 2026-03-17_vision-and-execution-strategy \ + 2026-02-24_cancun-and-project-management \ + ... +``` + +`--consistency` is **off by default** because it adds one extra LM call per +`evaluate()`. Enable it on production-strength runs (`--budget >= 100`) where +the extra cost is justified. + +### Telemetry + +Every successful CLI call appends to `.seam/prompt-optim-state.json`: + +```json +{ + "calls": 13, + "rate_limit_hits": 0, + "total_cost_usd_proxy": 0.27, + "total_input_tokens": 9, + "total_output_tokens": 7788, + "total_cache_read_input_tokens": 96221, + "by_model": { "sonnet": { ... }, "opus": { ... } } +} +``` + +`total_cost_usd_proxy` is **not a real charge** under Max-subscription auth — there's +no API billing event. Treat it as a relative indicator of subscription burn. +`optimize.py` prints a summary at end of run. + +Plan for **1–3 days wall clock** against Max caps. The wrapper persists state +at `.seam/prompt-optim-state.json` so a multi-day run can be resumed by +re-invoking the same command — GEPA itself starts fresh, but the rate-limit +backoff state and call counters are preserved. + +## Reviewing & promoting an optimized prompt + +`prompts/analyze.optimized.md` is **never auto-promoted**. After a run: + +1. Eyeball the diff: `diff prompts/analyze.md prompts/analyze.optimized.md`. +2. **Re-test in production tool-writing mode**, not just optimization JSON + mode. The optimized prompt was scored against direct-JSON output; the + production path uses `--allowedTools "Write"`. Manually edit + [`scripts/pocket-run.sh`](../scripts/pocket-run.sh) to reference + `analyze.optimized.md` for one run, regenerate analyses for 3 recordings, + and diff the resulting `analysis.json` files against current ones. Look + for: more grounded `speaker_map` entries, no hallucinated names, + `key_quotes` that match transcript text. +3. Only after that passes manual review, copy `analyze.optimized.md` over + `analyze.md` and open a PR. + +## Design notes + +- **Metric is reference-free**: the 178 existing analyses in `.seam/analysis/` + were produced by the prompt we're optimizing, so they're not a clean gold + standard. Instead the metric scores against the source recording + (transcript substrings, speaker names in `people.json`) and the schema. +- **Serial execution**: Max is one auth bucket. Parallel calls don't increase + throughput, just burn the bucket faster. The wrapper enforces serial calls + via a global lock. +- **Rate-limit retry**: when the CLI returns a usage-limit error, the wrapper + sleeps with exponential backoff (cap 1h) and retries up to `max_retries` + times. Counters persist to the state file. +- **Optimization-time prompt drift**: at optimization time the wrapped prompt + emits raw JSON; at production time it uses the Write tool. Functionally + equivalent for what the metric scores, but the optimized prompt **must** be + re-tested in production mode before promotion (see step 2 above). +- **No LLM-as-judge**: would double Max-quota spend and create a + feedback loop where judge and task share blind spots. +- **`is_generic` is sourced from `scripts/seed_people.py`**: the metric and + the production speaker-staging pipeline share one definition. To extend + the rejection list, edit `.seam/generic-speakers.txt` (one label per line) + and both pick it up automatically. +- **Output-economy sigmoid is calibrated to the existing dataset**: median + current output is ~14 kB, so the curve is centered at 14 kB. Optimization + has real gradient toward smaller outputs without immediately killing the + seed prompt's score. diff --git a/prompt_optim/TRAINSET.md b/prompt_optim/TRAINSET.md new file mode 100644 index 0000000..c5a2f84 --- /dev/null +++ b/prompt_optim/TRAINSET.md @@ -0,0 +1,154 @@ +# Picking trainset inputs for GEPA optimization + +Pick recordings that **expose the prompt's weaknesses**, not its strengths. +GEPA optimizes whatever the metric flags — if your trainset is recordings +the seed already handles well, GEPA has no signal to chase. Conversely, +picking only the worst recordings can produce a prompt over-fit to +pathological cases. + +The right framing: you're assembling a **failure-mode portfolio**, where +each recording is chosen to stress one of the metric's components. + +## Criteria, ordered by importance + +### 1. Score below the median on the current prompt + +If `score_analysis` returns 0.85 on a recording, GEPA can't push it much +higher — there's no gradient. The dataset-distribution command in +[COMMANDS.md](COMMANDS.md) shows the median; pick from below it. + +### 2. Cover the dominant failure modes + +Look at which subscores are dragging totals down. From the dataset today: + +| Component | Mean across dataset | Notes | +| ------------------------- | ------------------- | ----------------------------------------------------------------- | +| `quote_grounding` | ~0.29 | Verbatim-quote substring failure. Dominant. | +| `attribution_grounding` | ~0.68 | Owners/by/speaker fields with role descriptions instead of names. | +| `output_economy` | varies | Bloat above ~14 kB. | +| `participant_consistency` | varies | Invented or generic participants. | + +A trainset where every recording fails on quote_grounding will only +optimize quotes. A mix of failure types forces GEPA to find improvements +that **generalize**. + +### 3. Diverse speaker setups + +Where the prompt's hardest work happens: + +- One **all-Unknown** transcript (model must infer everyone from content). +- One with **labeled speakers** but no names (`SPEAKER_00`, `SPEAKER_01`). +- One with **mixed** Unknown + named. +- One with **correct labels already** — regression guard so the prompt + doesn't break the easy case. + +### 4. Diverse recording types + +The schema's `type` field is `meeting | brainstorm | interview | lecture | +conversation | other`. Behavior on a 60-min meeting differs from a 2-min +voice memo. Sample at least 2–3 distinct types. + +### 5. Length spread + +Include one short (<5 min), one medium (15–30 min), one long (60+ min). +Long recordings stress `output_economy`; short recordings stress whether +the model handles thin content gracefully. + +### 6. Avoid duplicates and near-duplicates + +The dataset has many `pending-` entries that mirror named +recordings. Skip those — they don't add information, just multiply cost. + +## What to avoid + +- **All recordings from the same context.** If your whole trainset is + internal team standups, you'll optimize a "standup" prompt, not a + general analysis prompt. +- **Recordings where the metric scores 0.0** (schema gate failure). These + tell GEPA "produce _anything_ valid"; once schema is satisfied, the + gradient evaporates. You want recordings the seed handles structurally + but suboptimally. +- **Recordings where you're not sure of ground truth.** If you don't know + who's actually speaking, you can't sanity-check the optimized prompt by + eye after the run. +- **Too many recordings.** Each one multiplies cost per metric call. With + `--budget 150` and 8 recordings, GEPA only evaluates ~18 candidates. + 4 recordings × 37 candidates beats 8 × 18 — prompt-space exploration + matters more than dataset breadth at this scale. + +## Picker script + +Run [`_pick_trainset.py`](_pick_trainset.py) — it prints the worst-scoring +real recordings (non-pending, non-trivial), annotated with type, duration, +speaker setup, and dominant failure mode, plus a directly-pasteable +`--train` line at the bottom. + +```bash +.venv-gepa/bin/python3 -m prompt_optim._pick_trainset +``` + +Useful flags: + +```bash +.venv-gepa/bin/python3 -m prompt_optim._pick_trainset --top 50 +.venv-gepa/bin/python3 -m prompt_optim._pick_trainset --include-pending +.venv-gepa/bin/python3 -m prompt_optim._pick_trainset --min-segments 20 +SEAM_DATA_DIR=/path/to/.seam .venv-gepa/bin/python3 -m prompt_optim._pick_trainset +``` + +Sample output (top 8 from the current dataset): + +``` +score type min speakers dominant_failure name +0.445 meeting 31 labeled_only speaker_grounding=0.00 2026-03-17_ai-agents-for-workplace-efficiency +0.474 meeting 122 labeled_only speaker_grounding=0.00 2026-04-14_agentic-workflow-planning-and-pr-automation +0.541 meeting 47 labeled_only output_economy=0.06 2026-03-09_project-updates-and-travel-plans +0.577 meeting 30 labeled_only quote_grounding=0.20 2026-02-20_project-updates-and-tooling-integration +0.579 meeting 80 named quote_grounding=0.00 2026-03-26_ai-driven-insurance-analytics-onboarding +0.581 meeting 23 labeled_only participant_consistency=0.17 2026-03-17_trade-show-retrospective +0.583 meeting 95 labeled_only attribution_grounding=0.22 2026-03-17_company-vision-and-strategy-session +0.591 conversation 21 labeled_only attribution_grounding=0.11 2026-03-06_calebs-pediatrician-appointment +``` + +## How to read the output and pick 4 recordings + +Scan the table top-down (worst scores first) and assemble a portfolio that +checks these boxes: + +1. **At least 3 different `dominant_failure` values across your 4 picks.** + Don't take 4 recordings all failing on `quote_grounding`. +2. **At least 3 different `speakers` categories.** Include the `unknown_only` + case (hardest) and ideally one `mixed` for the regression guard. +3. **At least 2 different `type` values.** Don't take 4 meetings. +4. **A length spread.** At least one < 10 min, at least one > 30 min. +5. **All scores between ~0.5 and ~0.78** — meaningfully below median, but + not schema-broken. + +Then paste the four directory names into `optimize.py`: + +```bash +.venv-gepa/bin/python3 -m prompt_optim.optimize \ + --budget 150 \ + --task-model sonnet \ + --reflection-model opus \ + --consistency \ + --train +``` + +## When to revisit + +- **After your first production-strength run finishes:** re-run the picker. + The optimized prompt's dataset-wide score distribution will look + different. Recordings that _now_ score worst are the new failure modes + to target in a second optimization pass. +- **When you change the metric:** if you adjust weights or add a component, + your dominant-failure column changes — re-pick. +- **When you add new recordings:** if 50 new recordings come in via Pocket, + some will likely surface failure modes the current trainset doesn't + cover. + +## Notes for later + +- `optimize.py` currently passes `valset=trainset` (same set for train and + val). For a real production-strength run we'll want to hold out 2–3 + recordings as a true valset. Tracked separately. diff --git a/prompt_optim/__init__.py b/prompt_optim/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/prompt_optim/_pick_trainset.py b/prompt_optim/_pick_trainset.py new file mode 100644 index 0000000..7569ff6 --- /dev/null +++ b/prompt_optim/_pick_trainset.py @@ -0,0 +1,147 @@ +"""Print the worst-scoring real recordings as a trainset shortlist. + +Each row is annotated with type, duration, speaker setup, and dominant +failure mode. Use the table to hand-pick 4 recordings spanning multiple +failure modes / speaker setups / types — see TRAINSET.md. + +Usage (from the worktree root): + + .venv-gepa/bin/python3 -m prompt_optim._pick_trainset + .venv-gepa/bin/python3 -m prompt_optim._pick_trainset --top 50 + SEAM_DATA_DIR=/path/to/.seam .venv-gepa/bin/python3 -m prompt_optim._pick_trainset +""" + +from __future__ import annotations + +import argparse +import json +import os +import sys +from pathlib import Path + +from prompt_optim.metric import _load_people_names, score_analysis + + +def _seam_dir() -> Path: + env = os.environ.get("SEAM_DATA_DIR") + if env: + return Path(env).expanduser().resolve() + # Default: worktree root's .seam (parent.parent because this file lives + # in prompt_optim/). + return Path(__file__).resolve().parent.parent / ".seam" + + +def _classify_speakers(transcript: list[dict]) -> str: + speakers = {seg.get("speaker") for seg in transcript if isinstance(seg, dict)} + speakers.discard(None) + if not speakers: + return "empty" + if speakers <= {"Unknown"}: + return "unknown_only" + if all(isinstance(s, str) and s.startswith("SPEAKER_") for s in speakers): + return "labeled_only" + if "Unknown" in speakers and any( + isinstance(s, str) and s.startswith("SPEAKER_") for s in speakers + ): + return "mixed" + return "named" + + +def main() -> int: + ap = argparse.ArgumentParser(description=__doc__) + ap.add_argument("--top", type=int, default=25, help="how many rows to print") + ap.add_argument( + "--min-segments", + type=int, + default=8, + help="skip recordings with fewer transcript segments than this", + ) + ap.add_argument( + "--include-pending", + action="store_true", + help="include 'pending-' duplicate-style recordings", + ) + args = ap.parse_args() + + seam = _seam_dir() + rec_dir = seam / "recordings" + an_dir = seam / "analysis" + if not rec_dir.exists(): + print(f"ERROR: no recordings dir at {rec_dir}", file=sys.stderr) + return 1 + + people = _load_people_names() + + rows: list[dict] = [] + for rd in sorted(rec_dir.iterdir()): + if not rd.is_dir(): + continue + rj = rd / "recording.json" + aj = an_dir / rd.name / "analysis.json" + if not rj.exists() or not aj.exists(): + continue + if "pending-" in rd.name and not args.include_pending: + continue + try: + rec = json.loads(rj.read_text()) + an = json.loads(aj.read_text()) + except (OSError, json.JSONDecodeError): + continue + transcript = rec.get("transcript") or [] + if len(transcript) < args.min_segments: + continue + + result = score_analysis(an, rec, people) + if result.total == 0: + continue # schema-failed; no gradient + + speaker_kind = _classify_speakers(transcript) + dominant_name, dominant_score = min( + result.subscores.items(), key=lambda kv: kv[1] + ) + rows.append( + { + "name": rd.name, + "score": result.total, + "type": an.get("type") or "-", + "duration_min": int((rec.get("duration") or 0) / 60), + "speakers": speaker_kind, + "dominant_failure": f"{dominant_name}={dominant_score:.2f}", + } + ) + + if not rows: + print("No qualifying recordings found.", file=sys.stderr) + return 1 + + rows.sort(key=lambda r: r["score"]) + + header = ( + f"{'score':>5} {'type':<12} {'min':>4} " + f"{'speakers':<13} {'dominant_failure':<32} name" + ) + print(header) + print("-" * len(header)) + for r in rows[: args.top]: + print( + f"{r['score']:.3f} {r['type']:<12} {r['duration_min']:>4} " + f"{r['speakers']:<13} {r['dominant_failure']:<32} {r['name']}" + ) + + print() + print(f"({len(rows)} qualifying recordings; showing worst {min(args.top, len(rows))})") + print( + "\nNext: hand-pick 4 names from this table covering multiple " + "dominant_failure types, speaker setups, and recording types. " + "See TRAINSET.md for the picking criteria. Then:" + ) + print( + "\n .venv-gepa/bin/python3 -m prompt_optim.optimize \\\n" + " --budget 150 --task-model sonnet --reflection-model opus --consistency \\\n" + " --train " + ) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/prompt_optim/_smoke_adapter.py b/prompt_optim/_smoke_adapter.py new file mode 100644 index 0000000..7a013f8 --- /dev/null +++ b/prompt_optim/_smoke_adapter.py @@ -0,0 +1,69 @@ +"""End-to-end smoke test for SeamAnalyzeAdapter on a single recording. + +Run from the repo root with the venv: + + SEAM_DATA_DIR=/path/to/.seam .venv-gepa/bin/python3 -m prompt_optim._smoke_adapter + +It runs ONE evaluate() call against the seed prompt + 1 trainset example, +prints the score breakdown, and exits non-zero on hard failures. +""" + +from __future__ import annotations + +import json +import os +import sys +from pathlib import Path + +from prompt_optim.adapter import COMPONENT_NAME, SeamAnalyzeAdapter +from prompt_optim.claude_cli_lm import ClaudeCliLM + + +def main() -> int: + repo_root = Path(__file__).resolve().parent.parent + seam_dir = Path(os.environ.get("SEAM_DATA_DIR") or repo_root / ".seam") + prompt_path = repo_root / "prompts" / "analyze.md" + rec_dir = seam_dir / "recordings" / "2026-02-23_team-sync-update" + + if not prompt_path.exists(): + print(f"ERROR: missing seed prompt at {prompt_path}", file=sys.stderr) + return 1 + if not rec_dir.exists(): + print( + f"ERROR: missing recording dir at {rec_dir} " + f"(set SEAM_DATA_DIR to a .seam containing this recording)", + file=sys.stderr, + ) + return 1 + + seed_prompt = prompt_path.read_text() + recording = json.loads((rec_dir / "recording.json").read_text()) + + lm = ClaudeCliLM(model="sonnet", timeout=300, max_retries=2, verbose=True) + adapter = SeamAnalyzeAdapter(lm=lm, people_json_path=seam_dir / "people.json") + + batch = [{"recording_dir": str(rec_dir), "recording": recording}] + candidate = {COMPONENT_NAME: seed_prompt} + + print(f"[smoke] Evaluating seed prompt on {rec_dir.name} ...") + eb = adapter.evaluate(batch, candidate, capture_traces=True) + + print(f"[smoke] score: {eb.scores[0]:.3f}") + if eb.trajectories: + traj = eb.trajectories[0] + print(f"[smoke] subscores: {traj['subscores']}") + if traj["issues"]: + print("[smoke] issues:") + for i in traj["issues"]: + print(f" - {i}") + # Confirm the reflective dataset assembles cleanly. + rd = adapter.make_reflective_dataset(candidate, eb, [COMPONENT_NAME]) + items = rd[COMPONENT_NAME] + print(f"[smoke] reflective dataset has {len(items)} item(s)") + + print("[smoke] OK") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/prompt_optim/adapter.py b/prompt_optim/adapter.py new file mode 100644 index 0000000..3011d50 --- /dev/null +++ b/prompt_optim/adapter.py @@ -0,0 +1,247 @@ +"""Custom GEPAAdapter for the Seam analyze prompt. + +The component under optimization is a single string, ``analyze_prompt``, +corresponding to the contents of ``prompts/analyze.md``. + +DataInst shape (a plain dict — no TypedDict required, GEPA treats DataInst as +opaque): + { + "recording_dir": str, # absolute path to a .seam/recordings/ + "recording": dict, # parsed recording.json contents + } + +The adapter wraps the candidate prompt with the same delimiters used in +``scripts/pocket-run.sh`` (recording data + people data) and adds an +optimization-time instruction telling Claude to emit raw JSON to stdout +instead of writing files (the production prompt uses the Write tool — that +mode is harder to score deterministically). The optimized prompt should be +tested in the production tool-writing path before promotion. +""" + +from __future__ import annotations + +import json +import re +import traceback +from collections.abc import Mapping, Sequence +from pathlib import Path +from typing import Any + +from gepa.core.adapter import EvaluationBatch, GEPAAdapter + +from prompt_optim.claude_cli_lm import ClaudeCliLM +from prompt_optim.metric import ( + MetricResult, + consistency_score, + parse_analysis_from_response, + score_analysis, +) + + +COMPONENT_NAME = "analyze_prompt" + + +def _build_user_message( + recording_data_json: str, + people_data_json: str, +) -> str: + return ( + "You are analyzing a Pocket AI recording. " + "Output ONLY the analysis.json contents as a single raw JSON object. " + "Do not write files, do not include any prose, do not wrap in markdown fences.\n\n" + "---\n\n" + "Here is the recording data:\n\n" + f"{recording_data_json}\n\n" + "---\n\n" + "Here are the known people (use these to infer speakers in the transcript):\n\n" + f"{people_data_json}\n\n" + "---\n\n" + "Emit the analysis.json now. Begin your response with `{` and end with `}`." + ) + + +def _truncate(s: str, n: int = 1500) -> str: + if len(s) <= n: + return s + return s[: n - 100] + "\n\n... [truncated] ...\n\n" + s[-100:] + + +class SeamAnalyzeAdapter(GEPAAdapter[dict, dict, dict]): + """Custom adapter that runs the analyze prompt via the Claude CLI. + + Args: + lm: ClaudeCliLM wrapping a chosen task model. + people_json_path: path to .seam/people.json for grounding the metric. + enable_consistency: when True, the first example in each batch is run + twice and the two outputs scored via metric.consistency_score(). + Doubles the LM calls for that one example. Default False. + """ + + def __init__( + self, + lm: ClaudeCliLM, + people_json_path: str | Path, + enable_consistency: bool = False, + ): + self.lm = lm + self.enable_consistency = enable_consistency + people_path = Path(people_json_path) + if people_path.exists(): + self.people_data_json = people_path.read_text() + data = json.loads(self.people_data_json) + self.people_names = { + self._norm(p["name"]) for p in data.get("people", []) if p.get("name") + } + else: + self.people_data_json = "{}" + self.people_names = set() + + @staticmethod + def _norm(s: str) -> str: + return re.sub(r"\s+", " ", s.strip().lower()) + + def _run_once( + self, candidate_prompt: str, recording: dict + ) -> tuple[str, dict | None, str | None]: + """Single LM call. Returns (raw_response, parsed_analysis_or_None, error_or_None).""" + recording_data_json = json.dumps(recording) + user_msg = _build_user_message(recording_data_json, self.people_data_json) + messages = [ + {"role": "system", "content": candidate_prompt}, + {"role": "user", "content": user_msg}, + ] + try: + response = self.lm(messages) + except Exception as e: + tb = traceback.format_exc(limit=3) + return ("", None, f"LM call failed: {e}\n{tb}") + analysis = parse_analysis_from_response(response) + if analysis is None: + return (response, None, "json_parse_failed") + return (response, analysis, None) + + def _evaluate_one( + self, + data: dict, + candidate_prompt: str, + consistency_for_this_example: bool, + ) -> tuple[dict, float, dict]: + """Run one example. Returns (output, score, trajectory). + + If ``consistency_for_this_example`` is True, runs the LM a second time + on the same input and adds a consistency_score component. + """ + recording = data["recording"] + traj_meta = { + "recording_dir": data.get("recording_dir"), + "recording_title": recording.get("title"), + } + + response, analysis, err = self._run_once(candidate_prompt, recording) + if err == "json_parse_failed": + traj = { + "data": traj_meta, + "response": _truncate(response), + "feedback": ( + "Output could not be parsed as JSON. The candidate prompt must " + "produce a single raw JSON object as the entire response (no " + "markdown fences, no prose). First 200 chars: " + + repr(response[:200]) + ), + "score": 0.0, + "subscores": {"schema": 0.0}, + "issues": ["json_parse_failed"], + } + return ({"full_assistant_response": response}, 0.0, traj) + if err is not None: + traj = { + "data": traj_meta, + "response": "", + "feedback": err, + "score": 0.0, + "subscores": {}, + "issues": [f"lm_error: {err.splitlines()[0][:200]}"], + } + return ({"full_assistant_response": ""}, 0.0, traj) + + consistency = None + if consistency_for_this_example: + response_b, analysis_b, err_b = self._run_once(candidate_prompt, recording) + if analysis_b is not None: + consistency = consistency_score(analysis, analysis_b) + else: + # Second run failed; treat as zero consistency (the candidate + # prompt isn't reliably producing valid output). + consistency = 0.0 + + result: MetricResult = score_analysis( + analysis, recording, self.people_names, consistency_score=consistency + ) + traj = { + "data": traj_meta, + "response": _truncate(response), + "feedback": result.feedback, + "score": result.total, + "subscores": result.subscores, + "issues": result.issues, + } + return ({"full_assistant_response": response}, result.total, traj) + + def evaluate( + self, + batch: list[dict], + candidate: dict[str, str], + capture_traces: bool = False, + ) -> EvaluationBatch[dict, dict]: + candidate_prompt = candidate.get(COMPONENT_NAME, "") + if not candidate_prompt: + raise ValueError( + f"candidate is missing component {COMPONENT_NAME!r}; got {list(candidate)}" + ) + + outputs: list[dict] = [] + scores: list[float] = [] + trajectories: list[dict] = [] + for i, data in enumerate(batch): + # Only run the consistency double-eval on the first example per + # batch, to keep cost bounded (1 extra LM call per evaluate()). + consistency_here = self.enable_consistency and i == 0 + output, score, traj = self._evaluate_one( + data, candidate_prompt, consistency_here + ) + outputs.append(output) + scores.append(score) + trajectories.append(traj) + + return EvaluationBatch( + outputs=outputs, + scores=scores, + trajectories=trajectories if capture_traces else None, + ) + + def make_reflective_dataset( + self, + candidate: dict[str, str], + eval_batch: EvaluationBatch[dict, dict], + components_to_update: list[str], + ) -> Mapping[str, Sequence[Mapping[str, Any]]]: + assert components_to_update == [COMPONENT_NAME], ( + f"only {COMPONENT_NAME!r} is optimized, got {components_to_update}" + ) + trajectories = eval_batch.trajectories + assert trajectories is not None, "trajectories required for reflection" + + items: list[dict] = [] + for traj in trajectories: + items.append( + { + "Inputs": { + "recording_title": traj["data"].get("recording_title", ""), + }, + "Generated Outputs": traj["response"], + "Feedback": traj["feedback"], + } + ) + if not items: + raise RuntimeError("no trajectories to build reflective dataset from") + return {COMPONENT_NAME: items} diff --git a/prompt_optim/claude_cli_lm.py b/prompt_optim/claude_cli_lm.py new file mode 100644 index 0000000..2322e3a --- /dev/null +++ b/prompt_optim/claude_cli_lm.py @@ -0,0 +1,306 @@ +"""Claude CLI subprocess wrapper for GEPA. + +Routes every model call through `claude -p --output-format json` so the +optimizer uses the user's Max-subscription auth (CLI keychain OAuth) instead +of an Anthropic API key. One auth bucket means we serialize calls and back +off on usage-limit errors. + +The wrapper exposes a single class, ``ClaudeCliLM``, callable in two shapes: + + - ``lm(messages)`` where messages is a list of ``{"role", "content"}`` dicts + (matches GEPA's ``ChatCompletionCallable`` protocol used by ``task_lm``). + - ``lm(prompt)`` where prompt is a string (matches GEPA's ``LanguageModel`` + protocol used by ``reflection_lm``). +""" + +from __future__ import annotations + +import json +import os +import re +import subprocess +import sys +import threading +import time +from collections.abc import Sequence +from pathlib import Path +from typing import Any + +_REPO_ROOT = Path(__file__).resolve().parent.parent + + +def _state_path() -> Path: + env = os.environ.get("SEAM_DATA_DIR") + base = Path(env).expanduser().resolve() if env else _REPO_ROOT / ".seam" + return base / "prompt-optim-state.json" + +# Single global lock — Max is one auth bucket, parallelism just burns it faster. +_CALL_LOCK = threading.Lock() + +# Patterns the CLI emits when the subscription cap is hit. Matched against the +# JSON envelope's `result` / `error` fields case-insensitively. +_RATE_LIMIT_PATTERNS = ( + "usage limit", + "rate limit", + "rate_limit", + "too many requests", + "quota exceeded", + "5-hour limit", + "weekly limit", +) + + +def _looks_rate_limited(envelope: dict[str, Any]) -> bool: + if envelope.get("is_error"): + subtype = (envelope.get("subtype") or "").lower() + if "rate" in subtype or "limit" in subtype or "quota" in subtype: + return True + blob = json.dumps(envelope).lower() + return any(pat in blob for pat in _RATE_LIMIT_PATTERNS) + + +def _flatten_messages(messages: Sequence[dict[str, str]]) -> tuple[str | None, str]: + """Split a messages list into (system_prompt, user_prompt). + + The Claude CLI takes a single positional prompt string plus an optional + --append-system-prompt. We pull all system messages into the system slot + (joined) and all user/assistant messages into the user slot (joined with + role markers, since the CLI is single-turn in -p mode). + """ + system_parts: list[str] = [] + convo_parts: list[str] = [] + for m in messages: + role = m.get("role", "user") + content = m.get("content", "") + if role == "system": + system_parts.append(content) + elif role == "user": + convo_parts.append(content) + elif role == "assistant": + convo_parts.append(f"[Assistant previously said]\n{content}") + else: + convo_parts.append(f"[{role}]\n{content}") + system_prompt = "\n\n".join(system_parts) if system_parts else None + user_prompt = "\n\n".join(convo_parts) if convo_parts else "" + return system_prompt, user_prompt + + +def _load_state() -> dict[str, Any]: + if _state_path().exists(): + try: + return json.loads(_state_path().read_text()) + except json.JSONDecodeError: + pass + return { + "calls": 0, + "rate_limit_hits": 0, + "last_error": None, + "total_cost_usd_proxy": 0.0, + "total_input_tokens": 0, + "total_output_tokens": 0, + "total_cache_read_input_tokens": 0, + "total_cache_creation_input_tokens": 0, + "total_duration_ms": 0, + "by_model": {}, # model -> {"calls", "input_tokens", "output_tokens", "cost_proxy"} + } + + +def _save_state(state: dict[str, Any]) -> None: + _state_path().parent.mkdir(parents=True, exist_ok=True) + _state_path().write_text(json.dumps(state, indent=2)) + + +def _accumulate_telemetry( + state: dict[str, Any], model: str, envelope: dict[str, Any] +) -> None: + """Pull cost/usage fields from the CLI envelope into the state file. + + Note: under Max-subscription auth (OAuth/keychain), the `total_cost_usd` + field is *not* a real dollar charge — there's no API billing event. It's + still a useful proxy of subscription burn, so we record it as + ``total_cost_usd_proxy``. + """ + cost = envelope.get("total_cost_usd") + if isinstance(cost, (int, float)): + state["total_cost_usd_proxy"] = state.get("total_cost_usd_proxy", 0.0) + float(cost) + usage = envelope.get("usage") or {} + for src_key, dst_key in ( + ("input_tokens", "total_input_tokens"), + ("output_tokens", "total_output_tokens"), + ("cache_read_input_tokens", "total_cache_read_input_tokens"), + ("cache_creation_input_tokens", "total_cache_creation_input_tokens"), + ): + v = usage.get(src_key) + if isinstance(v, (int, float)): + state[dst_key] = state.get(dst_key, 0) + int(v) + dur = envelope.get("duration_ms") + if isinstance(dur, (int, float)): + state["total_duration_ms"] = state.get("total_duration_ms", 0) + int(dur) + by = state.setdefault("by_model", {}) + bucket = by.setdefault(model, { + "calls": 0, + "input_tokens": 0, + "output_tokens": 0, + "cost_proxy": 0.0, + }) + bucket["calls"] += 1 + bucket["input_tokens"] += int(usage.get("input_tokens") or 0) + bucket["output_tokens"] += int(usage.get("output_tokens") or 0) + if isinstance(cost, (int, float)): + bucket["cost_proxy"] += float(cost) + + +class RateLimitError(RuntimeError): + pass + + +class ClaudeCliLM: + """Subprocess wrapper around `claude -p --output-format json`. + + Args: + model: claude alias (e.g. "sonnet", "opus", "haiku") or full model id. + timeout: per-call subprocess timeout (seconds). + max_retries: how many times to retry on transient errors. + max_rate_limit_wait: cap on a single backoff sleep (seconds). + """ + + def __init__( + self, + model: str = "sonnet", + timeout: int = 600, + max_retries: int = 8, + max_rate_limit_wait: int = 3600, + verbose: bool = True, + ): + self.model = model + self.timeout = timeout + self.max_retries = max_retries + self.max_rate_limit_wait = max_rate_limit_wait + self.verbose = verbose + + def __call__(self, prompt_or_messages: str | Sequence[dict[str, str]]) -> str: + if isinstance(prompt_or_messages, str): + system_prompt, user_prompt = None, prompt_or_messages + else: + system_prompt, user_prompt = _flatten_messages(prompt_or_messages) + return self._complete(system_prompt, user_prompt) + + def _complete(self, system_prompt: str | None, user_prompt: str) -> str: + attempt = 0 + backoff = 30.0 + while True: + with _CALL_LOCK: + envelope = self._invoke(system_prompt, user_prompt) + if envelope.get("is_error") is False and "result" in envelope: + state = _load_state() + state["calls"] = state.get("calls", 0) + 1 + _accumulate_telemetry(state, self.model, envelope) + _save_state(state) + return envelope["result"] + + # Error path: figure out if it's rate-limit (retry with backoff) + # or something else (bounded retries). + attempt += 1 + if attempt > self.max_retries: + raise RuntimeError( + f"Claude CLI failed after {self.max_retries} retries: " + f"{json.dumps(envelope)[:500]}" + ) + + if _looks_rate_limited(envelope): + state = _load_state() + state["rate_limit_hits"] = state.get("rate_limit_hits", 0) + 1 + state["last_error"] = envelope.get("result") or envelope.get("error") + _save_state(state) + wait = min(backoff, self.max_rate_limit_wait) + if self.verbose: + print( + f"[claude_cli_lm] rate-limit hit (attempt {attempt}); " + f"sleeping {wait:.0f}s then retrying", + file=sys.stderr, + ) + time.sleep(wait) + backoff = min(backoff * 2, self.max_rate_limit_wait) + continue + + # Non-rate-limit error: short backoff and try again. + if self.verbose: + print( + f"[claude_cli_lm] transient error (attempt {attempt}): " + f"{json.dumps(envelope)[:200]}", + file=sys.stderr, + ) + time.sleep(min(5 * attempt, 60)) + + def _invoke(self, system_prompt: str | None, user_prompt: str) -> dict[str, Any]: + cmd = [ + "claude", + "-p", + user_prompt, + "--output-format", + "json", + "--model", + self.model, + ] + if system_prompt: + cmd.extend(["--append-system-prompt", system_prompt]) + + env = os.environ.copy() + # Defensive: if an ANTHROPIC_API_KEY happens to be set, unset it so the + # CLI uses the keychain OAuth (Max subscription) instead of API billing. + env.pop("ANTHROPIC_API_KEY", None) + + try: + proc = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=self.timeout, + env=env, + check=False, + ) + except subprocess.TimeoutExpired as e: + return { + "is_error": True, + "subtype": "timeout", + "result": f"timed out after {self.timeout}s", + "stderr": (e.stderr or "")[-500:] if isinstance(e.stderr, str) else "", + } + + if proc.returncode != 0: + return { + "is_error": True, + "subtype": "nonzero_exit", + "returncode": proc.returncode, + "result": (proc.stderr or proc.stdout or "")[-1000:], + } + + # The CLI prints one JSON object on stdout in --output-format json mode. + try: + return json.loads(proc.stdout) + except json.JSONDecodeError: + return { + "is_error": True, + "subtype": "bad_json", + "result": proc.stdout[-1000:], + "stderr": proc.stderr[-500:], + } + + +def make_lm(model: str = "sonnet", **kwargs: Any) -> ClaudeCliLM: + """Convenience factory.""" + return ClaudeCliLM(model=model, **kwargs) + + +if __name__ == "__main__": + # Smoke test: a tiny round-trip via haiku. + import argparse + + ap = argparse.ArgumentParser() + ap.add_argument("--model", default="haiku") + ap.add_argument("--prompt", default="Reply with exactly the word OK and nothing else.") + args = ap.parse_args() + + lm = ClaudeCliLM(model=args.model, timeout=60, max_retries=2, verbose=True) + out = lm([{"role": "user", "content": args.prompt}]) + print(f"--- response ---\n{out}\n--- end ---") diff --git a/prompt_optim/metric.py b/prompt_optim/metric.py new file mode 100644 index 0000000..f482f31 --- /dev/null +++ b/prompt_optim/metric.py @@ -0,0 +1,770 @@ +"""Reference-free deterministic metric for analyze.md outputs. + +Derived only from the source recording + people.json + the recording's own +transcript. No gold-standard analyses required. + +Component anatomy: + + Hard gate (binary; if 0 the rest are short-circuited) + schema_score JSON parses + matches the schema in analyze.md + + Quality components (weighted sum if the gate passes) + speaker_grounding_score speaker_map names ⊆ people.json AND not generic + participant_consistency participants ⊆ people.json ∪ speaker_map vals, + no generics + attribution_grounding speaker_map + decisions[].by + key_quotes[].speaker + + action_items[].owner all grounded + quote_grounding_score key_quote .text appears as substring in transcript + coverage_spread_score quote timestamps cover all 3 thirds of the recording + takeaway_quality_score pairwise non-redundancy (Jaccard) + transcript + grounding for each takeaway + mind_map_quality_score integrity + connectivity + branching + node-type + variety + output_economy_score sigmoid penalty on bloated analysis.json + consistency_score optional, set externally via score_consistency() + +Weights (sum to 1.0 with consistency disabled): + + speaker_grounding 0.15 + participant_consistency 0.05 + attribution_grounding 0.15 + quote_grounding 0.15 + coverage_spread 0.05 + takeaway_quality 0.20 + mind_map_quality 0.10 + output_economy 0.10 + consistency 0.05 (when enabled; weights re-normalize) + +When consistency is disabled, its 0.05 is redistributed pro-rata across the +other components so the total still ranges over [0, 1]. +""" + +from __future__ import annotations + +import json +import math +import re +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any + +_REPO_ROOT = Path(__file__).resolve().parent.parent + +# Import is_generic / load_generic_labels from scripts/seed_people.py so the +# metric and production speaker-staging pipeline share one definition. +import sys as _sys + +_sys.path.insert(0, str(_REPO_ROOT / "scripts")) +from seed_people import is_generic, load_generic_labels # noqa: E402 + +_sys.path.pop(0) + + +def _default_seam_dir() -> Path: + """Resolve the .seam/ data directory. + + Honors $SEAM_DATA_DIR if set (useful for git worktrees that don't have + their own .seam/), else defaults to the repo's .seam/. + """ + import os + + env = os.environ.get("SEAM_DATA_DIR") + if env: + return Path(env).expanduser().resolve() + return _REPO_ROOT / ".seam" + + +def _people_path() -> Path: + return _default_seam_dir() / "people.json" + + +_REQUIRED_TOP_LEVEL = { + "recording_id", + "title", + "date", + "duration_seconds", + "type", + "participants", + "executive_summary", + "takeaways", + "decisions", + "action_items", + "open_questions", + "key_quotes", + "topics", + "mind_map", + "sentiment", + "tags_suggested", + "speaker_map", +} + +_VALID_TYPES = {"meeting", "brainstorm", "interview", "lecture", "conversation", "other"} +_VALID_SENTIMENTS = {"positive", "neutral", "negative", "mixed"} +_VALID_NODE_TYPES = {"topic", "subtopic", "decision", "action", "question"} + +# Stopwords used for takeaway grounding & Jaccard. Small built-in list to keep +# stdlib-only. +_STOPWORDS = { + "a", "an", "the", "and", "or", "but", "of", "in", "on", "at", "to", "for", + "with", "from", "by", "is", "was", "be", "are", "were", "been", "being", + "have", "has", "had", "do", "does", "did", "will", "would", "should", + "could", "may", "might", "must", "shall", "can", "this", "that", "these", + "those", "i", "you", "he", "she", "it", "we", "they", "me", "him", "her", + "us", "them", "my", "your", "his", "its", "our", "their", "as", "if", + "so", "not", "no", "yes", "than", "then", "there", "here", "what", "when", + "where", "why", "how", "all", "any", "some", "more", "most", "much", + "many", "very", "also", "too", "just", "only", "about", "into", "out", + "over", "up", "down", "off", "now", "well", "really", "like", "going", + "going to", "want", "need", "think", "thought", "say", "says", "said", + "okay", "ok", "yeah", "right", "good", "great", +} + + +# ---------- helpers ---------------------------------------------------------- + + +@dataclass +class MetricResult: + total: float + subscores: dict[str, float] + feedback: str + issues: list[str] = field(default_factory=list) + + +def _normalize_name(s: str) -> str: + return re.sub(r"\s+", " ", s.strip().lower()) + + +def _whitespace_fuzz(s: str) -> str: + return re.sub(r"\s+", " ", s.strip().lower()) + + +def _content_words(s: str, exclude: set[str] | None = None) -> set[str]: + """Lowercase content tokens (>3 chars, not stopwords, not in exclude).""" + excl = exclude or set() + out: set[str] = set() + for w in re.split(r"\W+", s.lower()): + if len(w) <= 3 or w in _STOPWORDS or w in excl: + continue + out.add(w) + return out + + +def _load_people_names(path: Path | None = None) -> set[str]: + p = path or _people_path() + if not p.exists(): + return set() + try: + data = json.loads(p.read_text()) + except json.JSONDecodeError: + return set() + names: set[str] = set() + for p_ in data.get("people", []): + n = p_.get("name") + if isinstance(n, str) and n.strip(): + names.add(_normalize_name(n)) + return names + + +def _is_grounded_name( + name: str | None, + people: set[str], + generics: set[str], +) -> tuple[bool, str | None]: + """Returns (grounded, reason_if_not). 'Unknown' counts as grounded.""" + if name is None: + return True, None + if not isinstance(name, str) or not name.strip(): + return False, "empty" + if is_generic(name, generics): + return False, "generic" + if _normalize_name(name) in people: + return True, None + return False, "not_in_people_json" + + +# ---------- components ------------------------------------------------------- + + +def _schema_score(analysis: dict[str, Any], issues: list[str]) -> float: + if not isinstance(analysis, dict): + issues.append("output is not a JSON object") + return 0.0 + missing = _REQUIRED_TOP_LEVEL - set(analysis.keys()) + if missing: + issues.append(f"missing required fields: {sorted(missing)}") + return 0.0 + typ = analysis.get("type") + if typ not in _VALID_TYPES: + issues.append(f"type {typ!r} not in {sorted(_VALID_TYPES)}") + return 0.0 + sent = analysis.get("sentiment") + if sent not in _VALID_SENTIMENTS: + issues.append(f"sentiment {sent!r} not in {sorted(_VALID_SENTIMENTS)}") + return 0.0 + for k in ("takeaways", "open_questions", "tags_suggested", "participants"): + if not isinstance(analysis.get(k), list): + issues.append(f"{k} is not a list") + return 0.0 + for k in ("decisions", "action_items", "key_quotes", "topics"): + v = analysis.get(k) + if not isinstance(v, list): + issues.append(f"{k} is not a list") + return 0.0 + mm = analysis.get("mind_map") + if not isinstance(mm, dict) or not isinstance(mm.get("nodes"), list) or not isinstance(mm.get("edges"), list): + issues.append("mind_map missing nodes/edges arrays") + return 0.0 + sm = analysis.get("speaker_map") + if not isinstance(sm, dict): + issues.append("speaker_map is not an object") + return 0.0 + return 1.0 + + +def _speaker_grounding_score( + analysis: dict[str, Any], + people: set[str], + generics: set[str], + issues: list[str], +) -> float: + sm = analysis.get("speaker_map", {}) + if not sm: + return 1.0 + grounded = 0 + total = 0 + bad: dict[str, list[str]] = {} + for _idx, name in sm.items(): + if not isinstance(name, str): + continue + total += 1 + ok, reason = _is_grounded_name(name, people, generics) + if ok: + grounded += 1 + else: + bad.setdefault(reason or "unknown", []).append(name) + if total == 0: + return 1.0 + if bad: + for reason, names in sorted(bad.items()): + issues.append( + f"speaker_map: {len(names)} {reason} name(s), e.g. {sorted(set(names))[:3]}" + ) + return grounded / total + + +def _participant_consistency_score( + analysis: dict[str, Any], + people: set[str], + generics: set[str], + issues: list[str], +) -> float: + parts = analysis.get("participants") or [] + if not parts: + return 1.0 + sm_vals = { + _normalize_name(v) for v in analysis.get("speaker_map", {}).values() + if isinstance(v, str) + } + total = 0 + grounded = 0 + bad: list[str] = [] + for p in parts: + if not isinstance(p, str) or not p.strip(): + continue + total += 1 + norm = _normalize_name(p) + if norm == "unknown": + grounded += 1 + continue + if is_generic(p, generics): + bad.append(f"{p!r} (generic)") + continue + if norm in people or norm in sm_vals: + grounded += 1 + else: + bad.append(f"{p!r} (not in people.json or speaker_map)") + if total == 0: + return 1.0 + if bad: + issues.append(f"participants: {len(bad)} ungrounded: {sorted(set(bad))[:3]}") + return grounded / total + + +def _attribution_grounding_score( + analysis: dict[str, Any], + people: set[str], + generics: set[str], + issues: list[str], +) -> float: + """Aggregate grounding across speaker_map, decisions[].by, key_quotes[].speaker, + action_items[].owner. Each non-null name is one observation; score is the + fraction grounded. + """ + grounded = 0 + total = 0 + bad: dict[str, list[str]] = {} + + def _check(field_path: str, name: str | None) -> None: + nonlocal grounded, total + if name is None: + return + if not isinstance(name, str) or not name.strip(): + return + total += 1 + ok, reason = _is_grounded_name(name, people, generics) + if ok: + grounded += 1 + else: + bad.setdefault(field_path, []).append(name) + + for _i, n in (analysis.get("speaker_map") or {}).items(): + _check("speaker_map", n) + for d in analysis.get("decisions") or []: + if isinstance(d, dict): + _check("decisions[].by", d.get("by")) + for q in analysis.get("key_quotes") or []: + if isinstance(q, dict): + _check("key_quotes[].speaker", q.get("speaker")) + for it in analysis.get("action_items") or []: + if isinstance(it, dict): + _check("action_items[].owner", it.get("owner")) + + if total == 0: + return 1.0 + for field_path, names in sorted(bad.items()): + issues.append( + f"{field_path}: {len(names)} ungrounded, e.g. {sorted(set(names))[:3]}" + ) + return grounded / total + + +def _quote_groundedness_score( + analysis: dict[str, Any], recording: dict[str, Any], issues: list[str] +) -> float: + quotes = analysis.get("key_quotes") or [] + if not quotes: + return 1.0 + transcript = recording.get("transcript") or [] + haystack_pieces: list[str] = [] + for seg in transcript: + for k in ("text", "originalText"): + v = seg.get(k) + if isinstance(v, str): + haystack_pieces.append(_whitespace_fuzz(v)) + haystack = " ||| ".join(haystack_pieces) + if not haystack: + return 1.0 + grounded = 0 + samples: list[str] = [] + for q in quotes: + text = q.get("text") if isinstance(q, dict) else None + if not isinstance(text, str) or not text.strip(): + continue + if _whitespace_fuzz(text) in haystack: + grounded += 1 + else: + if len(samples) < 3: + samples.append(text[:80]) + total = len(quotes) + if grounded < total and samples: + issues.append( + f"key_quotes: {total - grounded}/{total} not in transcript " + f"(samples: {samples})" + ) + return grounded / total if total else 1.0 + + +def _coverage_spread_score( + analysis: dict[str, Any], recording: dict[str, Any], issues: list[str] +) -> float: + """Quote timestamps should cover all 3 thirds of the recording.""" + quotes = analysis.get("key_quotes") or [] + if not quotes: + return 1.0 + duration = recording.get("duration") or 0 + if not duration or duration < 60: + # Recording too short to meaningfully spread across thirds. + return 1.0 + timestamps = [ + q.get("timestamp_seconds") + for q in quotes + if isinstance(q, dict) and isinstance(q.get("timestamp_seconds"), (int, float)) + ] + if not timestamps: + # Quotes exist but none have timestamps. Penalize lightly — model + # didn't invest the effort to attribute them. + issues.append("coverage_spread: no quote has a timestamp") + return 0.5 + third = duration / 3.0 + thirds_hit = {min(int(t / third), 2) for t in timestamps} + score = len(thirds_hit) / 3.0 + if score < 1.0: + missing = sorted({0, 1, 2} - thirds_hit) + names = {0: "first", 1: "middle", 2: "last"} + issues.append( + f"coverage_spread: quotes only cover {len(thirds_hit)}/3 thirds; " + f"missing {[names[i] for i in missing]}" + ) + return score + + +def _takeaway_quality_score( + analysis: dict[str, Any], recording: dict[str, Any], people: set[str], issues: list[str] +) -> float: + """Non-redundancy + transcript grounding. + + redundancy: pairwise Jaccard on content-word sets; pairs with J > 0.6 are + redundant. Score = 1 - (redundant_pairs / max_pairs). + + grounding: each takeaway should share >= 2 content words with the + transcript. Score = grounded / total. + + Combined as the average of the two sub-scores. + """ + takeaways = analysis.get("takeaways") or [] + takeaways = [t for t in takeaways if isinstance(t, str) and t.strip()] + if not takeaways: + # Schema gate ensured the field is a list; empty isn't a hard fail + # here (some recordings genuinely have nothing to say) but we score + # neutral 0.5 to avoid rewarding empty outputs. + return 0.5 + + name_tokens = {tok for n in people for tok in n.split()} + + transcript_text = " ".join( + seg.get("text") or seg.get("originalText") or "" + for seg in (recording.get("transcript") or []) + if isinstance(seg, dict) + ) + transcript_words = _content_words(transcript_text, exclude=name_tokens) + + word_sets = [_content_words(t, exclude=name_tokens) for t in takeaways] + + # Non-redundancy + n = len(takeaways) + if n >= 2: + redundant = 0 + max_pairs = n * (n - 1) // 2 + for i in range(n): + for j in range(i + 1, n): + a, b = word_sets[i], word_sets[j] + if not a or not b: + continue + jacc = len(a & b) / len(a | b) + if jacc > 0.6: + redundant += 1 + nonredundancy = 1.0 - (redundant / max_pairs) if max_pairs else 1.0 + if redundant: + issues.append( + f"takeaways: {redundant}/{max_pairs} pair(s) redundant (Jaccard > 0.6)" + ) + else: + nonredundancy = 1.0 + + # Grounding + if not transcript_words: + grounding = 1.0 + else: + grounded = 0 + ungrounded_samples: list[str] = [] + for t, ws in zip(takeaways, word_sets, strict=True): + shared = ws & transcript_words + if len(shared) >= 2: + grounded += 1 + else: + if len(ungrounded_samples) < 3: + ungrounded_samples.append(t[:80]) + grounding = grounded / len(takeaways) + if grounded < len(takeaways) and ungrounded_samples: + issues.append( + f"takeaways: {len(takeaways) - grounded}/{len(takeaways)} " + f"not grounded in transcript (samples: {ungrounded_samples})" + ) + + return (nonredundancy + grounding) / 2.0 + + +def _mind_map_quality_score( + analysis: dict[str, Any], recording: dict[str, Any], issues: list[str] +) -> float: + """Integrity + connectivity + branching factor + node-type variety. + + Each sub-check contributes 0.25 to the component (so all four pass = 1.0). + """ + mm = analysis.get("mind_map", {}) + nodes = mm.get("nodes") or [] + edges = mm.get("edges") or [] + if not nodes: + issues.append("mind_map: no nodes") + return 0.0 + node_objs = [n for n in nodes if isinstance(n, dict)] + node_ids = {n.get("id") for n in node_objs} + + # 1) Integrity: every edge endpoint exists. + bad_edges = sum( + 1 + for e in edges + if not isinstance(e, dict) + or e.get("source") not in node_ids + or e.get("target") not in node_ids + ) + integrity = 0.0 if bad_edges else 1.0 + if bad_edges: + issues.append(f"mind_map: {bad_edges} edge(s) reference missing nodes") + + # 2) Connectivity: undirected components count == 1. + if len(nodes) == 1: + connectivity = 1.0 + else: + adj: dict[str, set[str]] = {nid: set() for nid in node_ids if nid is not None} + for e in edges: + if not isinstance(e, dict): + continue + s, t = e.get("source"), e.get("target") + if s in adj and t in adj: + adj[s].add(t) + adj[t].add(s) + seen: set[str] = set() + components = 0 + for start in adj: + if start in seen: + continue + components += 1 + stack = [start] + while stack: + cur = stack.pop() + if cur in seen: + continue + seen.add(cur) + stack.extend(adj[cur]) + connectivity = 1.0 if components == 1 else 0.0 + if components > 1: + issues.append(f"mind_map: {components} disconnected components") + + # 3) Branching factor: prefer 3..7 top-level branches from the most-connected + # node (root proxy). Penalize 1 (degenerate) and >12 (giant fan). + if not adj or len(adj) < 2: + branching = 1.0 + else: + # Pick the node with most connections as the root proxy. + root = max(adj, key=lambda k: len(adj[k])) + deg = len(adj[root]) + if 3 <= deg <= 7: + branching = 1.0 + elif deg in (2, 8, 9, 10, 11, 12) or deg == 1: + branching = 0.5 + issues.append( + f"mind_map: root branching factor {deg} (prefer 3-7)" + ) + else: # > 12 + branching = 0.0 + issues.append( + f"mind_map: root branching factor {deg} > 12 (giant fan)" + ) + + # 4) Node-type variety: at least 2 distinct types from the valid set. + types = {n.get("type") for n in node_objs if n.get("type") in _VALID_NODE_TYPES} + if len(types) >= 2: + variety = 1.0 + elif len(types) == 1: + variety = 0.5 + issues.append( + f"mind_map: all nodes share one type ({next(iter(types))!r}); " + "expected mix of topic/decision/action/question" + ) + else: + variety = 0.0 + issues.append("mind_map: no nodes have a recognized type") + + return (integrity + connectivity + branching + variety) / 4.0 + + +def _output_economy_score(analysis: dict[str, Any]) -> float: + """Sigmoid penalty on serialized JSON size. + + Calibrated against the existing 178 analyses (size distribution: + p25=11kB, median=14kB, p75=16kB, max=24kB). Centered at 14 kB so the + *median current output* scores 0.5 — providing a real gradient toward + smaller outputs without immediately blowing up the seed prompt's score. + + Approximate scores: ~0.95 at 6 kB, ~0.85 at 8 kB, ~0.73 at 11 kB, + ~0.50 at 14 kB, ~0.31 at 16 kB, ~0.10 at 20 kB. + """ + size = len(json.dumps(analysis)) + x = (size - 14000) / 3000 + return 1.0 / (1.0 + math.exp(x)) + + +# ---------- weighting -------------------------------------------------------- + + +_BASE_WEIGHTS = { + "speaker_grounding": 0.15, + "participant_consistency": 0.05, + "attribution_grounding": 0.15, + "quote_grounding": 0.15, + "coverage_spread": 0.05, + "takeaway_quality": 0.20, + "mind_map_quality": 0.10, + "output_economy": 0.10, + "consistency": 0.05, +} + + +def _weights_for(include_consistency: bool) -> dict[str, float]: + if include_consistency: + return dict(_BASE_WEIGHTS) + # Drop consistency, redistribute its 0.05 pro-rata across the rest. + rest = {k: v for k, v in _BASE_WEIGHTS.items() if k != "consistency"} + factor = 1.0 / sum(rest.values()) + return {k: v * factor for k, v in rest.items()} + + +# ---------- public API ------------------------------------------------------- + + +def score_analysis( + analysis: dict[str, Any], + recording: dict[str, Any], + people: set[str] | None = None, + consistency_score: float | None = None, +) -> MetricResult: + if people is None: + people = _load_people_names() + generics = load_generic_labels() + issues: list[str] = [] + + schema = _schema_score(analysis, issues) + if schema == 0.0: + return MetricResult( + total=0.0, + subscores={k: 0.0 for k in _BASE_WEIGHTS}, + feedback="Output failed schema validation. Issues:\n- " + "\n- ".join(issues), + issues=issues, + ) + + subs: dict[str, float] = { + "speaker_grounding": _speaker_grounding_score(analysis, people, generics, issues), + "participant_consistency": _participant_consistency_score(analysis, people, generics, issues), + "attribution_grounding": _attribution_grounding_score(analysis, people, generics, issues), + "quote_grounding": _quote_groundedness_score(analysis, recording, issues), + "coverage_spread": _coverage_spread_score(analysis, recording, issues), + "takeaway_quality": _takeaway_quality_score(analysis, recording, people, issues), + "mind_map_quality": _mind_map_quality_score(analysis, recording, issues), + "output_economy": _output_economy_score(analysis), + } + if consistency_score is not None: + subs["consistency"] = float(consistency_score) + + weights = _weights_for(include_consistency=consistency_score is not None) + total = sum(weights[k] * subs[k] for k in weights) + + feedback_parts = [ + f"Score: {total:.3f}.", + "Subscores: " + ", ".join(f"{k}={v:.2f}" for k, v in subs.items()) + ".", + ] + if issues: + feedback_parts.append("Issues:\n- " + "\n- ".join(issues)) + else: + feedback_parts.append("All quality gates passed.") + return MetricResult(total=total, subscores=subs, feedback="\n".join(feedback_parts), issues=issues) + + +# ---------- consistency between two runs ------------------------------------- + + +def _set_jaccard(a: set[str], b: set[str]) -> float: + if not a and not b: + return 1.0 + if not a or not b: + return 0.0 + return len(a & b) / len(a | b) + + +def consistency_score(analysis_a: dict[str, Any], analysis_b: dict[str, Any]) -> float: + """Score how consistent two analyses of the same recording are. + + Computes Jaccard similarity over content-word sets of takeaways and over + speaker_map name sets, averaged. Higher is better. + """ + if not isinstance(analysis_a, dict) or not isinstance(analysis_b, dict): + return 0.0 + + def _takeaway_words(a: dict) -> set[str]: + ts = a.get("takeaways") or [] + out: set[str] = set() + for t in ts: + if isinstance(t, str): + out |= _content_words(t) + return out + + def _speaker_set(a: dict) -> set[str]: + sm = a.get("speaker_map") or {} + return { + _normalize_name(v) for v in sm.values() + if isinstance(v, str) and v.strip() + } + + j_takeaways = _set_jaccard(_takeaway_words(analysis_a), _takeaway_words(analysis_b)) + j_speakers = _set_jaccard(_speaker_set(analysis_a), _speaker_set(analysis_b)) + return (j_takeaways + j_speakers) / 2.0 + + +# ---------- response parsing ------------------------------------------------- + + +def parse_analysis_from_response(response: str) -> dict[str, Any] | None: + """Best-effort extraction of an analysis JSON object from a model response.""" + s = response.strip() + try: + obj = json.loads(s) + if isinstance(obj, dict): + return obj + except json.JSONDecodeError: + pass + m = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", s, re.DOTALL) + if m: + try: + obj = json.loads(m.group(1)) + if isinstance(obj, dict): + return obj + except json.JSONDecodeError: + pass + start = s.find("{") + end = s.rfind("}") + if start != -1 and end != -1 and end > start: + try: + obj = json.loads(s[start : end + 1]) + if isinstance(obj, dict): + return obj + except json.JSONDecodeError: + pass + return None + + +# ---------- CLI smoke entry -------------------------------------------------- + + +if __name__ == "__main__": + import sys + + if len(sys.argv) != 3: + print( + "usage: python -m prompt_optim.metric " + " ", + file=sys.stderr, + ) + sys.exit(2) + rec_dir = Path(sys.argv[1]) + analysis_path = Path(sys.argv[2]) + recording = json.loads((rec_dir / "recording.json").read_text()) + analysis = json.loads(analysis_path.read_text()) + result = score_analysis(analysis, recording) + print(f"total: {result.total:.3f}") + for k, v in result.subscores.items(): + print(f" {k:24s} {v:.3f}") + if result.issues: + print("\nissues:") + for i in result.issues: + print(f" - {i}") diff --git a/prompt_optim/optimize.py b/prompt_optim/optimize.py new file mode 100644 index 0000000..22c6353 --- /dev/null +++ b/prompt_optim/optimize.py @@ -0,0 +1,195 @@ +"""GEPA driver for the Seam analyze prompt. + +Usage (from repo root, with venv): + + SEAM_DATA_DIR=/path/to/.seam .venv-gepa/bin/python3 -m prompt_optim.optimize \\ + [--budget 20] [--task-model sonnet] [--reflection-model opus] \\ + [--out prompts/analyze.optimized.md] + +Defaults are deliberately tiny (budget=20) for the MVP so a first run finishes +in a reasonable time and you can confirm the wiring before scaling up. After +the wiring is confirmed, expand the trainset (see ``_build_trainset``) and +bump ``--budget`` to ~150. + +The optimized prompt is written to ``prompts/analyze.optimized.md`` for human +review. The production [prompts/analyze.md] is **never** auto-replaced. +""" + +from __future__ import annotations + +import argparse +import json +import os +import sys +from pathlib import Path + +import gepa + +from prompt_optim.adapter import COMPONENT_NAME, SeamAnalyzeAdapter +from prompt_optim.claude_cli_lm import ClaudeCliLM + + +# Trainset: 2 recordings chosen for variety. Bump after MVP wiring is confirmed. +# - team-sync-update: 27 segments, ~13min, multi-speaker meeting. +# - thyroid-medication-adjustment: 11 segments, ~1min, short conversation. +DEFAULT_TRAIN_DIRS = [ + "2026-02-23_team-sync-update", + "2026-03-05_thyroid-medication-adjustment", +] + + +def _load_recording(rec_dir: Path) -> dict: + return { + "recording_dir": str(rec_dir), + "recording": json.loads((rec_dir / "recording.json").read_text()), + } + + +def _build_trainset(seam_dir: Path, names: list[str]) -> list[dict]: + out: list[dict] = [] + for name in names: + d = seam_dir / "recordings" / name + if not (d / "recording.json").exists(): + raise FileNotFoundError( + f"trainset recording missing: {d / 'recording.json'}" + ) + out.append(_load_recording(d)) + return out + + +def main() -> int: + repo_root = Path(__file__).resolve().parent.parent + + ap = argparse.ArgumentParser() + ap.add_argument("--budget", type=int, default=20, help="max_metric_calls") + ap.add_argument("--task-model", default="sonnet") + ap.add_argument("--reflection-model", default="opus") + ap.add_argument("--task-timeout", type=int, default=300) + ap.add_argument("--reflection-timeout", type=int, default=600) + ap.add_argument( + "--seam-dir", + default=os.environ.get("SEAM_DATA_DIR") or str(repo_root / ".seam"), + ) + ap.add_argument("--seed-prompt", default=str(repo_root / "prompts" / "analyze.md")) + ap.add_argument( + "--out", default=str(repo_root / "prompts" / "analyze.optimized.md") + ) + ap.add_argument( + "--train", + nargs="*", + default=DEFAULT_TRAIN_DIRS, + help="recording dir names under /recordings/", + ) + ap.add_argument("--seed", type=int, default=0) + ap.add_argument( + "--consistency", + action="store_true", + help=( + "Enable the consistency metric component: re-runs the first batch " + "example a second time per evaluate() and scores Jaccard over " + "takeaways/speaker_map. Adds ~1 LM call per evaluate(); intended " + "for production-strength runs (--budget >= 100)." + ), + ) + args = ap.parse_args() + + seam_dir = Path(args.seam_dir).expanduser().resolve() + seed_prompt_path = Path(args.seed_prompt) + out_path = Path(args.out) + + if not seed_prompt_path.exists(): + print(f"ERROR: seed prompt not found: {seed_prompt_path}", file=sys.stderr) + return 1 + if not seam_dir.exists(): + print(f"ERROR: seam data dir not found: {seam_dir}", file=sys.stderr) + return 1 + + print(f"[optimize] seed prompt: {seed_prompt_path}") + print(f"[optimize] seam dir: {seam_dir}") + print(f"[optimize] task model: {args.task_model}") + print(f"[optimize] reflection: {args.reflection_model}") + print(f"[optimize] budget: {args.budget} metric calls") + print(f"[optimize] trainset: {args.train}") + print(f"[optimize] consistency: {'on' if args.consistency else 'off'}") + print(f"[optimize] output: {out_path}") + + seed_prompt = seed_prompt_path.read_text() + trainset = _build_trainset(seam_dir, args.train) + + task_lm = ClaudeCliLM( + model=args.task_model, + timeout=args.task_timeout, + max_retries=8, + verbose=True, + ) + reflection_lm = ClaudeCliLM( + model=args.reflection_model, + timeout=args.reflection_timeout, + max_retries=8, + verbose=True, + ) + + adapter = SeamAnalyzeAdapter( + lm=task_lm, + people_json_path=seam_dir / "people.json", + enable_consistency=args.consistency, + ) + + seed_candidate = {COMPONENT_NAME: seed_prompt} + + result = gepa.optimize( + seed_candidate=seed_candidate, + trainset=trainset, + valset=trainset, # MVP: train=val; introduce a real valset post-MVP + adapter=adapter, + reflection_lm=reflection_lm, + max_metric_calls=args.budget, + seed=args.seed, + display_progress_bar=False, + # task_lm is unused when adapter is provided (adapter holds its own LM). + ) + + best = result.best_candidate + optimized_prompt = best[COMPONENT_NAME] if isinstance(best, dict) else best + out_path.parent.mkdir(parents=True, exist_ok=True) + out_path.write_text(optimized_prompt) + print(f"[optimize] wrote optimized prompt to {out_path}") + + try: + best_score = result.val_aggregate_scores[result.best_idx] + seed_score = result.val_aggregate_scores[0] + print( + f"[optimize] seed score: {seed_score:.4f} -> " + f"best score: {best_score:.4f} " + f"({len(result.val_aggregate_scores)} candidates evaluated)" + ) + except (IndexError, AttributeError) as e: + print(f"[optimize] (could not summarize scores: {e})") + + # Telemetry summary from the LM state file. + state_path = seam_dir / "prompt-optim-state.json" + if state_path.exists(): + try: + state = json.loads(state_path.read_text()) + print( + f"[optimize] telemetry: " + f"{state.get('calls', 0)} calls, " + f"rate-limit hits: {state.get('rate_limit_hits', 0)}, " + f"input tok: {state.get('total_input_tokens', 0):,}, " + f"output tok: {state.get('total_output_tokens', 0):,}, " + f"cost proxy: ${state.get('total_cost_usd_proxy', 0.0):.2f}" + ) + by_model = state.get("by_model") or {} + for model_name, b in sorted(by_model.items()): + print( + f"[optimize] {model_name}: {b['calls']} calls, " + f"in={b['input_tokens']:,} out={b['output_tokens']:,} " + f"cost_proxy=${b['cost_proxy']:.2f}" + ) + except (OSError, json.JSONDecodeError): + pass + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/prompt_optim/requirements.txt b/prompt_optim/requirements.txt new file mode 100644 index 0000000..36508aa --- /dev/null +++ b/prompt_optim/requirements.txt @@ -0,0 +1 @@ +gepa>=0.1.1