From 85aa2fc2c9a67b73ab0cf4eb468a2359c0178591 Mon Sep 17 00:00:00 2001 From: jayde Date: Wed, 22 Apr 2026 23:09:50 +0800 Subject: [PATCH] fix: add retry logic and checkpoint support for filter command --- kompile/cli.py | 23 ++++++++++++++++------- kompile/compiler/filter.py | 38 +++++++++++++++++++++++++------------- 2 files changed, 41 insertions(+), 20 deletions(-) diff --git a/kompile/cli.py b/kompile/cli.py index 01a84be..675c385 100644 --- a/kompile/cli.py +++ b/kompile/cli.py @@ -137,7 +137,7 @@ def filter_cmd(ctx): click.echo(f"Filtering {len(unfiltered)} sources with {model}...") results = [] - from kompile.models import Source + from kompile.models import Source, FilterResult for i, sid in enumerate(sorted(unfiltered), 1): src_file = raw_dir / f"{sid}.txt" if not src_file.exists(): @@ -155,12 +155,17 @@ def filter_cmd(ctx): metadata=src_data.get("metadata", {}), ) - result = filter_source(source, client, model) + try: + result = filter_source(source, client, model) + except ValueError as exc: + click.echo(f" [{i}/{len(unfiltered)}] {source.title[:60]} → ✗ skip (parse error: {exc})", err=True) + result = FilterResult(source_id=source.id, keep=False, topics=[], summary=f"[filter error: {exc}]") results.append(result) + state_add_filter_results(state, [result]) + save_state(root, state) status = "✓ keep" if result.keep else "✗ discard" click.echo(f" [{i}/{len(unfiltered)}] {source.title[:60]} → {status} | {result.summary[:80]}") - state_add_filter_results(state, results) kept = sum(1 for r in results if r.keep) click.echo(f"\nFiltered {len(results)} sources: {kept} kept, {len(results)-kept} discarded.") @@ -182,7 +187,7 @@ def filter_cmd(ctx): @click.pass_context def compile(ctx, incremental): """Run full tiered compilation pipeline → wiki/ directory.""" - from kompile.models import Source, TieredWiki, SurfaceNote, Concept, Insight, Gap + from kompile.models import Source, TieredWiki, SurfaceNote, Concept, Insight, Gap, FilterResult from kompile.compiler.filter import filter_source from kompile.compiler.classify import classify_topics from kompile.compiler.summarize import summarize_source @@ -219,12 +224,16 @@ def compile(ctx, incremental): url=src_data.get("metadata", {}).get("url"), metadata=src_data.get("metadata", {}), ) - result = filter_source(source, client, cfg["models"]["filter"]) + try: + result = filter_source(source, client, cfg["models"]["filter"]) + except ValueError as exc: + click.echo(f" ✗ skip {source.title[:60]} (parse error: {exc})", err=True) + result = FilterResult(source_id=source.id, keep=False, topics=[], summary=f"[filter error: {exc}]") results.append(result) + state_add_filter_results(state, [result]) + save_state(root, state) status = "✓" if result.keep else "✗" click.echo(f" {status} {source.title[:60]}") - state_add_filter_results(state, results) - save_state(root, state) # ------------------------------------------------------------------ # # Step 2: Classify topics (if not already done or new sources added) diff --git a/kompile/compiler/filter.py b/kompile/compiler/filter.py index 8ffb1db..0d0dd4a 100644 --- a/kompile/compiler/filter.py +++ b/kompile/compiler/filter.py @@ -26,19 +26,28 @@ def filter_source( date=source.date, content=content, ) - response = client.messages.create( - model=model, - max_tokens=1024, - system=FILTER_SYSTEM, - messages=[{"role": "user", "content": user_msg}], - ) - data = parse_llm_json(response.content[0].text) - return FilterResult( - source_id=source.id, - keep=bool(data.get("keep", False)), - topics=data.get("topics", []), - summary=data.get("summary", ""), - ) + last_exc: Exception | None = None + for attempt in range(3): + response = client.messages.create( + model=model, + max_tokens=1024, + system=FILTER_SYSTEM, + messages=[{"role": "user", "content": user_msg}], + ) + try: + data = parse_llm_json(response.content[0].text) + if not isinstance(data, dict): + raise ValueError(f"expected dict, got {type(data).__name__}") + except ValueError as exc: + last_exc = exc + continue + return FilterResult( + source_id=source.id, + keep=bool(data.get("keep", False)), + topics=data.get("topics", []), + summary=data.get("summary", ""), + ) + raise ValueError(f"filter_source failed after 3 attempts for {source.id!r}: {last_exc}") def filter_sources( @@ -46,11 +55,14 @@ def filter_sources( client: anthropic.Anthropic, model: str, progress_cb: Callable[[int, int, FilterResult], None] | None = None, + checkpoint_cb: Callable[[FilterResult], None] | None = None, ) -> list[FilterResult]: results = [] for i, source in enumerate(sources): result = filter_source(source, client, model) results.append(result) + if checkpoint_cb: + checkpoint_cb(result) if progress_cb: progress_cb(i + 1, len(sources), result) return results