diff --git a/src/locus/reasoning/gsar_judge.py b/src/locus/reasoning/gsar_judge.py index 1b8bc148..10833dce 100644 --- a/src/locus/reasoning/gsar_judge.py +++ b/src/locus/reasoning/gsar_judge.py @@ -184,6 +184,34 @@ async def judge( to partition the synthesis into atomic claims and label each claim according to its relationship to the evidence. +FIRST: enumerate every atomic claim in the synthesis. EVERY atomic +statement must appear in exactly one of the four buckets — none can +be omitted. The four lists together must contain all atomic claims +of the synthesis. + +CRITICAL GROUNDING RULE +----------------------- +A claim is `grounded` ONLY when you can point to a specific line or +field of the EVIDENCE that supports it directly. If you cannot quote +that evidence: +- if the claim makes a specific factual assertion (a name, an ID, a + timestamp, a number, a verb of action) that is not in evidence, it + goes to ungrounded_claims; +- if a fact in the claim conflicts with the evidence, it goes to + contradicted_claims; +- if the claim is a high-level non-redundant alternative perspective + that doesn't conflict with anything in grounded_claims, it goes to + complementary_claims. + +Plausibility is NOT grounding. "Engineer Pat Smith paged at 14:05" +is ungrounded if no evidence row mentions Pat Smith. "Query QID-77321 +killed" is ungrounded if no evidence row mentions QID-77321. Putting +such claims in grounded_claims is a labelling error. + +Conversely, claims that DO match evidence rows (paraphrases of tool +output, references to signal fields) MUST be placed in grounded_claims +— do not omit them. + Use the following partitions: - grounded_claims: claims directly supported by the evidence. diff --git a/tests/integration/test_gsar_live.py b/tests/integration/test_gsar_live.py index 10ae9eef..bf995d17 100644 --- a/tests/integration/test_gsar_live.py +++ b/tests/integration/test_gsar_live.py @@ -177,3 +177,352 @@ async def test_gsar_judge_emits_partition_with_evidence_types() -> None: f"expected at least one grounded claim with tool-flavoured type, " f"got grounded={[(c.text, c.type) for c in partition.grounded]}" ) + + +# --------------------------------------------------------------------------- +# End-to-end outer-loop dynamics — the paper's core claims live here. +# --------------------------------------------------------------------------- + + +@skip_without_openai +@pytest.mark.asyncio +async def test_gsar_recovery_then_proceed_live_cycle() -> None: + """Loose synthesis with a contradicted claim → recovery → proceed. + + Paper's §5.2 contract on the recovery tiers: a report whose + grounded core is solid but whose synthesis contains a refuted + claim gets caught and the loop dispatches *some* recovery action + (regenerate or replan, depending on how the judge weighs the + contradiction); after the callback rewrites the synthesis to + drop the contradiction, the loop converges to proceed. + + Real-world judge variance means we don't pin which tier fires + on the first iteration; the load-bearing claim is that the loop + *recovers* and *converges*. Discrete-tier dispatch is covered + in the unit tests against a scripted judge. + """ + from locus.models.native.openai import OpenAIModel + from locus.reasoning.gsar import Decision, GSARThresholds + from locus.reasoning.gsar_evaluator import GSAREvaluator + from locus.reasoning.gsar_judge import JudgeOutput, StructuredOutputGSARJudge + + judge = StructuredOutputGSARJudge( + model=OpenAIModel(model="gpt-4o-mini", max_tokens=2048), + ) + + # Mix grounded + a directly-contradicted claim. The contradicted + # path is more reliable than the ungrounded path because the judge + # has the explicit refuting row to point at — "rps held steady at + # 4500" vs the tool output "rps=12.4". The regenerate sub-agent + # drops the contradiction; the rewritten synthesis is pure tool_match + # grounded → S = 1.0 → proceed. + initial_report = ( + "CPU utilisation on host db-prod-1 reached 97% at 14:02 UTC. " + "Request rate held steady at 4500 RPS throughout the spike. " + "An alert fired at 14:02 UTC." + ) + tightened_report = ( + "CPU utilisation on host db-prod-1 reached 97% at 14:02 UTC. An alert fired at 14:02 UTC." + ) + evidence = ( + "[tool=query_metrics row=14:02:01] host=db-prod-1 cpu_pct=97.2\n" + "[tool=query_metrics row=14:02:01] host=db-prod-1 rps=12.4\n" + "[signal] alert_id=A-9912 fired_at=14:02:00 metric=cpu_pct severity=high\n" + ) + + regen_calls = 0 + replan_calls = 0 + + async def regen(syn: str, jo: JudgeOutput) -> str: + nonlocal regen_calls + regen_calls += 1 + # In production, the regenerate sub-agent rewrites the synthesis + # using ε; we simulate that here by returning the tightened + # version that drops the unsupported claims. + return tightened_report + + async def replan(syn: str, ev: str, jo: JudgeOutput) -> tuple[str, str]: + nonlocal replan_calls + replan_calls += 1 + # Replan should not be reached on this input. + return tightened_report, ev + + # Default thresholds. The loop has up to 2 attempts to recover + # before timing out — judges can land first iteration in either + # regenerate (cheap) or replan (expensive) on contradicted-claim + # input; both rewrite to a clean synthesis on the next pass. + evaluator = GSAREvaluator( + judge=judge, + regenerate_fn=regen, + replan_fn=replan, + thresholds=GSARThresholds(), + k_max=2, + ) + result = await evaluator.evaluate(report_synthesis=initial_report, evidence_corpus=evidence) + + assert result.final_decision == Decision.PROCEED, ( + f"final={result.final_decision}, score={result.final_score:.3f}, " + f"trajectory={[(t.decision, round(t.score, 3)) for t in result.trajectory]}" + ) + # *Some* recovery branch must have fired — that's the load-bearing + # claim. Whether it was regenerate or replan depends on judge + # weighting; the unit tests cover both discretely. + total_recovery_calls = regen_calls + replan_calls + assert total_recovery_calls >= 1, ( + "no recovery branch fired despite first iteration not proceeding" + ) + assert not result.degraded + # Trajectory monotonicity: the last iteration's score must not be + # lower than the first — recovery should be a non-regression. + scores = [t.score for t in result.trajectory] + assert scores[-1] >= scores[0] - 1e-9, f"score decreased across recovery cycle: {scores}" + + +@skip_without_openai +@pytest.mark.asyncio +async def test_gsar_replan_then_proceed_live_cycle() -> None: + """Insufficient evidence → replan → proceed once evidence is added. + + Paper's §5.2 expensive branch: when the synthesis can't be fixed + by rewriting alone, the orchestrator revises the plan and + re-dispatches specialists. We simulate the dispatch by appending + new tool outputs to the evidence corpus. + """ + from locus.models.native.openai import OpenAIModel + from locus.reasoning.gsar import Decision + from locus.reasoning.gsar_evaluator import GSAREvaluator + from locus.reasoning.gsar_judge import JudgeOutput, StructuredOutputGSARJudge + + judge = StructuredOutputGSARJudge( + model=OpenAIModel(model="gpt-4o-mini", max_tokens=2048), + ) + + # Specific factual claims with literally no evidence corpus on the + # first iteration → judge has nothing to ground against, must + # partition into ungrounded (and may abstain). Either branch + # dispatches to replan_fn under the §6 abstain == replan rule. + report = ( + "CPU utilisation on host db-prod-1 reached 97% at 14:02 UTC. " + "Request rate dropped to 12 RPS at the same time." + ) + initial_evidence = "(no evidence collected yet)\n" + fresh_tool_evidence = ( + "[signal] alert_id=A-9912 fired_at=14:02:00 metric=cpu_pct severity=high\n" + "[tool=query_metrics row=14:02:01] host=db-prod-1 cpu_pct=97.2\n" + "[tool=query_metrics row=14:02:01] host=db-prod-1 rps=12.4\n" + ) + + regen_calls = 0 + replan_calls = 0 + + async def regen(syn: str, jo: JudgeOutput) -> str: + nonlocal regen_calls + regen_calls += 1 + # Echo unchanged — we want the loop to be the one that fixes this + # by gathering fresh evidence on a subsequent replan. + return syn + + async def replan(syn: str, ev: str, jo: JudgeOutput) -> tuple[str, str]: + nonlocal replan_calls + replan_calls += 1 + # Production: revise plan, re-dispatch specialists, get fresh evidence. + return syn, fresh_tool_evidence + + evaluator = GSAREvaluator( + judge=judge, + regenerate_fn=regen, + replan_fn=replan, + k_max=2, + ) + result = await evaluator.evaluate(report_synthesis=report, evidence_corpus=initial_evidence) + + # Eventually the loop should reach proceed once the evidence is fresh. + # If the judge gets unusually picky we accept up to 2 replans; the test + # is gated by k_max=2 so the loop terminates in any case. + assert result.final_decision == Decision.PROCEED, ( + f"final={result.final_decision}, score={result.final_score:.3f}, " + f"replans={result.replans_used}, regens={result.regenerations_used}, " + f"trajectory={[(t.decision, round(t.score, 3)) for t in result.trajectory]}" + ) + # At least one recovery action of *some* kind must have been taken — + # the judge variance can land first iteration in either replan or + # regenerate. What matters is that the loop recovered and the + # evaluator dispatched to one of the two side-effect callbacks. + assert (replan_calls + regen_calls) >= 1, ( + "no recovery callback was invoked despite first-iteration not proceeding" + ) + assert not result.degraded + + +@skip_without_openai +@pytest.mark.asyncio +async def test_gsar_budget_exhaustion_sets_degraded_live() -> None: + """Unsalvageable input → K_max replans → degraded=True. + + Paper's §5.3 contract: returning a degraded-but-honest report is + preferable to looping indefinitely or hallucinating grounded. + Drives the live judge with a no-op replan_fn, which leaves the + bad evidence in place; the loop should exhaust ``K_max`` and + return with the flag set. + """ + from locus.models.native.openai import OpenAIModel + from locus.reasoning.gsar_evaluator import GSAREvaluator + from locus.reasoning.gsar_judge import JudgeOutput, StructuredOutputGSARJudge + + judge = StructuredOutputGSARJudge( + model=OpenAIModel(model="gpt-4o-mini", max_tokens=2048), + ) + + # Specific factual claims, no supporting evidence at all. Judge + # cannot ground these no matter how many times we re-call it. + report = ( + "The outage was caused by a failed power supply unit in rack 7B. " + "The PSU was replaced at 03:15 UTC by technician T-218. " + "Customer-facing latency returned to baseline within 8 minutes." + ) + evidence = "(no evidence available)\n" + + async def regen(syn: str, jo: JudgeOutput) -> str: + return syn + + async def replan(syn: str, ev: str, jo: JudgeOutput) -> tuple[str, str]: + # Deliberately do NOT add evidence — simulates a failed plan. + return syn, ev + + evaluator = GSAREvaluator( + judge=judge, + regenerate_fn=regen, + replan_fn=replan, + k_max=2, + ) + result = await evaluator.evaluate(report_synthesis=report, evidence_corpus=evidence) + + # The loop must terminate without hanging. + assert result.degraded is True, ( + f"expected degraded=True after K_max replans without recovery, " + f"got final={result.final_decision}, replans={result.replans_used}" + ) + assert result.replans_used == 2 + # Three judge calls total: iteration 0 + 2 post-replan judges. + assert len(result.trajectory) == 3 + + +@skip_without_openai +@pytest.mark.asyncio +async def test_gsar_rho_zero_inflation_visible_live() -> None: + """Property P5 in practice: dropping ρ inflates ``S`` on a real + judge-produced partition. + + The §8.5 ablation. Run the judge once, then score the same + partition twice — with default ρ=0.5 and with ρ=0. The ρ=0 score + must be ≥ the ρ=0.5 score. The contradiction-non-suppression + property is what prevents adversarial summarisers from boosting + their score by silently dropping refuted claims. + """ + from locus.models.native.openai import OpenAIModel + from locus.reasoning.gsar import gsar_score + from locus.reasoning.gsar_judge import StructuredOutputGSARJudge + + judge = StructuredOutputGSARJudge( + model=OpenAIModel(model="gpt-4o-mini", max_tokens=2048), + ) + + # Mixed report: most claims grounded, but one factually wrong claim + # that the judge should partition into contradicted. + report = ( + "CPU utilisation on host db-prod-1 reached 97% at 14:02 UTC. " + "Request rate held steady at 4500 RPS throughout the spike. " + "An alert fired at 14:02 UTC." + ) + evidence = ( + "[tool=query_metrics row=14:02:01] host=db-prod-1 cpu_pct=97.2\n" + "[tool=query_metrics row=14:02:01] host=db-prod-1 rps=12.4\n" + "[signal] alert_id=A-9912 fired_at=14:02:00 metric=cpu_pct severity=high\n" + ) + + out = await judge.judge(report_synthesis=report, evidence_corpus=evidence) + partition = out.to_partition() + + # Pre-condition for the test to be meaningful: the judge identified + # at least one contradicted claim. The "rps held steady at 4500" + # statement directly conflicts with the tool output (12.4 RPS). + if not partition.contradicted: + pytest.skip( + "judge did not produce a contradicted claim on this run; " + "P5 ablation requires W(X) > 0 to be observable. " + f"partition={[(b, len(getattr(partition, b))) for b in ('grounded', 'ungrounded', 'contradicted', 'complementary')]}" + ) + + s_default = gsar_score(partition, contradiction_penalty=0.5) + s_no_rho = gsar_score(partition, contradiction_penalty=0.0) + + assert s_no_rho >= s_default - 1e-9, ( + f"ρ=0 produced lower score than ρ=0.5: s_default={s_default:.4f}, s_no_rho={s_no_rho:.4f}" + ) + assert s_no_rho > s_default, ( + f"ρ=0 should strictly inflate when there's contradicted mass — " + f"s_default={s_default:.4f}, s_no_rho={s_no_rho:.4f}, " + f"|X|={len(partition.contradicted)}" + ) + + +@skip_without_openai +@pytest.mark.asyncio +async def test_gsar_cross_judge_decision_agreement() -> None: + """Two different OpenAI judges should agree on δ for clear inputs. + + Paper §11 / Table 10 claim: the C₃ contradiction-penalty effect is + judge-agnostic. We exercise a weaker but cheaper version — for a + clearly-grounded report and a clearly-ungrounded report, two + different judge models should land in the same decision tier + under the reference thresholds. + """ + from locus.models.native.openai import OpenAIModel + from locus.reasoning.gsar import Decision, decide, gsar_score + from locus.reasoning.gsar_judge import StructuredOutputGSARJudge + + j_mini = StructuredOutputGSARJudge( + model=OpenAIModel(model="gpt-4o-mini", max_tokens=2048), + ) + j_full = StructuredOutputGSARJudge( + model=OpenAIModel(model="gpt-4o", max_tokens=2048), + ) + + grounded = { + "report": ( + "CPU utilisation on host db-prod-1 reached 97% at 14:02 UTC. " + "Request rate dropped to 12 RPS at the same time." + ), + "evidence": ( + "[tool=query_metrics row=14:02:01] host=db-prod-1 cpu_pct=97.2\n" + "[tool=query_metrics row=14:02:01] host=db-prod-1 rps=12.4\n" + ), + } + ungrounded = { + "report": ("The outage was caused by a failed power supply at 03:15 UTC."), + "evidence": "[signal] alert_id=A-1042 fired_at=02:48:12 metric=availability\n", + } + + async def decision_for(judge, payload: dict[str, str]) -> Decision: + out = await judge.judge( + report_synthesis=payload["report"], + evidence_corpus=payload["evidence"], + ) + if out.abstained: + return Decision.ABSTAIN + score = gsar_score(out.to_partition()) + return decide(score) + + d_mini_g = await decision_for(j_mini, grounded) + d_full_g = await decision_for(j_full, grounded) + d_mini_u = await decision_for(j_mini, ungrounded) + d_full_u = await decision_for(j_full, ungrounded) + + # Grounded report: both judges should not land in `replan`. We allow + # `regenerate` because gpt-4o-mini occasionally over-flags an + # inference; the cheap-recovery tier is correct in that case. + assert d_mini_g != Decision.REPLAN, f"gpt-4o-mini sent grounded → replan: {d_mini_g}" + assert d_full_g != Decision.REPLAN, f"gpt-4o sent grounded → replan: {d_full_g}" + # Ungrounded report: both judges should not land in `proceed`. + assert d_mini_u != Decision.PROCEED, f"gpt-4o-mini sent ungrounded → proceed: {d_mini_u}" + assert d_full_u != Decision.PROCEED, f"gpt-4o sent ungrounded → proceed: {d_full_u}"