Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,6 @@ npm-debug.log*

# Generated
public/manifest.json

# Claude Code
.claude/*.local.json
93 changes: 87 additions & 6 deletions scripts/pocket_pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@

import json
import os
import random
import sys
import time
import urllib.request
import urllib.error
from datetime import datetime, timezone
Expand All @@ -18,9 +20,15 @@
RECORDINGS_DIR = DATA_DIR / "recordings"
SYNC_FILE = ROOT / ".pocket-last-sync"
DELETED_FILE = DATA_DIR / ".deleted"
PENDING_FETCH_FILE = DATA_DIR / ".pending-fetch"

BASE_URL = "https://public.heypocketai.com/api/v1"

# Retry policy for Pocket API requests.
# MAX_RETRIES is the total number of attempts per request (the first try
# counts), not the number of additional retries.
MAX_RETRIES = 5
# Exponential backoff: the delay is BACKOFF_BASE_SECONDS doubled once per
# failed attempt, limited to BACKOFF_CAP_SECONDS before jitter is added.
BACKOFF_BASE_SECONDS = 2.0
BACKOFF_CAP_SECONDS = 60.0
# HTTP status codes that are retried instead of being raised immediately.
RETRY_STATUSES = {429, 500, 502, 503, 504}


def read_deleted() -> set[str]:
"""Read the set of deleted recording dir names."""
Expand All @@ -45,6 +53,44 @@ def get_api_key() -> str:
return key


def _sleep_for_retry(attempt: int, retry_after: str | None) -> None:
"""Sleep before a retry. Honor Retry-After header if present, otherwise
use exponential backoff with jitter, capped at BACKOFF_CAP_SECONDS."""
delay: float | None = None
if retry_after:
try:
delay = float(retry_after)
except ValueError:
delay = None
if delay is None:
delay = min(BACKOFF_BASE_SECONDS * (2 ** attempt), BACKOFF_CAP_SECONDS)
delay += random.uniform(0, delay * 0.25)
time.sleep(delay)


def _request_with_retry(req: urllib.request.Request) -> dict:
    """Open a request and return its parsed JSON body, retrying on failure.

    HTTP errors whose status is in RETRY_STATUSES are retried after a pause
    chosen by ``_sleep_for_retry``. Any other HTTP error, and a retryable
    error on the final attempt, is re-raised unchanged. The request is always
    attempted at least once, even if MAX_RETRIES is configured as zero or
    less.

    Args:
        req: A fully prepared request (URL, headers and optional body).

    Returns:
        The decoded JSON payload of the first successful response.

    Raises:
        urllib.error.HTTPError: On a non-retryable status, or when every
            attempt failed with a retryable status.
    """
    attempts = max(1, MAX_RETRIES)
    for attempt in range(attempts):
        try:
            with urllib.request.urlopen(req) as resp:
                return json.loads(resp.read().decode())
        except urllib.error.HTTPError as e:
            if e.code not in RETRY_STATUSES or attempt == attempts - 1:
                raise
            retry_after = e.headers.get("Retry-After") if e.headers else None
            # The error object wraps the open response; release it before
            # sleeping so abandoned attempts do not hold connections open.
            e.close()
            print(
                f"  HTTP {e.code} on {req.full_url}, retrying "
                f"(attempt {attempt + 1}/{attempts})...",
                file=sys.stderr,
            )
            _sleep_for_retry(attempt, retry_after)
    # The final iteration always returns or re-raises, so this is unreachable.
    raise RuntimeError("retry loop ended without a result")


def api_get(path: str, api_key: str, params: dict | None = None) -> dict:
url = f"{BASE_URL}{path}"
if params:
Expand All @@ -55,8 +101,7 @@ def api_get(path: str, api_key: str, params: dict | None = None) -> dict:
"Authorization": f"Bearer {api_key}",
"Accept": "application/json",
})
with urllib.request.urlopen(req) as resp:
return json.loads(resp.read().decode())
return _request_with_retry(req)


def api_post(path: str, api_key: str, body: dict) -> dict:
Expand All @@ -67,8 +112,7 @@ def api_post(path: str, api_key: str, body: dict) -> dict:
"Content-Type": "application/json",
"Accept": "application/json",
})
with urllib.request.urlopen(req) as resp:
return json.loads(resp.read().decode())
return _request_with_retry(req)


def get_last_sync() -> str | None:
Expand All @@ -82,6 +126,23 @@ def set_last_sync(ts: str):
SYNC_FILE.write_text(ts + "\n")


def read_pending_fetch() -> list[str]:
    """Return the recording IDs recorded in the pending-fetch file.

    The file holds one ID per line. Surrounding whitespace is trimmed and
    blank lines are ignored. A missing file yields an empty list.
    """
    if not PENDING_FETCH_FILE.exists():
        return []
    stripped = (row.strip() for row in PENDING_FETCH_FILE.read_text().splitlines())
    return [rec_id for rec_id in stripped if rec_id]


def write_pending_fetch(ids: list[str]) -> None:
    """Store the recording IDs that still need fetching, one per line.

    The parent directory is created if needed. When ``ids`` is empty the
    file is removed (if it exists) rather than written, so a later read
    finds nothing pending.
    """
    target = PENDING_FETCH_FILE
    target.parent.mkdir(parents=True, exist_ok=True)
    if not ids:
        if target.exists():
            target.unlink()
        return
    # The trailing empty element produces the final newline.
    target.write_text("\n".join([*ids, ""]))


def list_recordings(api_key: str, start_date: str | None = None) -> list[dict]:
"""Fetch all recordings since start_date, handling pagination."""
all_recordings = []
Expand Down Expand Up @@ -289,14 +350,26 @@ def main():
start_date = last_sync[:10] if last_sync else None
recordings = list_recordings(api_key, start_date)

# Re-attempt any IDs that failed on prior runs, even if they fall outside
# the start_date window. The catalog returns metadata for these too, so we
# only need to ensure they're in the fetch loop — not duplicated.
pending_ids = read_pending_fetch()
seen_ids = {r.get("id") for r in recordings if r.get("id")}
missing_pending = [pid for pid in pending_ids if pid not in seen_ids]
if missing_pending:
print(f" Re-attempting {len(missing_pending)} pending fetch(es) from prior run(s)")
for pid in missing_pending:
recordings.append({"id": pid, "title": f"(pending: {pid})"})

if not recordings:
print(" No new recordings found.")
set_last_sync(datetime.now(timezone.utc).isoformat())
return

print(f" Found {len(recordings)} recording(s)")

# Filter out pending recordings (not yet processed by Pocket)
# Filter out pending recordings (not yet processed by Pocket).
# Stub entries from .pending-fetch have no "state" so they pass through.
ready = [r for r in recordings if r.get("state") != "pending"]
pending = len(recordings) - len(ready)
if pending:
Expand All @@ -306,6 +379,7 @@ def main():
# Fetch details and write each recording
deleted = read_deleted()
new_dirs = []
failed_ids: list[str] = []
for rec in recordings:
rec_id = rec.get("id")
if not rec_id:
Expand All @@ -320,13 +394,20 @@ def main():
new_dirs.append(dir_name)
except urllib.error.HTTPError as e:
print(f" ERROR fetching {rec_id}: {e}", file=sys.stderr)
failed_ids.append(rec_id)
continue

# Update sync timestamp
# Persist failures so the next run retries them regardless of watermark.
write_pending_fetch(failed_ids)

# Advance sync timestamp even on partial failure — pending-fetch is the
# safety net that keeps failed IDs in the next run's fetch list.
now = datetime.now(timezone.utc).isoformat()
set_last_sync(now)

print(f"\nDone. Pulled {len(new_dirs)} recording(s).")
if failed_ids:
print(f" {len(failed_ids)} fetch(es) failed; will retry next run.")
print(f"Sync timestamp: {now}")

# Write list of new dirs to stdout for the orchestration script
Expand Down
123 changes: 123 additions & 0 deletions scripts/seed-people.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
#!/usr/bin/env python3
"""Seed .seam/people.json from existing analyses' speaker_maps.

Scans every .seam/analysis/*/analysis.json, collects unique speakers from
speaker_map, filters out generic role labels, and writes them to
.seam/people.json with source="inferred". Skips names that already exist
(case-insensitive) so it's safe to re-run.
"""
from __future__ import annotations

import json
import re
import sys
import uuid
from datetime import datetime, timezone
from pathlib import Path

ROOT = Path(__file__).resolve().parent.parent
ANALYSIS_DIR = ROOT / ".seam" / "analysis"
PEOPLE_FILE = ROOT / ".seam" / "people.json"

GENERIC_TOKENS = {
"unknown", "speaker", "narrator", "host", "facilitator", "moderator",
"chair", "manager", "lead", "engineer", "developer", "designer",
"analyst", "patient", "caller", "partner", "peer", "team", "member",
"physician", "doctor", "nurse", "guide", "tour", "father", "mother",
"parent", "child", "interviewer", "interviewee", "participant",
"guest", "attendee", "client", "customer", "user", "tech",
"platform", "analytics", "ela", "ent", "yc", "new",
GENERIC_LABELS = {
"unknown", "speaker", "narrator", "host", "facilitator", "moderator",
"chair", "participant", "interviewer", "interviewee",
"guest", "attendee", "caller", "member", "team",
}
Comment thread
yoaquim marked this conversation as resolved.

SPEAKER_NUM_RE = re.compile(r"^speaker\s*\d+$", re.IGNORECASE)


def is_generic(name: str) -> bool:
    """Return True when ``name`` is a placeholder rather than a real person.

    A label is generic when it is empty, is "unknown", matches the
    "Speaker <number>" pattern, or consists solely of GENERIC_TOKENS words
    and numbers. If the text before a trailing parenthetical is a real name
    ("Mark (Speaker 01)"), the label is not generic. Otherwise the words
    inside parentheses are examined too, so "Unknown (Speaker 02)" is
    generic while "Speaker 01 (Mark)" is not.

    Args:
        name: Raw speaker label.

    Returns:
        True if the label should be skipped, False if it looks like a name.
    """
    n = name.strip()
    if not n or n.lower() == "unknown":
        return True
    # Trailing parenthetical: "Mark (Speaker 01)" -> strip and re-test base
    base = re.sub(r"\s*\([^)]*\)\s*$", "", n).strip()
    if base != n and not is_generic(base):
        return False  # has a real name before the paren
    if SPEAKER_NUM_RE.match(n):
        return True
    # Parentheses are separators too; otherwise "(speaker" / "02)" would never
    # match a generic token and placeholder labels would pass as names.
    tokens = [t.lower() for t in re.split(r"[\s\-/()]+", n) if t]
    if not tokens:
        # Only separators/punctuation: nothing that could identify a person.
        return True
    return all(t in GENERIC_TOKENS or t.isdigit() for t in tokens)


def canonical(name: str) -> str:
    """Return ``name`` without its final parenthetical, trimmed of whitespace.

    Lets a labelled variant such as 'Mark (Speaker 01)' be grouped with the
    plain 'Mark'. Only a parenthetical at the very end is removed.
    """
    trailing_paren = re.compile(r"\s*\([^)]*\)\s*$")
    without_paren = trailing_paren.sub("", name)
    return without_paren.strip()


def main() -> int:
    """Seed the people file with speakers found in existing analyses.

    Every ``*/analysis.json`` under ANALYSIS_DIR is scanned for a
    ``speaker_map``; non-generic names are grouped by canonical (lower-cased,
    parenthetical-free) key and one display name is chosen per group. People
    whose canonical key is not already present in PEOPLE_FILE are appended
    with ``source="inferred"``.

    Unreadable or malformed analysis files are skipped. An unreadable or
    malformed PEOPLE_FILE aborts the run instead, so curated entries are
    never overwritten by a fresh list.

    Returns:
        0 on success; 1 if the analysis directory is missing or the existing
        people file cannot be safely updated.
    """
    from collections import Counter, defaultdict

    if not ANALYSIS_DIR.exists():
        print(f"No analysis directory at {ANALYSIS_DIR}", file=sys.stderr)
        return 1

    # canonical key -> how often each spelling of that name was seen
    variants: defaultdict[str, Counter[str]] = defaultdict(Counter)
    for analysis_path in sorted(ANALYSIS_DIR.glob("*/analysis.json")):
        try:
            data = json.loads(analysis_path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # ValueError covers both invalid JSON and undecodable bytes.
            continue
        speaker_map = data.get("speaker_map") if isinstance(data, dict) else None
        if not isinstance(speaker_map, dict):
            continue
        for raw in speaker_map.values():
            if not isinstance(raw, str):
                continue
            name = raw.strip()
            if is_generic(name):
                continue
            key = canonical(name).lower()
            if key:
                variants[key][name] += 1

    # Pick a display name per canonical key: most-used, breaking ties by length.
    chosen = {
        key: min(counts.items(), key=lambda kv: (-kv[1], -len(kv[0])))[0]
        for key, counts in variants.items()
    }

    # Load the current file. Anything unexpected stops the run: continuing
    # with an empty list would replace the file and lose its entries.
    document: dict = {}
    if PEOPLE_FILE.exists():
        try:
            loaded = json.loads(PEOPLE_FILE.read_text(encoding="utf-8"))
        except (OSError, ValueError) as exc:
            print(f"Cannot read {PEOPLE_FILE}, leaving it untouched: {exc}", file=sys.stderr)
            return 1
        if not isinstance(loaded, dict) or not isinstance(loaded.get("people", []), list):
            print(f"Unexpected structure in {PEOPLE_FILE}, leaving it untouched", file=sys.stderr)
            return 1
        document = loaded
    existing: list = document.setdefault("people", [])

    # Compare by canonical key so "Mark" and "Mark (Speaker 01)" count as the
    # same person no matter which spelling was stored earlier.
    existing_keys: set[str] = set()
    for person in existing:
        person_name = person.get("name") if isinstance(person, dict) else None
        if isinstance(person_name, str):
            existing_keys.add(canonical(person_name).lower())

    now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
    added = 0
    for key, name in sorted(chosen.items(), key=lambda kv: kv[1]):
        if key in existing_keys:
            continue
        existing.append({
            "id": str(uuid.uuid4()),
            "name": name,
            "source": "inferred",
            "createdAt": now,
        })
        existing_keys.add(key)
        added += 1

    PEOPLE_FILE.parent.mkdir(parents=True, exist_ok=True)
    # Writing the whole document keeps any other top-level keys intact.
    PEOPLE_FILE.write_text(json.dumps(document, indent=2) + "\n", encoding="utf-8")
    print(f"Seeded {added} new person(s) (total: {len(existing)}) -> {PEOPLE_FILE}")
    return 0


# Run the seeding when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
Loading
Loading