diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..150c045 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,21 @@ +name: CI + +on: + push: + branches: [main] + pull_request: + branches: [main] + +jobs: + build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-node@v4 + with: + node-version: 22 + cache: npm + - run: npm ci + - run: npm run typecheck + - run: npm test + - run: npm run build:skill diff --git a/.gitignore b/.gitignore index a63cb11..8badcf9 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,5 @@ dist/ .quorum/ .DS_Store *.log +__pycache__/ +*.pyc diff --git a/package.json b/package.json index 7fa4abc..5fc60ec 100644 --- a/package.json +++ b/package.json @@ -1,17 +1,28 @@ { "name": "quorum", "version": "0.2.0", - "private": true, "type": "module", "description": "Quorum review synthesis plus Cursor Cloud exploration DAG runner.", "bin": { "quorum": "./dist/src/cli.js", "quorum-cloud": "./dist/src/cli.js" }, + "files": [ + "dist/src/", + "scripts/", + "references/", + "SKILL.md", + "quorum.skill", + "LICENSE", + "README.md" + ], "scripts": { + "prebuild": "rm -rf dist", "build": "tsc -p tsconfig.json", + "build:skill": "rm -f quorum.skill && mkdir -p .skill-build/quorum && cp SKILL.md .skill-build/quorum/ && cp -r scripts references .skill-build/quorum/ && cd .skill-build && zip -r ../quorum.skill quorum/ && cd .. 
&& rm -rf .skill-build && echo 'quorum.skill rebuilt'", "test": "npm run build && node --test dist/test/*.test.js", - "typecheck": "tsc -p tsconfig.json --noEmit" + "typecheck": "tsc -p tsconfig.json --noEmit", + "prepublishOnly": "npm run build && npm run build:skill && npm test" }, "engines": { "node": ">=22" diff --git a/quorum.skill b/quorum.skill index ffaa01f..836cdc6 100644 Binary files a/quorum.skill and b/quorum.skill differ diff --git a/references/clustering-rubric.md b/references/clustering-rubric.md new file mode 100644 index 0000000..4f3581d --- /dev/null +++ b/references/clustering-rubric.md @@ -0,0 +1,91 @@ +# Clustering rubric + +You are deduplicating findings from automated code reviewers (Bugbot, Copilot, Devin, …) on a single pull request. Partition **all** findings into clusters, where each cluster represents exactly one underlying issue. You are the judge; read this file in full, then write `clusters.json`. + +Two role boundaries, non-negotiable: + +- **You match; you do not re-review.** Never drop, merge away, or down-rank a finding because you think it's wrong. Validity gets measured later, from fix data. +- **You do not compute quorum.** That happens deterministically in `validate_partition.py`. + +## The test + +Two findings belong in the same cluster **if and only if a single code change would plausibly resolve both.** Same root cause, same fix → same cluster. 
+ +### MERGE when findings describe the same defect even if they are: + +- anchored to different lines — one flags where the bug originates, another where it manifests downstream +- worded very differently, or given different severities by their reviewers +- at different abstraction levels: one general ("no error handling in this function"), one a specific instance ("this await can reject unhandled") → merge with `match_type: "general-specific"` +- from the **same** reviewer — bots duplicate themselves across re-review passes → `match_type: "within-reviewer-dup"` + +### DO NOT MERGE when: + +- findings are the same *category* of bug at independent locations requiring independent fixes — two separate missing null checks are two clusters +- different issues happen to anchor on the same line +- one is functional and the other purely stylistic, even if co-located +- **you are uncertain.** A wrong merge fabricates reviewer consensus, which is worse than leaving a duplicate. Only merge when the single-fix test clearly passes. + +Cross-file merges are allowed **only** when the root cause is literally shared (e.g. both findings trace to the same mutated shared constant). Set `"cross_file": true` and justify in `match_rationale`. + +Reviewer identity is irrelevant to whether two findings match. Use the `hunk` diff context to judge — comment prose alone is often too vague to tell whether two descriptions point at the same defect. + +## Output schema + +Write strict JSON to `clusters.json`. Hard constraints: + +- Every input finding `id` appears in **exactly one** cluster. No omissions, no duplicates, no invented ids. +- Singleton clusters are expected and fine — most clusters will be singletons. +- `canonical_title`: ≤ 80 chars, names the defect, not the symptom. +- `canonical_description`: 1–3 sentences synthesizing the **union** of information across members — if one reviewer adds detail the others missed, keep it. 
+- `category`: one of `logic | concurrency | security | performance | error-handling | data-integrity | api-contract | style | docs | test-gap | other` +- `severity`: max across members, one of `critical | major | minor | nit` +- `match_type`: `exact | same-root-cause | general-specific | within-reviewer-dup | singleton` +- `match_confidence`: 0.0–1.0; use 1.0 for singletons. Multi-finding clusters below 0.7 will be split back into singletons by the validator — that gate is intentional, do not inflate confidence to dodge it. +- `match_rationale`: one sentence, required for clusters of size > 1. + +```json +{ + "clusters": [ + { + "cluster_id": "c1", + "member_ids": ["bugbot-3", "devin-1"], + "canonical_title": "string", + "canonical_description": "string", + "category": "logic", + "severity": "major", + "primary_location": {"file": "string", "start_line": 0, "end_line": 0}, + "match_type": "same-root-cause", + "match_confidence": 0.9, + "match_rationale": "one sentence; required for size > 1", + "cross_file": false + } + ] +} +``` + +`primary_location` is where a human should look first — usually the origin of the defect, not a downstream symptom. + +## Worked examples + +**A — merge across different lines (same-root-cause):** + +- `bugbot-2` @ `utils/options.ts:14` — "Object.assign(DEFAULT_OPTIONS, userOpts) mutates the shared default object; later callers inherit this user's prefs." +- `devin-4` @ `routes/trip.ts:88` — "Route preferences appear to leak between requests; defaults polluted by prior calls." + +→ One fix (clone before assign) resolves both. **MERGE**, primary_location at the mutation site. + +**B — do not merge (same category, independent instances):** + +- `copilot-1` @ `api/users.ts:42` — missing null check on `req.user` +- `bugbot-5` @ `api/orders.ts:17` — missing null check on `order.customer` + +→ Independent fixes. 
**TWO clusters.**
+
+**C — merge general + specific:**
+
+- `devin-2` @ `services/sync.ts` (function-level) — "No error handling in syncAll; any failure leaves partial state."
+- `bugbot-7` @ `services/sync.ts:103` — "await push() can reject and is unhandled."
+
+→ The specific is an instance of the general. **MERGE**, `match_type: "general-specific"`; the description covers the broad gap and cites line 103 as a concrete instance.
+
+
diff --git a/scripts/fetch_findings.sh b/scripts/fetch_findings.sh
new file mode 100755
index 0000000..ec8eda5
--- /dev/null
+++ b/scripts/fetch_findings.sh
@@ -0,0 +1,81 @@
+#!/usr/bin/env bash
+# fetch_findings.sh — pull line-anchored review comments from a PR, keep the
+# ones authored by review bots, and normalize them into Quorum finding records.
+#
+# usage: fetch_findings.sh OWNER/REPO PR_NUMBER [OUT=findings.json]
+# env:   QUORUM_BOTS  case-insensitive regex matched against author login
+#                     (default: 'cursor\[bot\]|copilot|devin')
+#
+# Output record:
+#   { id, reviewer, login, file, lines:[start,end], outdated,
+#     body, hunk, url, comment_id, node_id }
+#
+# `id` is "<reviewer>-<n>", where n counts that reviewer's findings in
+# (file, line) order. Ids are unique across the whole output file.
+set -euo pipefail
+
+usage() { echo "usage: fetch_findings.sh OWNER/REPO PR_NUMBER [out.json]" >&2; exit 2; }
+[[ $# -ge 2 ]] || usage
+
+REPO=$1
+PR=$2
+OUT=${3:-findings.json}
+BOTS_RE=${QUORUM_BOTS:-'cursor\[bot\]|copilot|devin'}
+
+command -v gh >/dev/null || { echo "error: gh CLI not found" >&2; exit 1; }
+command -v jq >/dev/null || { echo "error: jq not found" >&2; exit 1; }
+
+RAW=$(mktemp)
+trap 'rm -f "$RAW"' EXIT
+
+# --paginate emits one JSON document per page; --jq '.[]' flattens to a
+# stream of comment objects, jq -s reassembles a single array.
+gh api "repos/$REPO/pulls/$PR/comments" --paginate --jq '.[]' | jq -s '.' > "$RAW"
+
+TOTAL=$(jq 'length' "$RAW")
+AUTHORS=$(jq -r '[.[].user.login] | unique | join(", ")' "$RAW")
+
+jq --arg re "$BOTS_RE" '
+  # Map an author login to the short reviewer name used in finding ids.
+  def short(l): (l | ascii_downcase) as $x
+    | if   ($x | test("cursor"))  then "bugbot"
+      elif ($x | test("copilot")) then "copilot"
+      elif ($x | test("devin"))   then "devin"
+      else ($x | gsub("\\[bot\\]$"; "") | gsub("[^a-z0-9]+"; "-"))
+      end;
+
+  # A comment whose author object is null has no login; treat it as the empty
+  # string so test() does not abort the whole run on a non-string input.
+  [ .[] | select((.user.login // "") | test($re; "i")) ]
+  | sort_by(.path, (.line // .original_line // 0))
+  # Group by the SHORT reviewer name, not the raw login: the id below is
+  # "<short>-<index in group>", so two logins that map to the same short name
+  # must share one counter or they would both emit e.g. "copilot-1".
+  | group_by(short(.user.login))
+  | map(
+      to_entries
+      | map(
+          .value as $c
+          | {
+              id: (short($c.user.login) + "-" + ((.key + 1) | tostring)),
+              reviewer: short($c.user.login),
+              login: $c.user.login,
+              file: $c.path,
+              lines: [
+                ($c.start_line // $c.original_start_line // $c.line // $c.original_line),
+                ($c.line // $c.original_line // $c.start_line // $c.original_start_line)
+              ],
+              outdated: ($c.line == null),
+              body: $c.body,
+              hunk: ($c.diff_hunk // ""),
+              url: $c.html_url,
+              comment_id: $c.id,
+              node_id: $c.node_id
+            }
+        )
+    )
+  | add // []
+  | sort_by(.file, (.lines[0] // 0))
+' "$RAW" > "$OUT"
+
+N=$(jq 'length' "$OUT")
+echo "PR #$PR: $TOTAL review comment(s) total; authors seen: ${AUTHORS:-none}" >&2
+echo "Matched $N bot finding(s) -> $OUT" >&2
+jq -r 'group_by(.reviewer)[] | " \(.[0].reviewer): \(length)"' "$OUT" >&2 || true
+
+if [[ "$N" -eq 0 ]]; then
+  echo "" >&2
+  echo "No findings matched filter '$BOTS_RE'." >&2
+  echo "If review bots did comment, set QUORUM_BOTS to match the author logins listed above and re-run." >&2
+fi
diff --git a/scripts/post_synthesis.py b/scripts/post_synthesis.py
new file mode 100755
index 0000000..08dcb7e
--- /dev/null
+++ b/scripts/post_synthesis.py
@@ -0,0 +1,211 @@
+#!/usr/bin/env python3
+"""post_synthesis.py — render and post the Quorum synthesis comment.
+
+- Upserts ONE issue comment on the PR, idempotent via a hidden HTML marker:
+  re-runs update the same comment in place, never spam.
+- Adds eyes reactions to the original bot comments belonging to quorum >= 2
+  clusters (the in-thread cue that another reviewer agrees).
+- Embeds the scored clusters JSON in a collapsed <details> block — the
+  machine-readable surface for downstream agents.
+- --minimize collapses non-primary members of multi-finding clusters as
+  DUPLICATE (hides other bots' comments: opt-in, ask the user first).
+- --dry-run renders to stdout and lists planned side effects, no gh calls.
+
+usage: post_synthesis.py OWNER/REPO PR_NUMBER clusters.scored.json
+       [--dry-run] [--minimize] [--no-reactions]
+"""
+
+import argparse
+import json
+import subprocess
+import sys
+
+# The synthesis comment is recognised on re-runs by this key. It is wrapped in
+# an HTML comment so it is invisible in the rendered PR conversation.
+# render() emits MARKER; find_existing_comment() searches for MARKER_KEY, so
+# the two can never drift apart.
+MARKER_KEY = "quorum:synthesis"
+MARKER = f"<!-- {MARKER_KEY} -->"
+MAX_BODY = 60000  # GitHub comment hard limit is 65536
+
+TIER_EMOJI_FULL = "\U0001F3AF"  # direct hit
+TIER_EMOJI_MULTI = "\u26A1"  # high voltage
+TIER_EMOJI_SOLO = "\u25FD"  # small square
+
+
+def gh(*args, payload=None):
+    """Run a gh command; returns stdout. Raises CalledProcessError on failure."""
+    res = subprocess.run(
+        ["gh", *args],
+        check=True, capture_output=True, text=True,
+        input=payload,
+    )
+    return res.stdout
+
+
+def tier_emoji(quorum, denominator):
+    """Pick the section emoji: all reviewers agree, several agree, or one."""
+    if quorum >= 2 and quorum == denominator:
+        return TIER_EMOJI_FULL
+    if quorum >= 2:
+        return TIER_EMOJI_MULTI
+    return TIER_EMOJI_SOLO
+
+
+def loc_str(c):
+    """Format a cluster's primary_location as `file:Lstart-Lend` in backticks.
+
+    Falls back to a single line when only one bound is set (or both are equal),
+    to the bare file when no line is set, and to "?" when no file is set.
+    """
+    loc = c.get("primary_location") or {}
+    f = loc.get("file") or "?"
+    a, b = loc.get("start_line"), loc.get("end_line")
+    if a and b and a != b:
+        return f"`{f}:L{a}-L{b}`"
+    if a or b:
+        return f"`{f}:L{a or b}`"
+    return f"`{f}`"
+
+
+def render(repo, pr, data, include_json=True):
+    """Render the scored-clusters document as the Markdown comment body.
+
+    Clusters are emitted in the order given, with a new heading each time the
+    quorum value changes. When include_json is true the whole document is
+    appended inside a collapsed <details> block.
+    """
+    totals = data["totals"]
+    d = max(totals["reviewer_denominator"], 1)
+    clusters = data["clusters"]
+    reviewers = sorted({r for c in clusters for r in c["reviewers"]})
+
+    out = [MARKER, "## \U0001F91D Quorum — review synthesis", ""]
+    out.append(
+        f"**{totals['findings']} findings** from {', '.join(reviewers) or 'no reviewers'} "
+        f"\u2192 **{totals['clusters']} distinct issues**. "
+        f"Sorted by reviewer quorum, then severity."
+    )
+    out.append("")
+
+    current_q = None
+    for c in clusters:
+        q = c["quorum"]
+        if q != current_q:
+            current_q = q
+            label = "reviewers agree" if q > 1 else "reviewer"
+            if out[-1] != "":
+                out.append("")
+            out.append(f"### {tier_emoji(q, d)} {q}/{d} {label}")
+            out.append("")
+        members = ", ".join(f"[{m['id']}]({m['url']})" for m in c["members"])
+        out.append(
+            f"- **[{c['severity']}]** {c['canonical_title']} — {loc_str(c)} · "
+            f"{c['category']} · {members}"
+        )
+        desc = (c.get("canonical_description") or "").strip()
+        if desc and desc != c["canonical_title"]:
+            out.append(f" {desc}")
+        if c.get("gate_split_from"):
+            out.append(f" split from low-confidence merge `{c['gate_split_from']}`")
+    out.append("")
+
+    if totals.get("gate_split"):
+        out.append(
+            f"Confidence gate split {len(totals['gate_split'])} proposed merge(s) "
+            f"back into singletons \u2014 quorum is shown only where the match is confident."
+        )
+        out.append("")
+
+    if include_json:
+        out.append("<details><summary>clusters.scored.json (machine-readable, for agents)</summary>")
+        out.append("")
+        out.append("```json")
+        out.append(json.dumps(data, indent=2))
+        out.append("```")
+        out.append("</details>")
+        out.append("")
+
+    out.append(f"Quorum · PR {repo}#{pr} · generated {data.get('generated_at', '')}")
+    return "\n".join(out)
+
+
+def find_existing_comment(repo, pr):
+    """Return the id of the first issue comment carrying the marker, or None."""
+    # json.dumps gives a correctly quoted jq string literal for the key.
+    jq_filter = (
+        '.[] | {id: .id, hit: ((.body // "") | contains('
+        + json.dumps(MARKER_KEY)
+        + "))} | @json"
+    )
+    raw = gh(
+        "api", f"repos/{repo}/issues/{pr}/comments", "--paginate",
+        "--jq", jq_filter,
+    )
+    for line in raw.splitlines():
+        if not line.strip():
+            continue
+        obj = json.loads(line)
+        if obj.get("hit"):
+            return obj["id"]
+    return None
+
+
+def main():
+    ap = argparse.ArgumentParser()
+    ap.add_argument("repo", help="OWNER/REPO")
+    ap.add_argument("pr", help="PR number")
+    ap.add_argument("scored", help="clusters.scored.json from validate_partition.py")
+    ap.add_argument("--dry-run", action="store_true")
+    ap.add_argument("--minimize", action="store_true")
+    ap.add_argument("--no-reactions", action="store_true")
+    args = ap.parse_args()
+
+    with open(args.scored) as f:
+        data = json.load(f)
+    clusters = data["clusters"]
+
+    body = render(args.repo, args.pr, data)
+    if len(body) > MAX_BODY:
+        body = render(args.repo, args.pr, data, include_json=False)
+        body += "\nJSON payload omitted: comment size limit. See workflow artifacts/logs."
+
+    react_targets = [
+        m for c in clusters if c["quorum"] >= 2 for m in c["members"]
+    ]
+    minimize_targets = [
+        m for c in clusters if len(c["members"]) > 1 for m in c["members"][1:]
+    ]
+
+    if args.dry_run:
+        print(body)
+        print("\n--- planned side effects (dry run) ---", file=sys.stderr)
+        print(f"upsert synthesis comment on {args.repo}#{args.pr}", file=sys.stderr)
+        if not args.no_reactions:
+            print(f"eyes reactions on {len(react_targets)} comment(s): "
+                  f"{[m['id'] for m in react_targets]}", file=sys.stderr)
+        if args.minimize:
+            print(f"minimize as DUPLICATE: {[m['id'] for m in minimize_targets]}", file=sys.stderr)
+        return
+
+    # 1.
Upsert synthesis comment + payload = json.dumps({"body": body}) + existing = find_existing_comment(args.repo, args.pr) + if existing: + gh("api", "-X", "PATCH", f"repos/{args.repo}/issues/comments/{existing}", + "--input", "-", payload=payload) + print(f"updated synthesis comment {existing}") + else: + gh("api", "-X", "POST", f"repos/{args.repo}/issues/{args.pr}/comments", + "--input", "-", payload=payload) + print("posted new synthesis comment") + + # 2. Reactions on quorum >= 2 members (best-effort) + if not args.no_reactions: + ok = 0 + for m in react_targets: + try: + gh("api", "-X", "POST", + f"repos/{args.repo}/pulls/comments/{m['comment_id']}/reactions", + "-f", "content=eyes") + ok += 1 + except subprocess.CalledProcessError: + pass + print(f"reactions: {ok}/{len(react_targets)}") + + # 3. Optional minimize of duplicate members (best-effort) + if args.minimize: + mutation = ( + "mutation($id: ID!) { minimizeComment(input: {subjectId: $id, " + "classifier: DUPLICATE}) { minimizedComment { isMinimized } } }" + ) + ok = 0 + for m in minimize_targets: + if not m.get("node_id"): + continue + try: + gh("api", "graphql", "-f", f"query={mutation}", "-f", f"id={m['node_id']}") + ok += 1 + except subprocess.CalledProcessError: + pass + print(f"minimized: {ok}/{len(minimize_targets)}") + + +if __name__ == "__main__": + main() diff --git a/scripts/validate_partition.py b/scripts/validate_partition.py new file mode 100755 index 0000000..627907d --- /dev/null +++ b/scripts/validate_partition.py @@ -0,0 +1,208 @@ +#!/usr/bin/env python3 +"""validate_partition.py — the deterministic harness around the clustering judge. + +1. 
Validates clusters.json against findings.json:
+   - member_ids form an EXACT partition of finding ids
+     (no omissions, no duplicates, no invented ids)
+   - schema sanity: required fields, enum values, confidence in [0,1],
+     rationale present on multi-finding clusters
+   On violation: prints each named violation and exits 1 so the agent can fix
+   clusters.json and retry.
+
+2. Applies the confidence gate: multi-finding clusters with
+   match_confidence < threshold (default 0.7) are split back into singletons.
+   A false merge fabricates reviewer consensus; a missed merge is just noise.
+
+3. Computes quorum per cluster = count of DISTINCT reviewers among members
+   (within-reviewer dups collapse without inflating consensus), sorts by
+   (quorum desc, severity desc), and writes clusters.scored.json.
+
+usage: validate_partition.py findings.json clusters.json [-o clusters.scored.json]
+       [--confidence-gate 0.7] [--no-gate]
+"""
+
+import argparse
+import datetime
+import json
+import sys
+
+SEVERITIES = {"critical": 3, "major": 2, "minor": 1, "nit": 0}
+CATEGORIES = {
+    "logic", "concurrency", "security", "performance", "error-handling",
+    "data-integrity", "api-contract", "style", "docs", "test-gap", "other",
+}
+MATCH_TYPES = {
+    "exact", "same-root-cause", "general-specific",
+    "within-reviewer-dup", "singleton",
+}
+
+
+def load(path, label):
+    """Read and parse a JSON file; print an error and exit 1 if that fails."""
+    try:
+        with open(path) as f:
+            return json.load(f)
+    except (OSError, json.JSONDecodeError) as e:
+        print(f"error: could not read {label} from {path}: {e}", file=sys.stderr)
+        sys.exit(1)
+
+
+def first_line(text, limit=80):
+    """Return the first non-empty line of text, stripped of leading Markdown
+    markers (#, *, -, >) and cut to `limit` characters."""
+    for line in (text or "").strip().splitlines():
+        line = line.strip().lstrip("#*-> ").strip()
+        if line:
+            return line[:limit]
+    return "(no comment text)"
+
+
+def singleton_from(finding, parent_id=None, parent_severity=None):
+    """Build a one-member cluster for `finding`.
+
+    parent_id / parent_severity are supplied when the confidence gate splits a
+    merged cluster: the parent's id is recorded under "gate_split_from" and its
+    severity is carried over.
+    """
+    lines = finding.get("lines") or [None, None]
+    c = {
+        "cluster_id": f"{finding['id']}-solo",
+        "member_ids": [finding["id"]],
+        "canonical_title": first_line(finding.get("body")),
+        "canonical_description": first_line(finding.get("body"), 240),
+        "category": "other",
+        # Preserve the parent cluster's severity when the confidence gate splits
+        # it, so a low-confidence cluster does not silently downgrade a critical
+        # finding to "minor". Genuine standalone singletons fall back to "minor".
+        "severity": parent_severity or "minor",
+        "primary_location": {
+            "file": finding.get("file"),
+            "start_line": lines[0],
+            "end_line": lines[1],
+        },
+        "match_type": "singleton",
+        "match_confidence": 1.0,
+        "cross_file": False,
+    }
+    if parent_id:
+        c["gate_split_from"] = parent_id
+    return c
+
+
+def validate(findings, doc):
+    """Check that `doc` is a well-formed exact partition of `findings`.
+
+    Returns (errors, clusters): `errors` is a list of human-readable violation
+    strings (empty when the document is valid) and `clusters` is the list taken
+    from doc["clusters"]. Malformed entries are reported as violations rather
+    than raised, so the caller can print every problem in one pass.
+    """
+    errors = []
+    if not isinstance(doc, dict) or not isinstance(doc.get("clusters"), list):
+        return ['clusters.json must be an object with a "clusters" array'], []
+
+    clusters = doc["clusters"]
+    fmap = {f["id"]: f for f in findings}
+    seen = {}
+
+    for i, c in enumerate(clusters):
+        if not isinstance(c, dict):
+            errors.append(
+                f"cluster[{i}]: each cluster must be a JSON object, got {type(c).__name__}"
+            )
+            continue
+        # Clusters without an id are still named by position in error messages.
+        cid = c.get("cluster_id") or f"cluster[{i}]"
+        mids = c.get("member_ids") or []
+        if not isinstance(mids, list):
+            # A bare string would otherwise be iterated character by character.
+            errors.append(f"{cid}: member_ids must be an array of finding ids")
+            mids = []
+        elif not mids:
+            errors.append(f"{cid}: empty member_ids")
+        for m in mids:
+            # The type check keeps unhashable values (lists, objects) from
+            # raising TypeError in the dict lookups below.
+            if not isinstance(m, (str, int)) or m not in fmap:
+                errors.append(f"{cid}: invented id '{m}' (not in findings.json)")
+            elif m in seen:
+                errors.append(f"finding '{m}' appears in both '{seen[m]}' and '{cid}'")
+            else:
+                seen[m] = cid
+
+        for field in ("cluster_id", "canonical_title", "category", "severity", "match_type"):
+            if not c.get(field):
+                errors.append(f"{cid}: missing required field '{field}'")
+        if c.get("severity") and c["severity"] not in SEVERITIES:
+            errors.append(f"{cid}: severity '{c['severity']}' not in {sorted(SEVERITIES)}")
+        if c.get("category") and c["category"] not in CATEGORIES:
+            errors.append(f"{cid}: category '{c['category']}' not in {sorted(CATEGORIES)}")
+        if c.get("match_type") and c["match_type"] not in MATCH_TYPES:
+            errors.append(f"{cid}: match_type '{c['match_type']}' not in {sorted(MATCH_TYPES)}")
+
+        conf = c.get("match_confidence")
+        # bool is a subclass of int in Python; JSON true/false is not a confidence.
+        if (
+            isinstance(conf, bool)
+            or not isinstance(conf, (int, float))
+            or not (0.0 <= float(conf) <= 1.0)
+        ):
+            errors.append(f"{cid}: match_confidence must be a number in [0,1], got {conf!r}")
+        if len(mids) > 1 and not (c.get("match_rationale") or "").strip():
+            errors.append(f"{cid}: match_rationale is required for clusters of size > 1")
+
+    omitted = [fid for fid in fmap if fid not in seen]
+    for fid in omitted:
+        errors.append(f"finding '{fid}' omitted from all clusters")
+
+    return errors, clusters
+
+
+def main():
+    ap = argparse.ArgumentParser()
+    ap.add_argument("findings")
+    ap.add_argument("clusters")
+    ap.add_argument("-o", "--out", default="clusters.scored.json")
+    ap.add_argument("--confidence-gate", type=float, default=0.7)
+    ap.add_argument("--no-gate", action="store_true")
+    args = ap.parse_args()
+
+    findings = load(args.findings, "findings")
+    doc = load(args.clusters, "clusters")
+    fmap = {f["id"]: f for f in findings}
+
+    errors, clusters = validate(findings, doc)
+    if errors:
+        print("VALIDATION FAILED — fix these in clusters.json and re-run:", file=sys.stderr)
+        for e in errors:
+            print(f"  - {e}", file=sys.stderr)
+        sys.exit(1)
+
+    # Confidence gate
+    gated, out_clusters = [], []
+    for c in clusters:
+        if (
+            not args.no_gate
+            and len(c["member_ids"]) > 1
+            and float(c["match_confidence"]) < args.confidence_gate
+        ):
+            gated.append(c["cluster_id"])
+            out_clusters.extend(
+                singleton_from(fmap[m], c["cluster_id"], c.get("severity"))
+                for m in c["member_ids"]
+            )
+        else:
+            out_clusters.append(dict(c))
+
+    # Deterministic quorum + enrichment
+    for c in out_clusters:
+        reviewers = sorted({fmap[m]["reviewer"] for m in c["member_ids"]})
+        c["quorum"] = len(reviewers)
+        c["reviewers"] = reviewers
+        c["members"] = [
+            {k: fmap[m].get(k) for k in
+             ("id", "reviewer", "file", "lines", "url", "comment_id", "node_id", "outdated")}
+            for m in c["member_ids"]
+        ]
+
+    denominator = len({f["reviewer"] for f in findings})
+    out_clusters.sort(key=lambda c: (
+        -c["quorum"],
+        -SEVERITIES[c["severity"]],
+        (c.get("primary_location") or {}).get("file") or "",
+    ))
+
+
result = { + "generated_at": datetime.datetime.now(datetime.timezone.utc).isoformat(timespec="seconds"), + "totals": { + "findings": len(findings), + "clusters": len(out_clusters), + "reviewer_denominator": denominator, + "gate_split": gated, + }, + "clusters": out_clusters, + } + with open(args.out, "w") as f: + json.dump(result, f, indent=2) + + tiers = {} + for c in out_clusters: + tiers[c["quorum"]] = tiers.get(c["quorum"], 0) + 1 + print(f"OK: {len(findings)} findings -> {len(out_clusters)} clusters " + f"(denominator: {denominator} reviewers)") + for q in sorted(tiers, reverse=True): + print(f" quorum {q}/{denominator}: {tiers[q]} cluster(s)") + if gated: + print(f" confidence gate split: {', '.join(gated)}") + print(f"wrote {args.out}") + + +if __name__ == "__main__": + main() diff --git a/src/adapters/anthropic.ts b/src/adapters/anthropic.ts new file mode 100644 index 0000000..380ef3c --- /dev/null +++ b/src/adapters/anthropic.ts @@ -0,0 +1,95 @@ +import { withRetry } from "../retry.js"; +import type { TaskExecutionInput, TaskExecutionResult, TaskRunnerAdapter } from "../types.js"; + +const API_BASE = process.env.ANTHROPIC_BASE_URL || "https://api.anthropic.com"; + +/** + * Task runner that calls the Anthropic Messages API directly. + * Set ANTHROPIC_API_KEY to use this adapter. 
+ */ +export class AnthropicAdapter implements TaskRunnerAdapter { + constructor(private options: { maxRetries?: number } = {}) {} + + async runTask(input: TaskExecutionInput): Promise { + return withRetry(() => this.runOnce(input), { + maxRetries: this.options.maxRetries, + signal: input.signal, + label: `Task ${input.task.id}`, + }); + } + + private async runOnce(input: TaskExecutionInput): Promise { + const apiKey = input.apiKey || process.env.ANTHROPIC_API_KEY; + if (!apiKey) { + throw new Error("ANTHROPIC_API_KEY is required for the Anthropic adapter."); + } + + const started = Date.now(); + const systemPrompt = [ + "You are a read-only PR review exploration agent for Quorum.", + "Do not suggest edits, create commits, push branches, or open pull requests.", + "Return a concise human-readable explanation, then end with exactly one fenced JSON block as specified.", + ].join("\n"); + + let responseText = ""; + let assistantOutput = ""; + + try { + const response = await fetch(`${API_BASE}/v1/messages`, { + method: "POST", + headers: { + "Content-Type": "application/json", + "x-api-key": apiKey, + "anthropic-version": "2023-06-01", + "anthropic-dangerous-direct-browser-access": "true", + }, + signal: input.signal, + body: JSON.stringify({ + model: input.model, + max_tokens: 4096, + system: systemPrompt, + messages: [{ role: "user", content: input.prompt }], + }), + }); + + if (!response.ok) { + const text = await response.text().catch(() => ""); + throw new Error(`Anthropic API error (${response.status}): ${text}`); + } + + const body = (await response.json()) as { + content?: Array<{ type: string; text?: string }>; + stop_reason?: string; + id?: string; + }; + + // Guard against a malformed success response that omits/nulls content; + // an empty result is recorded downstream as a parseError, not a throw. + const blocks = Array.isArray(body.content) ? 
body.content : []; + for (const block of blocks) { + if (block.type === "text" && block.text) { + assistantOutput += block.text; + } + } + responseText = assistantOutput; + + if (body.stop_reason === "max_tokens") { + console.error(`Task ${input.task.id}: response hit max_tokens limit`); + } + } catch (error) { + if (input.signal.aborted) { + throw new Error("Task aborted."); + } + throw error; + } + + // Parse validation is owned by the runner (see applyResult): a finished-but- + // unparseable response is recorded as a parseError downstream, not a task + // error. The adapter only reports that the model produced a response. + return { + status: "finished", + resultText: responseText, + durationMs: Date.now() - started, + }; + } +} diff --git a/src/cli.ts b/src/cli.ts index ac54ea9..8517a4e 100644 --- a/src/cli.ts +++ b/src/cli.ts @@ -1,6 +1,9 @@ #!/usr/bin/env node +import { execFile } from "node:child_process"; import { mkdir, readFile, writeFile } from "node:fs/promises"; import { basename, dirname, join } from "node:path"; +import { promisify } from "node:util"; +import { AnthropicAdapter } from "./adapters/anthropic.js"; import { canvasFileNameFor, canvasPathFromOutDir, @@ -11,12 +14,23 @@ import { parseClusterIds, parseScoredClusters, selectClusters } from "./clusters import { CursorCloudAdapter } from "./cursor-cloud-adapter.js"; import { parseDag } from "./dag.js"; import { upsertExplorationComment } from "./github.js"; +import { loadRunLogs } from "./logging.js"; +import type { LogEntry } from "./logging.js"; +import { scriptsDirFromModuleUrl } from "./paths.js"; import { parsePullRequestRef, repoSlug } from "./pr.js"; import { buildExplorationDag } from "./prompts.js"; import { buildExplorationReport, renderExplorationMarkdown } from "./report.js"; import { initialRunState, runDag, writeState } from "./runner.js"; import { recoverScoredClustersFromPullRequest } from "./synthesis.js"; -import type { Dag, ExplorationContext, RunnerContext, 
ScoredClustersDoc } from "./types.js"; +import type { + Dag, + ExplorationContext, + RunnerContext, + ScoredClustersDoc, + TaskRunnerAdapter, +} from "./types.js"; + +const execFileAsync = promisify(execFile); interface ParsedArgs { command?: string; @@ -68,6 +82,22 @@ async function main(): Promise { await canvasCommand(parsed); return; } + if (parsed.command === "synthesize") { + await synthesizeCommand(parsed); + return; + } + if (parsed.command === "triage-pr") { + await triagePrCommand(parsed); + return; + } + if (parsed.command === "setup") { + await setupCommand(); + return; + } + if (parsed.command === "eval") { + await evalCommand(parsed); + return; + } throw new Error(`Unknown command: ${parsed.command}`); } @@ -118,7 +148,7 @@ async function executeExplore(options: ExploreExecutionOptions): Promise { await writeJson(join(outDir, "input.clusters.scored.json"), scoredDoc); await writeJson(join(outDir, "dag.json"), dag); - let state = initialRunState(dag); + let state = initialRunState(dag, resolveProvider(parsed)); const canvasPath = canvasPathFor(parsed, outDir); const canvasMirrorPath = canvasMirrorPathFor(parsed, canvasPath, repo, pr); if (hasFlag(parsed, "plan-only")) { @@ -127,7 +157,7 @@ async function executeExplore(options: ExploreExecutionOptions): Promise { state = await runDag( dag, runnerContext(parsed, repo, pr, repoUrl, prUrl, outDir, canvasPath, canvasMirrorPath), - new CursorCloudAdapter(), + resolveAdapter(parsed), ); } @@ -173,11 +203,11 @@ async function runDagCommand(parsed: ParsedArgs): Promise { const canvasMirrorPath = canvasMirrorPathFor(parsed, canvasPath, repo, pr); const state = hasFlag(parsed, "plan-only") - ? initialRunState(dag) + ? 
initialRunState(dag, resolveProvider(parsed))
     : await runDag(
         dag,
         runnerContext(parsed, repo, pr, repoUrl, prUrl, outDir, canvasPath, canvasMirrorPath),
-        new CursorCloudAdapter(),
+        resolveAdapter(parsed),
       );
   if (hasFlag(parsed, "plan-only")) await writeState(outDir, state, canvasPath, canvasMirrorPath);
   logCanvasPaths(canvasPath, canvasMirrorPath);
@@ -229,6 +259,9 @@ function runnerContext(
   canvasPath: string | false,
   canvasMirrorPath: string | undefined,
 ): RunnerContext {
+  const provider = resolveProvider(parsed);
+  const apiKey = flag(parsed, "api-key")
+    ?? (provider === "anthropic" ? process.env.ANTHROPIC_API_KEY : process.env.CURSOR_API_KEY);
   return {
     repo,
     pr,
@@ -237,13 +270,29 @@ function runnerContext(
     outDir,
     canvasPath,
     canvasMirrorPath,
-    apiKey: flag(parsed, "api-key") ?? process.env.CURSOR_API_KEY,
+    apiKey,
+    // Default to 4 simultaneous tasks for both providers. Cursor Cloud plans may
+    // cap concurrent agents; if that cap is hit, the adapter surfaces an
+    // actionable error (see describeCursorError) telling the user to lower
+    // --concurrency or upgrade their plan.
     concurrency: numberFlag(parsed, "concurrency", 4),
     taskTimeoutMs: numberFlag(parsed, "task-timeout-ms", 20 * 60 * 1000),
-    stream: !hasFlag(parsed, "no-stream"),
+    stream: !hasFlag(parsed, "no-stream") && provider === "cursor",
+    provider,
   };
 }
 
+/**
+ * Resolve which task-runner backend to use.
+ *
+ * Precedence: the --provider flag, then the QUORUM_PROVIDER environment
+ * variable, then "cursor". An empty value counts as unset.
+ *
+ * @throws Error when the value is neither "cursor" nor "anthropic". A typo
+ *   must not silently select the Cursor backend (and its API key).
+ */
+function resolveProvider(parsed: ParsedArgs): "cursor" | "anthropic" {
+  const raw = String(flag(parsed, "provider") ?? process.env.QUORUM_PROVIDER ?? "");
+  const provider = raw.trim().toLowerCase() || "cursor";
+  if (provider === "cursor" || provider === "anthropic") return provider;
+  throw new Error(`Unknown provider: ${raw} (expected "cursor" or "anthropic")`);
+}
+
+/** Instantiate the task-runner adapter matching resolveProvider(parsed). */
+function resolveAdapter(parsed: ParsedArgs): TaskRunnerAdapter {
+  return resolveProvider(parsed) === "anthropic"
+    ? new AnthropicAdapter()
+    : new CursorCloudAdapter();
+}
+
 function canvasPathFor(parsed: ParsedArgs, outDir: string): string | false {
   if (hasFlag(parsed, "no-canvas")) return false;
   return flag(parsed, "canvas-path") ??
canvasPathFromOutDir(outDir);
@@ -281,6 +330,330 @@ function explorationContext(
   };
 }
 
+// ---- synthesize command ----
+
+/**
+ * Fetch bot findings for a PR, validate and score a clustering of them, and
+ * (unless --no-post) hand the scored file to post_synthesis.py.
+ *
+ * Paths default to files under the --out directory. When no clusters file
+ * exists at the resolved path, an all-singletons clustering is generated.
+ *
+ * @throws Error when fetching, validation or posting fails, or when no bot
+ *   findings were fetched. The child process's stderr is printed first.
+ */
+async function synthesizeCommand(parsed: ParsedArgs): Promise<void> {
+  const ref = parsePullRequestRef(requiredPositional(parsed, "OWNER/REPO#PR or PR_URL"));
+  const outDir = flag(parsed, "out") ?? join(".quorum", "synthesis", `${repoSlug(ref.repo)}-pr${ref.pr}`);
+
+  const findingsPath = flag(parsed, "findings") ?? join(outDir, "findings.json");
+  const clustersPath = flag(parsed, "clusters") ?? join(outDir, "clusters.json");
+  const scoredPath = flag(parsed, "out-file") ?? join(outDir, "clusters.scored.json");
+
+  await mkdir(outDir, { recursive: true });
+
+  // Step 1: Fetch findings
+  console.log(`Fetching findings for ${ref.repo}#${ref.pr}...`);
+  try {
+    const { stderr } = await execFileAsync("bash", [
+      join(repoScriptsDir(), "fetch_findings.sh"),
+      ref.repo,
+      ref.pr,
+      findingsPath,
+    ]);
+    // The script reports the authors it saw and how many matched on stderr;
+    // that is what a user needs in order to adjust QUORUM_BOTS.
+    if (stderr) console.error(stderr.trimEnd());
+  } catch (error) {
+    const err = error as { stderr?: string; message?: string };
+    console.error(err.stderr || err.message || String(error));
+    throw new Error("Failed to fetch findings. Ensure gh CLI is authenticated and jq is installed.");
+  }
+
+  const findings = await readJson(findingsPath);
+  if (!Array.isArray(findings) || findings.length === 0) {
+    throw new Error("No bot findings found. Run the Quorum skill for clustering, or check QUORUM_BOTS.");
+  }
+
+  // Step 2: If user provided clusters.json, use it; otherwise generate all-singletons
+  let clustersExists = false;
+  try {
+    await readFile(clustersPath);
+    clustersExists = true;
+  } catch { /* file doesn't exist */ }
+
+  if (!clustersExists) {
+    console.log("No clusters.json provided; generating all-singletons clustering.");
+    await generateSingletonClusters(findings, clustersPath);
+  }
+
+  // Step 3: Validate & score
+  console.log("Validating and scoring...");
+  try {
+    await execFileAsync("python3", [
+      join(repoScriptsDir(), "validate_partition.py"),
+      findingsPath,
+      clustersPath,
+      "-o",
+      scoredPath,
+    ]);
+  } catch (error) {
+    const err = error as { stderr?: string; message?: string };
+    console.error(err.stderr || err.message || String(error));
+    throw new Error("Validation failed. Fix clusters.json or re-run with all-singletons.");
+  }
+
+  console.log(`wrote ${scoredPath}`);
+
+  // Step 4: Post (or dry-run)
+  const dryRun = hasFlag(parsed, "dry-run");
+  const noPost = hasFlag(parsed, "no-post");
+  if (!noPost) {
+    const args = [join(repoScriptsDir(), "post_synthesis.py"), ref.repo, ref.pr, scoredPath];
+    if (dryRun) args.push("--dry-run");
+    if (hasFlag(parsed, "minimize")) args.push("--minimize");
+    if (hasFlag(parsed, "no-reactions")) args.push("--no-reactions");
+
+    try {
+      const { stdout, stderr } = await execFileAsync("python3", args);
+      if (dryRun) {
+        console.log(stdout);
+        if (stderr) console.error(stderr);
+      } else {
+        console.log(stdout.trim());
+      }
+    } catch (error) {
+      const err = error as { stderr?: string; message?: string };
+      console.error(err.stderr || err.message || String(error));
+      throw new Error("post_synthesis.py failed.");
+    }
+  }
+}
+
+async function generateSingletonClusters(findings: unknown[], outPath: string): Promise<void> {
+  const clusters = (findings as Array<Record<string, unknown>>).map((finding: Record<string, unknown>) => {
+    const lines = (finding.lines as [number | null, number |
/**
 * Extract a short, single-line summary from a block of (markdown) text.
 *
 * Each line is trimmed and stripped of leading markdown decoration
 * (`#`, `*`, `>`, `-`, whitespace); the first line that still has content is
 * returned, cut to `limit` characters. When no line has content the
 * placeholder "(no comment text)" is returned.
 */
function firstLine(text: string, limit = 80): string {
  const stripped = (text || "")
    .trim()
    .split("\n")
    .map((raw) => raw.trim().replace(/^[#*>\-\s]+/, "").trim());
  const firstWithContent = stripped.find((candidate) => candidate.length > 0);
  return firstWithContent === undefined
    ? "(no comment text)"
    : firstWithContent.slice(0, limit);
}
/**
 * `quorum setup`: check the local prerequisites and print a PASS/FAIL report.
 *
 * Checks, in order: Node.js major version, the `gh`, `jq` and `python3`
 * executables (each probed with `--version`), an agent API key
 * (CURSOR_API_KEY or ANTHROPIC_API_KEY), the optional GitHub token, and an
 * installed Quorum skill under the user's home directory.
 *
 * Exits the process with status 1 when any required check fails.
 */
async function setupCommand(): Promise<void> {
  const checks: Array<{ name: string; ok: boolean; help: string }> = [];
  const setOrNot = (present: boolean): string => (present ? "set" : "not set");
  const installedOrNot = (present: boolean): string => (present ? "installed" : "not installed");

  // Node
  const nodeVersion = process.versions.node;
  checks.push({
    name: `Node.js >= 22 (found ${nodeVersion})`,
    ok: parseInt(nodeVersion.split(".")[0], 10) >= 22,
    help: "Install Node.js 22+ from https://nodejs.org",
  });

  // External executables: a tool counts as present when `<bin> --version` runs.
  const tools = [
    { name: "gh CLI", bin: "gh", help: "Install from https://cli.github.com" },
    { name: "jq", bin: "jq", help: "Install with brew install jq or apt install jq" },
    { name: "python3", bin: "python3", help: "Install from https://python.org" },
  ];
  for (const tool of tools) {
    let ok = true;
    try {
      await execFileAsync(tool.bin, ["--version"]);
    } catch {
      ok = false;
    }
    checks.push({ name: tool.name, ok, help: ok ? "" : tool.help });
  }

  // API keys: either provider's key satisfies the check, so the label reports
  // both instead of showing "CURSOR_API_KEY (not set)" next to a PASS.
  const cursorKey = !!process.env.CURSOR_API_KEY;
  const anthropicKey = !!process.env.ANTHROPIC_API_KEY;
  const githubToken = !!process.env.GITHUB_TOKEN || !!process.env.GH_TOKEN;
  checks.push({
    name: `Agent API key (CURSOR_API_KEY ${setOrNot(cursorKey)}, ANTHROPIC_API_KEY ${setOrNot(anthropicKey)})`,
    ok: cursorKey || anthropicKey,
    help: "Set CURSOR_API_KEY for Cursor Cloud or ANTHROPIC_API_KEY for Anthropic",
  });
  checks.push({
    name: `GITHUB_TOKEN (${setOrNot(githubToken)}, optional)`,
    ok: true,
    help: "Set GITHUB_TOKEN for direct GitHub API access (falls back to gh CLI)",
  });

  // Skill installation. The check looks under the home directory, so the help
  // text must install there too. quorum.skill already contains a top-level
  // quorum/ folder, hence it is unzipped into the parent skills/ directory.
  // NOTE(review): the "~" fallback is not expanded by join(); it only matters
  // when neither HOME nor USERPROFILE is set.
  const home = process.env.HOME || process.env.USERPROFILE || "~";
  const hasCursorSkill = await dirExists(join(home, ".cursor/skills/quorum"));
  const hasClaudeSkill = await dirExists(join(home, ".claude/skills/quorum"));
  checks.push({
    name: `Quorum skill (Cursor: ${installedOrNot(hasCursorSkill)}, Claude: ${installedOrNot(hasClaudeSkill)})`,
    ok: hasCursorSkill || hasClaudeSkill,
    help: "Run: mkdir -p ~/.cursor/skills && unzip quorum.skill -d ~/.cursor/skills",
  });

  console.log("Quorum Setup Check\n");
  let allOk = true;
  for (const check of checks) {
    const icon = check.ok ? "PASS" : "FAIL";
    console.log(`  [${icon}] ${check.name}`);
    if (!check.ok) {
      allOk = false;
      console.log(`         -> ${check.help}`);
    }
  }

  if (allOk) {
    console.log("\nAll checks passed. Quorum is ready to use.");
  } else {
    console.log("\nFix the FAIL items above, then re-run 'quorum setup'.");
    process.exit(1);
  }
}
join(".quorum", "runs"); + console.log("No run logs found in", scanned); + console.log("Run some explorations first to populate the log."); + return; + } + + const runs = new Map(); + for (const entry of entries) { + const key = entry.runTitle ?? "unknown"; + if (!runs.has(key)) runs.set(key, []); + runs.get(key)!.push(entry); + } + + console.log(`Eval Report — ${runs.size} run(s), ${entries.length} event(s)\n`); + + // Task success rate — a finished task with parseError counts as degraded + const tasks = entries.filter((e) => e.type === "task_end" || e.type === "task_error" || e.type === "task_skip"); + const finishedClean = tasks.filter( + (e) => e.type === "task_end" && e.status === "FINISHED" && !e.parseError, + ).length; + const finishedDegraded = tasks.filter( + (e) => e.type === "task_end" && e.status === "FINISHED" && e.parseError, + ).length; + const finished = finishedClean + finishedDegraded; + const errors = tasks.filter((e) => e.type === "task_error" || e.status === "ERROR").length; + const skipped = tasks.filter((e) => e.type === "task_skip").length; + + console.log("## Task Outcomes"); + console.log(` Total tasks: ${tasks.length}`); + console.log(` Finished clean: ${finishedClean}`); + if (finishedDegraded > 0) { + console.log(` Finished (degraded parse): ${finishedDegraded}`); + } + console.log(` Finished total: ${finished} (${tasks.length ? 
((finished / tasks.length) * 100).toFixed(0) : 0}%)`); + console.log(` Errors: ${errors}`); + console.log(` Skipped: ${skipped}`); + console.log(""); + + // Cluster-level stats + const rootCauseTasks = entries.filter((e) => e.taskType === "root_cause"); + const sweepTasks = entries.filter((e) => e.taskType === "pattern_sweep"); + const clusterIds = new Set([...rootCauseTasks, ...sweepTasks].map((e) => e.clusterId).filter(Boolean)); + console.log(`## Clusters Explored: ${clusterIds.size}`); + + // Average duration + const durations = entries + .filter((e) => e.durationMs !== undefined && e.durationMs > 0) + .map((e) => e.durationMs!); + if (durations.length > 0) { + const avg = durations.reduce((a, b) => a + b, 0) / durations.length; + console.log(` Average task duration: ${(avg / 1000).toFixed(1)}s`); + } + + console.log(""); + console.log("## Runs"); + for (const [title, runEntries] of runs) { + const dagStart = runEntries.find((e) => e.type === "dag_start"); + const dagEnd = runEntries.find((e) => e.type === "dag_end"); + const status = dagEnd?.status ?? "unknown"; + console.log(` ${title}: ${status} (${runEntries.length} events)`); + } +} + function parseArgs(argv: string[]): ParsedArgs { let command: string | undefined = argv[0]; let rest = argv.slice(1); @@ -356,33 +729,61 @@ function createRunId(repo: string, pr: string): string { return `${repoSlug(repo)}-pr${pr}-${ts}`; } +function repoScriptsDir(): string { + // When installed as an npm package or running from source, scripts live in + // /scripts/. Resolution (incl. Windows + spaces) lives in paths.ts. 
+ return scriptsDirFromModuleUrl(import.meta.url); +} + function printHelp(): void { console.log(`Usage: + quorum synthesize OWNER/REPO#PR [options] + quorum triage-pr https://github.com/OWNER/REPO/pull/N [options] quorum plan-pr https://github.com/OWNER/REPO/pull/N [options] quorum run-pr https://github.com/OWNER/REPO/pull/N [options] quorum post-pr https://github.com/OWNER/REPO/pull/N [options] quorum canvas .quorum/runs/run-id + quorum setup + quorum eval [--log-dir .quorum/log] quorum explore --repo OWNER/REPO --pr N --scored clusters.scored.json [options] quorum run-dag --dag dag.json --out .quorum/runs/run-id --repo OWNER/REPO [options] - quorum render-canvas --state .quorum/runs/run-id/state.json [--canvas-path PATH] + +Commands: + synthesize Run Phase 1 synthesis pipeline (fetch, score, post) without AI clustering. + triage-pr Run synthesis + exploration end-to-end on a PR. + plan-pr Generate DAG/Canvas without calling cloud agents. + run-pr Run Cursor Cloud or Anthropic exploration (no PR comment by default). + post-pr Run exploration and upsert the PR exploration comment. + canvas Regenerate or open a Canvas from a saved run directory. + setup Validate prerequisites (Node, gh, jq, python3, API keys, skills). + eval Compute reviewer precision and success stats from run logs. Options: - --cluster ID[,ID] Explore explicit cluster IDs instead of quorum filter. - --min-quorum N Default: 2. - --max-clusters N Limit selected clusters. - --scored PATH Use a local clusters.scored.json instead of recovering it from the PR. - --out DIR Output directory. Default: .quorum/runs/-pr-. - --post For run-pr only: upsert the PR exploration comment after the run. - --repo-url URL Default: https://github.com/OWNER/REPO. - --pr-url URL Default: https://github.com/OWNER/REPO/pull/N. - --concurrency N Default: 4. - --task-timeout-ms N Default: 1200000. - --dry-run | --no-post Run cloud exploration but do not write a PR comment. 
- --plan-only Generate DAG/state/report shell without Cursor Cloud or GitHub calls. - --no-stream Do not consume run.stream(); wait for final result only. - --canvas-path PATH Write a Cursor Canvas artifact to this path. - --no-canvas Do not write the .canvas.tsx artifact. - --no-canvas-mirror Do not mirror the canvas into ~/.cursor/projects//canvases/. + --provider cursor|anthropic Backend for exploration agents. Default: cursor. + --cluster ID[,ID] Explore explicit cluster IDs instead of quorum filter. + --min-quorum N Default: 2. + --max-clusters N Limit selected clusters. + --scored PATH Use a local clusters.scored.json. + --out DIR Output directory. + --post For run-pr only: upsert the PR exploration comment. + --concurrency N Simultaneous tasks. Default: 4. + --task-timeout-ms N Default: 1200000. + --dry-run | --no-post Do not write a PR comment. + --plan-only Generate DAG/state/report without cloud or GitHub calls. + --no-stream Disable streaming (only supported for Cursor provider). + --canvas-path PATH Custom Canvas artifact path. + --no-canvas Skip Canvas generation. + --no-canvas-mirror Skip mirroring into Cursor canvases dir. + --api-key KEY Override the default API key for the selected provider. + +Environment: + CURSOR_API_KEY Cursor Cloud API key. + ANTHROPIC_API_KEY Anthropic API key (required for --provider anthropic). + GITHUB_TOKEN GitHub API token (falls back to gh CLI). + QUORUM_PROVIDER Default: cursor. Set to anthropic for direct Anthropic API. + QUORUM_MODEL_HIGH Model for HIGH complexity tasks. + QUORUM_MODEL_MED Model for MED complexity tasks. + QUORUM_MODEL_LOW Model for LOW complexity tasks. 
/**
 * Translate Cursor's plan-limit failure into an actionable error.
 *
 * When the error text mentions "simultaneous", "upgrade to ultra" or
 * "more cloud agents" (case-insensitive), a new Error is returned that tells
 * the user to lower --concurrency or upgrade, with the original text
 * appended. Any other Error is returned unchanged; non-Error values are
 * wrapped in an Error carrying their string form.
 */
export function describeCursorError(error: unknown): Error {
  const message = error instanceof Error ? error.message : String(error);
  const hitPlanLimit = /simultaneous|upgrade to ultra|more cloud agents/i.test(message);

  if (!hitPlanLimit) {
    return error instanceof Error ? error : new Error(message);
  }

  const explanation = [
    "Cursor rejected a concurrent Cloud Agent launch — your plan limits how many",
    "Cloud Agents run at once. Lower --concurrency or upgrade your Cursor plan.",
    `Original error: ${message}`,
  ];
  return new Error(explanation.join(" "));
}
/**
 * Compute the model to use for each complexity tier.
 *
 * Starts from the provider's default models, then for every tier prefers a
 * non-blank QUORUM_MODEL_* environment value, and otherwise a non-blank DAG
 * override. Values are trimmed before use. Environment variables win so a
 * saved dag.json can be overridden without editing it.
 *
 * @param overrides Optional per-tier model names taken from the DAG.
 * @param provider Backend whose defaults form the base map (default "cursor").
 * @returns A fresh map with one model name per tier.
 */
export function resolveModelMap(
  overrides: Partial<Record<Complexity, string>> | undefined,
  provider: Provider = "cursor",
): Record<Complexity, string> {
  const resolved = { ...defaultModelsFor(provider) };
  for (const tier of COMPLEXITIES) {
    const fromDag = overrides?.[tier]?.trim();
    const fromEnv = process.env[MODEL_ENV_VARS[tier]]?.trim();
    // Blank strings are falsy, so `||` skips them exactly like the trimmed checks.
    const chosen = fromEnv || fromDag;
    if (chosen) {
      resolved[tier] = chosen;
    }
  }
  return resolved;
}
/**
 * Fetch every page of a GitHub REST list endpoint and return the items as
 * one array.
 *
 * Requests `per_page=100` and walks `page=1, 2, ...` until a page comes back
 * with fewer than 100 items.
 *
 * @param path API path relative to https://api.github.com/ (may already
 *   contain a query string).
 * @returns All items from all pages, in the order received.
 * @throws Error when a request returns a non-2xx status, or when a response
 *   body is not a JSON array (for example an error object), which would
 *   otherwise surface as an opaque "not iterable" TypeError.
 */
async function apiPaginate<T>(path: string): Promise<T[]> {
  const PAGE_SIZE = 100;
  const separator = path.includes("?") ? "&" : "?";
  // Loop-invariant: built once rather than on every page.
  const headers: Record<string, string> = {
    Accept: "application/vnd.github+json",
    "X-GitHub-Api-Version": "2022-11-28",
    ...authHeader(),
  };

  const results: T[] = [];
  for (let page = 1; ; page++) {
    const paginatedPath = `${path}${separator}per_page=${PAGE_SIZE}&page=${page}`;
    const response = await fetch(`https://api.github.com/${paginatedPath}`, { headers });
    if (!response.ok) {
      const text = await response.text().catch(() => "");
      throw new Error(`GitHub API GET ${paginatedPath} failed (${response.status}): ${text}`);
    }

    const pageData: unknown = await response.json();
    if (!Array.isArray(pageData)) {
      throw new Error(`GitHub API GET ${paginatedPath} returned a non-array payload; cannot paginate.`);
    }
    results.push(...(pageData as T[]));

    // A short page is the last page.
    if (pageData.length < PAGE_SIZE) break;
  }
  return results;
}
/**
 * Look up a previously posted exploration comment on a pull request through
 * the REST API.
 *
 * @returns The id of the first comment whose body contains
 *   "quorum:exploration", or `undefined` when there is none.
 */
async function findExistingCommentApi(
  repo: string,
  pr: string,
): Promise<number | undefined> {
  const comments = await apiPaginate<GitHubComment>(`repos/${repo}/issues/${pr}/comments`);
  const marked = comments.find((candidate) => (candidate.body ?? "").includes("quorum:exploration"));
  return marked?.id;
}
withMarker(repo: string, pr: string, markdown: string): string { let body = [MARKER, markdown, `Quorum exploration comment for ${repo}#${pr}`].join( "\n\n", @@ -43,21 +185,7 @@ export function withMarker(repo: string, pr: string, markdown: string): string { return body; } -async function findExistingComment(repo: string, pr: string): Promise { - const raw = await gh([ - "api", - `repos/${repo}/issues/${pr}/comments`, - "--paginate", - "--jq", - '.[] | {id: .id, hit: (.body | contains("quorum:exploration"))} | @json', - ]); - for (const line of raw.split("\n")) { - if (!line.trim()) continue; - const obj = JSON.parse(line) as { id?: number; hit?: boolean }; - if (obj.hit) return obj.id; - } - return undefined; -} +// ---- gh CLI fallback (kept for synthesis.ts compatibility) ---- export async function runGh(args: string[], input?: string): Promise { return await new Promise((resolve, reject) => { @@ -86,5 +214,3 @@ export async function runGh(args: string[], input?: string): Promise { child.stdin.end(); }); } - -const gh = runGh; diff --git a/src/logging.ts b/src/logging.ts new file mode 100644 index 0000000..6f45bda --- /dev/null +++ b/src/logging.ts @@ -0,0 +1,146 @@ +import { appendFile, mkdir, readdir, readFile } from "node:fs/promises"; +import { join } from "node:path"; +import type { Dirent } from "node:fs"; +import type { RunState, TaskState } from "./types.js"; + +export interface LogEntry { + ts?: string; + type: "dag_start" | "dag_end" | "task_start" | "task_end" | "task_error" | "task_skip"; + runTitle?: string; + taskId?: string; + taskType?: string; + clusterId?: string; + status?: string; + errorMessage?: string; + parseError?: string; + durationMs?: number; + agentId?: string; + runId?: string; +} + +/** Write a timestamped log entry to run.log.jsonl in the output directory. 
/**
 * Append one task lifecycle record to the run log.
 *
 * The event type is derived from the task's current status: ERROR maps to
 * "task_error", SKIPPED to "task_skip", RUNNING to "task_start", and every
 * other status to "task_end".
 */
export async function logTaskEvent(outDir: string, task: TaskState): Promise<void> {
  const type: LogEntry["type"] =
    task.status === "ERROR"
      ? "task_error"
      : task.status === "SKIPPED"
        ? "task_skip"
        : task.status === "RUNNING"
          ? "task_start"
          : "task_end";

  const entry: LogEntry = {
    type,
    taskId: task.id,
    taskType: task.task_type,
    clusterId: task.cluster_id,
    status: task.status,
    errorMessage: task.errorMessage,
    parseError: task.parseError,
    durationMs: task.durationMs,
    agentId: task.agentId,
    runId: task.runId,
  };
  await logEvent(outDir, entry);
}
/**
 * Parse the contents of a JSON-lines log file.
 *
 * Blank lines and lines that are not valid JSON are dropped; every other
 * line contributes exactly one entry, in file order.
 */
function parseLogLines(content: string): LogEntry[] {
  return content.split("\n").flatMap((rawLine) => {
    if (rawLine.trim() === "") return [];
    try {
      // Wrapped in an array so flatMap contributes the parsed value as-is.
      return [JSON.parse(rawLine) as LogEntry];
    } catch {
      // Skip malformed lines
      return [];
    }
  });
}
/**
 * Collect the text worth inspecting from a thrown value: the message of an
 * Error plus its string `code` and the message/code of a direct `cause`
 * (Node's fetch reports "fetch failed" and hides the real reason there).
 * Thrown strings are used as-is; anything else yields an empty string.
 */
function retryErrorText(error: unknown): string {
  if (typeof error === "string") return error;
  if (!(error instanceof Error)) return "";
  const parts: string[] = [error.message];
  const { code, cause } = error as { code?: unknown; cause?: unknown };
  if (typeof code === "string") parts.push(code);
  if (cause instanceof Error) {
    parts.push(cause.message);
    const causeCode = (cause as { code?: unknown }).code;
    if (typeof causeCode === "string") parts.push(causeCode);
  }
  return parts.join(" ");
}

/**
 * Return true when the error is transient and worth retrying
 * (rate limits, throttles, network interruptions, 5xx server errors).
 *
 * Besides the message, the error's `code` and its direct `cause` are
 * inspected, and thrown strings are classified the same way as Errors.
 *
 * @param error The caught value.
 * @returns true for transient failures, false otherwise (including values
 *   that are neither an Error nor a string).
 */
export function retryableError(error: unknown): boolean {
  const msg = retryErrorText(error).toLowerCase();
  if (!msg) return false;

  // Match "rate" only within "rate limit" so it never fires on words like
  // "generate"/"moderate"; covers rate limit / rate_limit / ratelimit.
  if (/rate[\s_-]?limit/.test(msg) || msg.includes("throttle") || msg.includes("capacity")) return true;
  if (msg.includes("network") || msg.includes("timeout") || msg.includes("timed out")) return true;
  // Common transient socket/DNS failures. "etimedout" is listed explicitly
  // because it does not contain the substring "timeout".
  const transientCodes = ["econnrefused", "econnreset", "etimedout", "eai_again", "socket hang up"];
  if (transientCodes.some((code) => msg.includes(code))) return true;
  // Word-bounded so number tokens like "5000" or "4290" aren't read as 500/429.
  if (/\b(429|500|502|503|504)\b/.test(msg)) return true;
  return msg.includes("overloaded");
}
/**
 * Wait for `ms` milliseconds, or until `signal` aborts, whichever is first.
 *
 * The promise always resolves (never rejects). If the signal is already
 * aborted it resolves immediately without starting a timer. Whichever path
 * finishes first cleans up the other: the timer is cleared on abort, and the
 * abort listener is removed when the timer fires.
 */
export function sleep(ms: number, signal?: AbortSignal): Promise<void> {
  if (signal?.aborted) return Promise.resolve();

  return new Promise<void>((done) => {
    const timer = setTimeout(finishOnTime, ms);
    signal?.addEventListener("abort", finishOnAbort, { once: true });

    function finishOnTime(): void {
      signal?.removeEventListener("abort", finishOnAbort);
      done();
    }

    function finishOnAbort(): void {
      clearTimeout(timer);
      done();
    }
  });
}
error.message : String(error)}`, + ); + await sleep(delay, options.signal); + if (options.signal?.aborted) break; + continue; + } + break; + } + } + throw lastError; +} diff --git a/src/runner.ts b/src/runner.ts index 93571f4..7d05432 100644 --- a/src/runner.ts +++ b/src/runner.ts @@ -2,7 +2,9 @@ import { mkdir, writeFile } from "node:fs/promises"; import { join } from "node:path"; import { canvasPathFromOutDir, writeCanvas } from "./canvas.js"; import { computeRanks, createModelResolver } from "./dag.js"; +import type { Provider } from "./dag.js"; import { extractMarkedJson } from "./json-result.js"; +import { logTaskEvent, logRunStart, logRunEnd } from "./logging.js"; import type { Dag, DagTask, @@ -15,8 +17,8 @@ import type { const UPSTREAM_SNIPPET_CAP = 2_000; -export function initialRunState(dag: Dag): RunState { - const modelFor = createModelResolver(dag.models); +export function initialRunState(dag: Dag, provider: Provider = "cursor"): RunState { + const modelFor = createModelResolver(dag.models, provider); return { title: dag.title, startedAt: Date.now(), @@ -35,8 +37,9 @@ export async function runDag( ): Promise { await mkdir(context.outDir, { recursive: true }); const ranks = computeRanks(dag); - const state = initialRunState(dag); + const state = initialRunState(dag, context.provider ?? "cursor"); const stateById = new Map(state.tasks.map((task) => [task.id, task])); + await logRunStart(context.outDir, state); await writeState(context.outDir, state, context.canvasPath, context.canvasMirrorPath); for (const rank of ranks) { @@ -51,6 +54,7 @@ export async function runDag( taskState.finishedAt = Date.now(); taskState.durationMs = 0; taskState.errorMessage = `Skipped because upstream task(s) failed: ${failedDeps.join(", ")}`; + await logTaskEvent(context.outDir, taskState); await writeState(context.outDir, state, context.canvasPath, context.canvasMirrorPath); return; } @@ -67,6 +71,7 @@ export async function runDag( failed.length > 0 ? 
`Some tasks failed or were skipped: ${failed.map((task) => task.id).join(", ")}` : "All tasks finished."; + await logRunEnd(context.outDir, state); await writeState(context.outDir, state, context.canvasPath, context.canvasMirrorPath); return state; } @@ -97,6 +102,7 @@ async function runOneTask( taskState.status = "RUNNING"; taskState.startedAt = startedAt; await writeState(context.outDir, state, context.canvasPath, context.canvasMirrorPath); + await logTaskEvent(context.outDir, taskState); const controller = new AbortController(); const prompt = stitchPrompt(task, stateById); @@ -125,6 +131,7 @@ async function runOneTask( } finally { taskState.finishedAt = Date.now(); taskState.durationMs = taskState.finishedAt - startedAt; + await logTaskEvent(context.outDir, taskState); } } diff --git a/src/synthesis.ts b/src/synthesis.ts index 9889910..9d7abf2 100644 --- a/src/synthesis.ts +++ b/src/synthesis.ts @@ -2,6 +2,21 @@ import { parseScoredClusters } from "./clusters.js"; import { runGh } from "./github.js"; import type { ScoredClustersDoc } from "./types.js"; +interface GitHubSynthesisComment { + id: number; + body?: string; +} + +function token(): string | undefined { + return process.env.GITHUB_TOKEN || process.env.GH_TOKEN || undefined; +} + +function authHeader(): Record { + const tok = token(); + if (!tok) return {}; + return { Authorization: `Bearer ${tok}` }; +} + export function extractScoredClustersFromCommentBody(body: string): ScoredClustersDoc { const fences = body.matchAll(/```json\s*([\s\S]*?)```/gi); for (const fence of fences) { @@ -17,6 +32,48 @@ export function extractScoredClustersFromCommentBody(body: string): ScoredCluste export async function recoverScoredClustersFromPullRequest( repo: string, pr: string, +): Promise { + if (token()) { + return recoverViaApi(repo, pr); + } + return recoverViaGh(repo, pr); +} + +async function recoverViaApi( + repo: string, + pr: string, +): Promise { + const bodies: string[] = []; + let page = 1; + while (true) { 
+ const url = `https://api.github.com/repos/${repo}/issues/${pr}/comments?per_page=100&page=${page}`; + const headers: Record = { + Accept: "application/vnd.github+json", + "X-GitHub-Api-Version": "2022-11-28", + ...authHeader(), + }; + + const response = await fetch(url, { headers }); + if (!response.ok) { + const text = await response.text().catch(() => ""); + throw new Error(`GitHub API GET repos/${repo}/issues/${pr}/comments failed (${response.status}): ${text}`); + } + const comments = (await response.json()) as GitHubSynthesisComment[]; + for (const comment of comments) { + if ((comment.body ?? "").includes("quorum:synthesis")) { + bodies.push(comment.body!); + } + } + if (comments.length < 100) break; + page++; + } + + return processBodies(bodies, repo, pr); +} + +async function recoverViaGh( + repo: string, + pr: string, ): Promise { const raw = await runGh([ "api", @@ -32,6 +89,10 @@ export async function recoverScoredClustersFromPullRequest( .filter(Boolean) .map((line) => JSON.parse(line) as string); + return processBodies(bodies, repo, pr); +} + +function processBodies(bodies: string[], repo: string, pr: string): ScoredClustersDoc { for (const body of bodies.reverse()) { try { return extractScoredClustersFromCommentBody(body); diff --git a/src/types.ts b/src/types.ts index f4ab7a0..eeb693a 100644 --- a/src/types.ts +++ b/src/types.ts @@ -97,6 +97,8 @@ export interface RunnerContext { concurrency: number; taskTimeoutMs: number; stream: boolean; + /** Provider identifier for resolving default models. 
*/ + provider?: "cursor" | "anthropic"; } export interface TaskExecutionInput { diff --git a/test/cursor-cloud-adapter.test.ts b/test/cursor-cloud-adapter.test.ts new file mode 100644 index 0000000..a6ae341 --- /dev/null +++ b/test/cursor-cloud-adapter.test.ts @@ -0,0 +1,34 @@ +import assert from "node:assert/strict"; +import test from "node:test"; +import { describeCursorError } from "../src/cursor-cloud-adapter.js"; + +const PLAN_LIMIT_MESSAGE = + "[validation_error] Upgrade to Ultra for more Cloud Agents: You've reached " + + "the limit for your current plan. Upgrade to Ultra to run more Cloud Agents " + + "simultaneously."; + +test("describeCursorError rewrites the simultaneous-agent plan-limit error", () => { + const result = describeCursorError(new Error(PLAN_LIMIT_MESSAGE)); + assert.ok(result instanceof Error); + // Points the user at the real fix instead of the opaque upstream message. + assert.match(result.message, /--concurrency/); + assert.match(result.message, /plan limits how many/i); + // No stale hardcoded default (the runner default has changed before). + assert.doesNotMatch(result.message, /default is \d/); + // Preserves the original text for debugging. 
+ assert.match(result.message, /Original error:/); + assert.ok(result.message.includes("simultaneously")); +}); + +test("describeCursorError leaves unrelated errors unchanged", () => { + const original = new Error("Task aborted."); + const result = describeCursorError(original); + assert.equal(result, original); + assert.doesNotMatch(result.message, /--concurrency/); +}); + +test("describeCursorError wraps non-Error values", () => { + const result = describeCursorError("network blip"); + assert.ok(result instanceof Error); + assert.equal(result.message, "network blip"); +}); diff --git a/test/logging.test.ts b/test/logging.test.ts new file mode 100644 index 0000000..479bd33 --- /dev/null +++ b/test/logging.test.ts @@ -0,0 +1,54 @@ +import assert from "node:assert/strict"; +import { mkdtemp, writeFile } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import test from "node:test"; +import { loadRunLogs, logEvent } from "../src/logging.js"; + +test("loadRunLogs() scans .quorum/runs/ recursively when no logDir is given", async () => { + // Regression: evalCommand once defaulted logDir to ".quorum/log", which forced + // loadRunLogs down the flat-scan branch and never discovered the per-run logs + // that live under .quorum/runs////run.log.jsonl. The no-arg call + // must recurse and find them. 
+ const work = await mkdtemp(join(tmpdir(), "quorum-logs-")); + const originalCwd = process.cwd(); + try { + process.chdir(work); + await logEvent(join(".quorum", "runs", "owner__repo", "pr-1", "20260101"), { + type: "dag_start", + runTitle: "demo run", + }); + const entries = await loadRunLogs(); + assert.equal(entries.length, 1); + assert.equal(entries[0].runTitle, "demo run"); + } finally { + process.chdir(originalCwd); + } +}); + +test("loadRunLogs() returns empty when .quorum/runs/ is absent", async () => { + const work = await mkdtemp(join(tmpdir(), "quorum-logs-empty-")); + const originalCwd = process.cwd(); + try { + process.chdir(work); + const entries = await loadRunLogs(); + assert.deepEqual(entries, []); + } finally { + process.chdir(originalCwd); + } +}); + +test("loadRunLogs(dir) scans only the given flat directory", async () => { + const dir = await mkdtemp(join(tmpdir(), "quorum-logs-flat-")); + await writeFile( + join(dir, "a.jsonl"), + JSON.stringify({ type: "task_end", taskId: "t1", status: "FINISHED" }) + "\n", + "utf8", + ); + // Non-.jsonl files in the directory are ignored. 
+ await writeFile(join(dir, "notes.txt"), "ignored", "utf8"); + + const entries = await loadRunLogs(dir); + assert.equal(entries.length, 1); + assert.equal(entries[0].taskId, "t1"); +}); diff --git a/test/paths.test.ts b/test/paths.test.ts new file mode 100644 index 0000000..662a5bd --- /dev/null +++ b/test/paths.test.ts @@ -0,0 +1,24 @@ +import assert from "node:assert/strict"; +import { join } from "node:path"; +import test from "node:test"; +import { scriptsDirFromModuleUrl } from "../src/paths.js"; + +test("scriptsDirFromModuleUrl resolves the package root three levels up", () => { + const result = scriptsDirFromModuleUrl("file:///opt/quorum/dist/src/cli.js"); + assert.equal(result, join("/opt/quorum", "scripts")); +}); + +test("scriptsDirFromModuleUrl decodes percent-encoded characters", () => { + // Regression: a raw url.replace("file://", "") left %20 undecoded, so an + // install path with a space (e.g. macOS "Application Support") pointed at a + // nonexistent scripts/ dir. fileURLToPath decodes it. 
+ const result = scriptsDirFromModuleUrl("file:///home/John%20Smith/quorum/dist/src/cli.js"); + assert.ok(result.includes("John Smith"), result); + assert.ok(!result.includes("%20"), result); + assert.ok(result.endsWith(join("quorum", "scripts")), result); +}); + +test("scriptsDirFromModuleUrl falls back to cwd for non-file URLs", () => { + const result = scriptsDirFromModuleUrl("data:text/javascript,export%20const%20x=1"); + assert.equal(result, join(process.cwd(), "scripts")); +}); diff --git a/test/retry.test.ts b/test/retry.test.ts new file mode 100644 index 0000000..1ad40ab --- /dev/null +++ b/test/retry.test.ts @@ -0,0 +1,46 @@ +import assert from "node:assert/strict"; +import test from "node:test"; +import { retryableError, sleep } from "../src/retry.js"; + +test("retryableError matches genuine transient failures", () => { + assert.equal(retryableError(new Error("Anthropic API error (503): service unavailable")), true); + assert.equal(retryableError(new Error("request failed with status 500")), true); + assert.equal(retryableError(new Error("429 Too Many Requests")), true); + assert.equal(retryableError(new Error("socket network timeout")), true); + assert.equal(retryableError(new Error("overloaded_error")), true); +}); + +test("retryableError ignores numbers that merely contain a 5xx substring", () => { + // Regression: msg.includes("500") matched "5000", forcing ~21s of pointless + // retries on a non-retryable 400. 
+ assert.equal( + retryableError(new Error("Anthropic API error (400): max_tokens: 5000 > model maximum")), + false, + ); + assert.equal(retryableError(new Error("not_found_error: invalid model id")), false); +}); + +test("retryableError matches rate-limit phrasings without firing on lookalikes", () => { + assert.equal(retryableError(new Error("rate limit exceeded")), true); + assert.equal(retryableError(new Error("rate_limit_error")), true); // Anthropic error type + // Lookalikes must not trigger retries (same substring class as the 5xx fix): + assert.equal(retryableError(new Error("failed to generate output")), false); + assert.equal(retryableError(new Error("max_tokens: 4290 > model maximum")), false); +}); + +test("sleep resolves immediately when the signal is already aborted", async () => { + const controller = new AbortController(); + controller.abort(); + const start = Date.now(); + await sleep(10_000, controller.signal); + assert.ok(Date.now() - start < 1_000, "aborted sleep must not wait out the full delay"); +}); + +test("sleep wakes early when the signal aborts mid-wait", async () => { + const controller = new AbortController(); + const start = Date.now(); + const pending = sleep(10_000, controller.signal); + controller.abort(); + await pending; + assert.ok(Date.now() - start < 1_000, "sleep must resolve once the signal aborts"); +});