diff --git a/.gitignore b/.gitignore index d538a3b..74a1f6d 100644 --- a/.gitignore +++ b/.gitignore @@ -34,3 +34,6 @@ npm-debug.log* # Generated public/manifest.json + +# Claude Code +.claude/*.local.json \ No newline at end of file diff --git a/scripts/pocket_pull.py b/scripts/pocket_pull.py index 91accbb..0aff12b 100755 --- a/scripts/pocket_pull.py +++ b/scripts/pocket_pull.py @@ -7,7 +7,9 @@ import json import os +import random import sys +import time import urllib.request import urllib.error from datetime import datetime, timezone @@ -18,9 +20,15 @@ RECORDINGS_DIR = DATA_DIR / "recordings" SYNC_FILE = ROOT / ".pocket-last-sync" DELETED_FILE = DATA_DIR / ".deleted" +PENDING_FETCH_FILE = DATA_DIR / ".pending-fetch" BASE_URL = "https://public.heypocketai.com/api/v1" +MAX_RETRIES = 5 +BACKOFF_BASE_SECONDS = 2.0 +BACKOFF_CAP_SECONDS = 60.0 +RETRY_STATUSES = {429, 500, 502, 503, 504} + def read_deleted() -> set[str]: """Read the set of deleted recording dir names.""" @@ -45,6 +53,44 @@ def get_api_key() -> str: return key +def _sleep_for_retry(attempt: int, retry_after: str | None) -> None: + """Sleep before a retry. 
Honor Retry-After header if present, otherwise + use exponential backoff with jitter, capped at BACKOFF_CAP_SECONDS.""" + delay: float | None = None + if retry_after: + try: + delay = float(retry_after) + except ValueError: + delay = None + if delay is None: + delay = min(BACKOFF_BASE_SECONDS * (2 ** attempt), BACKOFF_CAP_SECONDS) + delay += random.uniform(0, delay * 0.25) + time.sleep(delay) + + +def _request_with_retry(req: urllib.request.Request) -> dict: + """Open a request and parse JSON, retrying on 429 and 5xx with backoff.""" + last_error: urllib.error.HTTPError | None = None + for attempt in range(MAX_RETRIES): + try: + with urllib.request.urlopen(req) as resp: + return json.loads(resp.read().decode()) + except urllib.error.HTTPError as e: + if e.code not in RETRY_STATUSES or attempt == MAX_RETRIES - 1: + raise + last_error = e + retry_after = e.headers.get("Retry-After") if e.headers else None + print( + f" HTTP {e.code} on {req.full_url}, retrying " + f"(attempt {attempt + 1}/{MAX_RETRIES})...", + file=sys.stderr, + ) + _sleep_for_retry(attempt, retry_after) + # Unreachable — final iteration either returns or re-raises. 
+ assert last_error is not None + raise last_error + + def api_get(path: str, api_key: str, params: dict | None = None) -> dict: url = f"{BASE_URL}{path}" if params: @@ -55,8 +101,7 @@ def api_get(path: str, api_key: str, params: dict | None = None) -> dict: "Authorization": f"Bearer {api_key}", "Accept": "application/json", }) - with urllib.request.urlopen(req) as resp: - return json.loads(resp.read().decode()) + return _request_with_retry(req) def api_post(path: str, api_key: str, body: dict) -> dict: @@ -67,8 +112,7 @@ def api_post(path: str, api_key: str, body: dict) -> dict: "Content-Type": "application/json", "Accept": "application/json", }) - with urllib.request.urlopen(req) as resp: - return json.loads(resp.read().decode()) + return _request_with_retry(req) def get_last_sync() -> str | None: @@ -82,6 +126,23 @@ def set_last_sync(ts: str): SYNC_FILE.write_text(ts + "\n") +def read_pending_fetch() -> list[str]: + """Read recording IDs whose detail fetch failed on a prior run.""" + if not PENDING_FETCH_FILE.exists(): + return [] + return [line.strip() for line in PENDING_FETCH_FILE.read_text().splitlines() if line.strip()] + + +def write_pending_fetch(ids: list[str]) -> None: + """Persist recording IDs that still need to be fetched. Empties the file + when ids is empty so the next run has a clean slate.""" + PENDING_FETCH_FILE.parent.mkdir(parents=True, exist_ok=True) + if ids: + PENDING_FETCH_FILE.write_text("\n".join(ids) + "\n") + elif PENDING_FETCH_FILE.exists(): + PENDING_FETCH_FILE.unlink() + + def list_recordings(api_key: str, start_date: str | None = None) -> list[dict]: """Fetch all recordings since start_date, handling pagination.""" all_recordings = [] @@ -289,6 +350,17 @@ def main(): start_date = last_sync[:10] if last_sync else None recordings = list_recordings(api_key, start_date) + # Re-attempt any IDs that failed on prior runs, even if they fall outside + # the start_date window. 
The catalog returns metadata for these too, so we + # only need to ensure they're in the fetch loop — not duplicated. + pending_ids = read_pending_fetch() + seen_ids = {r.get("id") for r in recordings if r.get("id")} + missing_pending = [pid for pid in pending_ids if pid not in seen_ids] + if missing_pending: + print(f" Re-attempting {len(missing_pending)} pending fetch(es) from prior run(s)") + for pid in missing_pending: + recordings.append({"id": pid, "title": f"(pending: {pid})"}) + if not recordings: print(" No new recordings found.") set_last_sync(datetime.now(timezone.utc).isoformat()) @@ -296,7 +368,8 @@ def main(): print(f" Found {len(recordings)} recording(s)") - # Filter out pending recordings (not yet processed by Pocket) + # Filter out pending recordings (not yet processed by Pocket). + # Stub entries from .pending-fetch have no "state" so they pass through. ready = [r for r in recordings if r.get("state") != "pending"] pending = len(recordings) - len(ready) if pending: @@ -306,6 +379,7 @@ def main(): # Fetch details and write each recording deleted = read_deleted() new_dirs = [] + failed_ids: list[str] = [] for rec in recordings: rec_id = rec.get("id") if not rec_id: @@ -320,13 +394,20 @@ def main(): new_dirs.append(dir_name) except urllib.error.HTTPError as e: print(f" ERROR fetching {rec_id}: {e}", file=sys.stderr) + failed_ids.append(rec_id) continue - # Update sync timestamp + # Persist failures so the next run retries them regardless of watermark. + write_pending_fetch(failed_ids) + + # Advance sync timestamp even on partial failure — pending-fetch is the + # safety net that keeps failed IDs in the next run's fetch list. now = datetime.now(timezone.utc).isoformat() set_last_sync(now) print(f"\nDone. 
Pulled {len(new_dirs)} recording(s).") + if failed_ids: + print(f" {len(failed_ids)} fetch(es) failed; will retry next run.") print(f"Sync timestamp: {now}") # Write list of new dirs to stdout for the orchestration script diff --git a/scripts/seed-people.py b/scripts/seed-people.py new file mode 100644 index 0000000..7fe51ee --- /dev/null +++ b/scripts/seed-people.py @@ -0,0 +1,123 @@ +#!/usr/bin/env python3 +"""Seed .seam/people.json from existing analyses' speaker_maps. + +Scans every .seam/analysis/*/analysis.json, collects unique speakers from +speaker_map, filters out generic role labels, and writes them to +.seam/people.json with source="inferred". Skips names that already exist +(case-insensitive) so it's safe to re-run. +""" +from __future__ import annotations + +import json +import re +import sys +import uuid +from datetime import datetime, timezone +from pathlib import Path + +ROOT = Path(__file__).resolve().parent.parent +ANALYSIS_DIR = ROOT / ".seam" / "analysis" +PEOPLE_FILE = ROOT / ".seam" / "people.json" + +GENERIC_TOKENS = { + "unknown", "speaker", "narrator", "host", "facilitator", "moderator", + "chair", "manager", "lead", "engineer", "developer", "designer", + "analyst", "patient", "caller", "partner", "peer", "team", "member", + "physician", "doctor", "nurse", "guide", "tour", "father", "mother", + "parent", "child", "interviewer", "interviewee", "participant", + "guest", "attendee", "client", "customer", "user", "tech", + "platform", "analytics", "ela", "ent", "yc", "new", +GENERIC_LABELS = { + "unknown", "speaker", "narrator", "host", "facilitator", "moderator", + "chair", "participant", "interviewer", "interviewee", + "guest", "attendee", "caller", "member", "team", +} + +SPEAKER_NUM_RE = re.compile(r"^speaker\s*\d+$", re.IGNORECASE) + + +def is_generic(name: str) -> bool: + n = name.strip() + if not n or n.lower() == "unknown": + return True + # Trailing parenthetical: "Mark (Speaker 01)" -> strip and re-test base + base = 
re.sub(r"\s*\([^)]*\)\s*$", "", n).strip() + if base != n and not is_generic(base): + return False # has a real name before the paren + if SPEAKER_NUM_RE.match(n): + return True + # All tokens are generic role words → skip + tokens = [t.lower() for t in re.split(r"[\s\-/]+", n) if t] + if tokens and all(t in GENERIC_TOKENS or t.isdigit() for t in tokens): + return True + return False + + +def canonical(name: str) -> str: + """Strip trailing parentheticals so 'Mark (Speaker 01)' merges with 'Mark'.""" + return re.sub(r"\s*\([^)]*\)\s*$", "", name).strip() + + +def main() -> int: + if not ANALYSIS_DIR.exists(): + print(f"No analysis directory at {ANALYSIS_DIR}", file=sys.stderr) + return 1 + + # Tally by canonical name → keep the longest variant as display name + # (e.g. prefer "Chris Woodson" over "Chris" when both appear). + variants: dict[str, dict[str, int]] = {} + for analysis_path in sorted(ANALYSIS_DIR.glob("*/analysis.json")): + try: + data = json.loads(analysis_path.read_text()) + except (OSError, json.JSONDecodeError): + continue + speaker_map = data.get("speaker_map") or {} + for raw in speaker_map.values(): + if not isinstance(raw, str): + continue + name = raw.strip() + if is_generic(name): + continue + key = canonical(name).lower() + if not key: + continue + variants.setdefault(key, {})[name] = variants.setdefault(key, {}).get(name, 0) + 1 + + # Pick a display name per canonical key: most-used, breaking ties by length. + chosen: list[str] = [] + for key, counts in variants.items(): + best = sorted(counts.items(), key=lambda kv: (-kv[1], -len(kv[0])))[0][0] + chosen.append(best) + chosen.sort() + + # Load existing people, skip duplicates by name (case-insensitive). 
+ existing: list[dict] = [] + if PEOPLE_FILE.exists(): + try: + existing = json.loads(PEOPLE_FILE.read_text()).get("people", []) + except (OSError, json.JSONDecodeError): + existing = [] + existing_names = {p["name"].lower() for p in existing if "name" in p} + + now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") + added = 0 + for name in chosen: + if name.lower() in existing_names: + continue + existing.append({ + "id": str(uuid.uuid4()), + "name": name, + "source": "inferred", + "createdAt": now, + }) + existing_names.add(name.lower()) + added += 1 + + PEOPLE_FILE.parent.mkdir(parents=True, exist_ok=True) + PEOPLE_FILE.write_text(json.dumps({"people": existing}, indent=2) + "\n") + print(f"Seeded {added} new person(s) (total: {len(existing)}) -> {PEOPLE_FILE}") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/tests/test_pocket_pull.py b/scripts/tests/test_pocket_pull.py index 93914b4..ab66984 100644 --- a/scripts/tests/test_pocket_pull.py +++ b/scripts/tests/test_pocket_pull.py @@ -226,3 +226,113 @@ def test_missing_exits(self): # Need pytest for the SystemExit test import pytest +import urllib.error + + +def make_http_error(code: int, retry_after: str | None = None) -> urllib.error.HTTPError: + headers = {} + if retry_after is not None: + headers["Retry-After"] = retry_after + return urllib.error.HTTPError( + url="http://test/x", code=code, msg=f"HTTP {code}", hdrs=headers, fp=None + ) + + +class TestRequestWithRetry: + def test_succeeds_first_try(self): + with patch("pocket_pull.urllib.request.urlopen") as mock_open, \ + patch("pocket_pull.time.sleep") as mock_sleep: + mock_open.return_value = make_response({"ok": True}) + req = MagicMock() + req.full_url = "http://test/x" + result = pocket_pull._request_with_retry(req) + assert result == {"ok": True} + assert mock_open.call_count == 1 + mock_sleep.assert_not_called() + + def test_retries_on_429_then_succeeds(self): + with 
patch("pocket_pull.urllib.request.urlopen") as mock_open, \ + patch("pocket_pull.time.sleep") as mock_sleep: + mock_open.side_effect = [ + make_http_error(429, retry_after="1"), + make_http_error(429, retry_after="1"), + make_response({"ok": True}), + ] + req = MagicMock() + req.full_url = "http://test/x" + result = pocket_pull._request_with_retry(req) + assert result == {"ok": True} + assert mock_open.call_count == 3 + assert mock_sleep.call_count == 2 + + def test_retries_on_503(self): + with patch("pocket_pull.urllib.request.urlopen") as mock_open, \ + patch("pocket_pull.time.sleep"): + mock_open.side_effect = [ + make_http_error(503), + make_response({"ok": True}), + ] + req = MagicMock() + req.full_url = "http://test/x" + assert pocket_pull._request_with_retry(req) == {"ok": True} + assert mock_open.call_count == 2 + + def test_does_not_retry_on_404(self): + with patch("pocket_pull.urllib.request.urlopen") as mock_open, \ + patch("pocket_pull.time.sleep") as mock_sleep: + mock_open.side_effect = [make_http_error(404)] + req = MagicMock() + req.full_url = "http://test/x" + with pytest.raises(urllib.error.HTTPError) as exc: + pocket_pull._request_with_retry(req) + assert exc.value.code == 404 + assert mock_open.call_count == 1 + mock_sleep.assert_not_called() + + def test_raises_after_max_retries(self): + with patch("pocket_pull.urllib.request.urlopen") as mock_open, \ + patch("pocket_pull.time.sleep"): + mock_open.side_effect = [make_http_error(429) for _ in range(pocket_pull.MAX_RETRIES)] + req = MagicMock() + req.full_url = "http://test/x" + with pytest.raises(urllib.error.HTTPError) as exc: + pocket_pull._request_with_retry(req) + assert exc.value.code == 429 + assert mock_open.call_count == pocket_pull.MAX_RETRIES + + def test_honors_retry_after_header(self): + with patch("pocket_pull.urllib.request.urlopen") as mock_open, \ + patch("pocket_pull.time.sleep") as mock_sleep: + mock_open.side_effect = [ + make_http_error(429, retry_after="7"), + 
class TestPendingFetch:
    """Checks for the .pending-fetch read/write helpers in pocket_pull."""

    def test_read_missing_returns_empty(self, tmp_path):
        # No file on disk: reading yields an empty list.
        absent = tmp_path / ".pending-fetch"
        with patch.object(pocket_pull, "PENDING_FETCH_FILE", absent):
            loaded = pocket_pull.read_pending_fetch()
        assert loaded == []

    def test_round_trip(self, tmp_path):
        # IDs written are read back in the same order.
        target = tmp_path / ".pending-fetch"
        ids = ["a", "b", "c"]
        with patch.object(pocket_pull, "PENDING_FETCH_FILE", target):
            pocket_pull.write_pending_fetch(ids)
            loaded = pocket_pull.read_pending_fetch()
        assert loaded == ["a", "b", "c"]

    def test_write_empty_removes_file(self, tmp_path):
        # Writing an empty list deletes an existing file.
        target = tmp_path / ".pending-fetch"
        target.write_text("a\nb\n")
        with patch.object(pocket_pull, "PENDING_FETCH_FILE", target):
            pocket_pull.write_pending_fetch([])
        assert not target.exists()

    def test_read_skips_blank_lines(self, tmp_path):
        # Empty and whitespace-only lines are ignored on read.
        target = tmp_path / ".pending-fetch"
        target.write_text("a\n\nb\n \n")
        with patch.object(pocket_pull, "PENDING_FETCH_FILE", target):
            loaded = pocket_pull.read_pending_fetch()
        assert loaded == ["a", "b"]