Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions src/locus/reasoning/gsar_judge.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,34 @@ async def judge(
to partition the synthesis into atomic claims and label each claim
according to its relationship to the evidence.

FIRST: enumerate every atomic claim in the synthesis. EVERY atomic
statement must appear in exactly one of the four buckets — none can
be omitted. The four lists together must contain all atomic claims
of the synthesis.

CRITICAL GROUNDING RULE
-----------------------
A claim is `grounded` ONLY when you can point to a specific line or
field of the EVIDENCE that supports it directly. If you cannot quote
that evidence:
- if the claim makes a specific factual assertion (a name, an ID, a
timestamp, a number, a verb of action) that is not in evidence, it
goes to ungrounded_claims;
- if a fact in the claim conflicts with the evidence, it goes to
contradicted_claims;
- if the claim is a high-level non-redundant alternative perspective
that doesn't conflict with anything in grounded_claims, it goes to
complementary_claims.

Plausibility is NOT grounding. "Engineer Pat Smith paged at 14:05"
is ungrounded if no evidence row mentions Pat Smith. "Query QID-77321
killed" is ungrounded if no evidence row mentions QID-77321. Putting
such claims in grounded_claims is a labelling error.

Conversely, claims that DO match evidence rows (paraphrases of tool
output, references to signal fields) MUST be placed in grounded_claims
— do not omit them.

Use the following partitions:

- grounded_claims: claims directly supported by the evidence.
Expand Down
349 changes: 349 additions & 0 deletions tests/integration/test_gsar_live.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,352 @@ async def test_gsar_judge_emits_partition_with_evidence_types() -> None:
f"expected at least one grounded claim with tool-flavoured type, "
f"got grounded={[(c.text, c.type) for c in partition.grounded]}"
)


# ---------------------------------------------------------------------------
# End-to-end outer-loop dynamics — the paper's core claims live here.
# ---------------------------------------------------------------------------


@skip_without_openai
@pytest.mark.asyncio
async def test_gsar_recovery_then_proceed_live_cycle() -> None:
    """Synthesis carrying a contradicted claim recovers, then proceeds.

    Exercises the recovery tiers of the paper's §5.2 against a live
    judge. The initial report mixes claims that match the evidence with
    one claim the evidence refutes, so the evaluator is expected to
    dispatch a recovery callback. Both callbacks hand back a synthesis
    without the refuted sentence, after which the loop should end in
    ``PROCEED``.

    Which tier fires on the first iteration is deliberately left
    unpinned: a live judge may weigh the contradiction towards either
    regenerate or replan. Discrete-tier dispatch is covered by the unit
    tests that run against a scripted judge.
    """
    from locus.models.native.openai import OpenAIModel
    from locus.reasoning.gsar import Decision, GSARThresholds
    from locus.reasoning.gsar_evaluator import GSAREvaluator
    from locus.reasoning.gsar_judge import JudgeOutput, StructuredOutputGSARJudge

    # Evidence rows the judge scores against. The rps row (12.4) is the
    # explicit refutation of the "4500 RPS" sentence in the initial
    # report, which gives the judge a concrete row to point at.
    evidence = (
        "[tool=query_metrics row=14:02:01] host=db-prod-1 cpu_pct=97.2\n"
        "[tool=query_metrics row=14:02:01] host=db-prod-1 rps=12.4\n"
        "[signal] alert_id=A-9912 fired_at=14:02:00 metric=cpu_pct severity=high\n"
    )
    # Two sentences that match the evidence plus one that conflicts with it.
    initial_report = (
        "CPU utilisation on host db-prod-1 reached 97% at 14:02 UTC. "
        "Request rate held steady at 4500 RPS throughout the spike. "
        "An alert fired at 14:02 UTC."
    )
    # Same report with the conflicting sentence removed.
    tightened_report = (
        "CPU utilisation on host db-prod-1 reached 97% at 14:02 UTC. An alert fired at 14:02 UTC."
    )

    # Invocation tally for the two recovery callbacks.
    calls = {"regenerate": 0, "replan": 0}

    async def regen(syn: str, jo: JudgeOutput) -> str:
        # Stand-in for the production regenerate sub-agent: ignore the
        # inputs and return the report without the refuted sentence.
        calls["regenerate"] += 1
        return tightened_report

    async def replan(syn: str, ev: str, jo: JudgeOutput) -> tuple[str, str]:
        # The judge may route here instead of regenerate; return the same
        # tightened report and leave the evidence untouched.
        calls["replan"] += 1
        return tightened_report, ev

    # Thresholds are constructed without overrides; k_max=2 bounds the
    # number of recovery attempts so the loop always terminates.
    evaluator = GSAREvaluator(
        judge=StructuredOutputGSARJudge(
            model=OpenAIModel(model="gpt-4o-mini", max_tokens=2048),
        ),
        regenerate_fn=regen,
        replan_fn=replan,
        thresholds=GSARThresholds(),
        k_max=2,
    )
    result = await evaluator.evaluate(
        report_synthesis=initial_report,
        evidence_corpus=evidence,
    )

    assert result.final_decision == Decision.PROCEED, (
        f"final={result.final_decision}, score={result.final_score:.3f}, "
        f"trajectory={[(t.decision, round(t.score, 3)) for t in result.trajectory]}"
    )
    # The load-bearing claim: at least one recovery callback ran,
    # whichever tier the judge happened to pick.
    assert sum(calls.values()) >= 1, (
        "no recovery branch fired despite first iteration not proceeding"
    )
    assert not result.degraded
    # Non-regression check on the end points only: the final score must
    # not fall below the first (with a small float tolerance).
    scores = [t.score for t in result.trajectory]
    first_score, last_score = scores[0], scores[-1]
    assert last_score >= first_score - 1e-9, f"score decreased across recovery cycle: {scores}"


@skip_without_openai
@pytest.mark.asyncio
async def test_gsar_replan_then_proceed_live_cycle() -> None:
    """No evidence at first → recovery → proceed once evidence arrives.

    Targets the expensive branch of the paper's §5.2: the synthesis
    cannot be repaired by rewriting alone, so the plan is revised and
    specialists are re-dispatched. The re-dispatch is simulated by a
    ``replan_fn`` that swaps the placeholder corpus for real tool and
    signal rows while leaving the synthesis unchanged.
    """
    from locus.models.native.openai import OpenAIModel
    from locus.reasoning.gsar import Decision
    from locus.reasoning.gsar_evaluator import GSAREvaluator
    from locus.reasoning.gsar_judge import JudgeOutput, StructuredOutputGSARJudge

    # Specific factual claims; the first-iteration corpus below contains
    # nothing the judge could ground them against.
    report = (
        "CPU utilisation on host db-prod-1 reached 97% at 14:02 UTC. "
        "Request rate dropped to 12 RPS at the same time."
    )
    initial_evidence = "(no evidence collected yet)\n"
    # Rows handed back by the simulated replan.
    fresh_tool_evidence = (
        "[signal] alert_id=A-9912 fired_at=14:02:00 metric=cpu_pct severity=high\n"
        "[tool=query_metrics row=14:02:01] host=db-prod-1 cpu_pct=97.2\n"
        "[tool=query_metrics row=14:02:01] host=db-prod-1 rps=12.4\n"
    )

    # Invocation tally for the two recovery callbacks.
    calls = {"regenerate": 0, "replan": 0}

    async def regen(syn: str, jo: JudgeOutput) -> str:
        # Deliberately a no-op rewrite: only fresh evidence from a
        # replan can fix this input.
        calls["regenerate"] += 1
        return syn

    async def replan(syn: str, ev: str, jo: JudgeOutput) -> tuple[str, str]:
        # Simulated re-dispatch: same synthesis, replacement evidence.
        calls["replan"] += 1
        return syn, fresh_tool_evidence

    evaluator = GSAREvaluator(
        judge=StructuredOutputGSARJudge(
            model=OpenAIModel(model="gpt-4o-mini", max_tokens=2048),
        ),
        regenerate_fn=regen,
        replan_fn=replan,
        k_max=2,
    )
    result = await evaluator.evaluate(
        report_synthesis=report,
        evidence_corpus=initial_evidence,
    )

    # With the fresh rows in place the loop is expected to end in
    # PROCEED; k_max=2 caps the attempts so it terminates either way.
    assert result.final_decision == Decision.PROCEED, (
        f"final={result.final_decision}, score={result.final_score:.3f}, "
        f"replans={result.replans_used}, regens={result.regenerations_used}, "
        f"trajectory={[(t.decision, round(t.score, 3)) for t in result.trajectory]}"
    )
    # Judge variance can send the first iteration to either callback,
    # so only require that one of them was invoked.
    assert (calls["replan"] + calls["regenerate"]) >= 1, (
        "no recovery callback was invoked despite first-iteration not proceeding"
    )
    assert not result.degraded


@skip_without_openai
@pytest.mark.asyncio
async def test_gsar_budget_exhaustion_sets_degraded_live() -> None:
    """Unsalvageable input exhausts the replan budget → ``degraded=True``.

    Paper's §5.3 contract: a degraded-but-honest report is preferable
    to looping indefinitely or to labelling unsupported claims as
    grounded. Both recovery callbacks are no-ops here, so neither the
    synthesis nor the evidence ever improves; the test expects the
    evaluator to use every allowed replan and return with the flag set.
    """
    from locus.models.native.openai import OpenAIModel
    from locus.reasoning.gsar_evaluator import GSAREvaluator
    from locus.reasoning.gsar_judge import JudgeOutput, StructuredOutputGSARJudge

    # Replan budget handed to the evaluator; the final assertions are
    # expressed relative to it.
    k_max = 2

    # Specific factual claims paired with a corpus that supports none
    # of them, however often the judge is re-run.
    evidence = "(no evidence available)\n"
    report = (
        "The outage was caused by a failed power supply unit in rack 7B. "
        "The PSU was replaced at 03:15 UTC by technician T-218. "
        "Customer-facing latency returned to baseline within 8 minutes."
    )

    async def regen(syn: str, jo: JudgeOutput) -> str:
        # No-op rewrite: hand the synthesis back unchanged.
        return syn

    async def replan(syn: str, ev: str, jo: JudgeOutput) -> tuple[str, str]:
        # Simulates a failed plan: no new evidence is gathered.
        return syn, ev

    evaluator = GSAREvaluator(
        judge=StructuredOutputGSARJudge(
            model=OpenAIModel(model="gpt-4o-mini", max_tokens=2048),
        ),
        regenerate_fn=regen,
        replan_fn=replan,
        k_max=k_max,
    )
    result = await evaluator.evaluate(report_synthesis=report, evidence_corpus=evidence)

    # Reaching this point at all shows the loop terminated.
    assert result.degraded is True, (
        f"expected degraded=True after K_max replans without recovery, "
        f"got final={result.final_decision}, replans={result.replans_used}"
    )
    assert result.replans_used == k_max
    # Expected trajectory: the initial judgement plus one per replan.
    assert len(result.trajectory) == k_max + 1


@skip_without_openai
@pytest.mark.asyncio
async def test_gsar_rho_zero_inflation_visible_live() -> None:
    """Property P5 in practice: removing ρ inflates ``S`` on a live partition.

    The §8.5 ablation. The judge is called once and the resulting
    partition is scored twice, with ``contradiction_penalty=0.5`` and
    with ``contradiction_penalty=0.0``. The unpenalised score must be
    strictly higher whenever the partition contains contradicted
    claims. Per the paper, this contradiction-non-suppression property
    is what stops a summariser from boosting its score by silently
    dropping refuted claims.

    The test skips itself when the judge returns no contradicted claim,
    because the effect is not observable on such a partition.
    """
    from locus.models.native.openai import OpenAIModel
    from locus.reasoning.gsar import gsar_score
    from locus.reasoning.gsar_judge import StructuredOutputGSARJudge

    # Evidence first: the rps row (12.4) is what refutes the "4500 RPS"
    # sentence in the report below.
    evidence = (
        "[tool=query_metrics row=14:02:01] host=db-prod-1 cpu_pct=97.2\n"
        "[tool=query_metrics row=14:02:01] host=db-prod-1 rps=12.4\n"
        "[signal] alert_id=A-9912 fired_at=14:02:00 metric=cpu_pct severity=high\n"
    )
    # Mixed report: two sentences match the evidence, one conflicts.
    report = (
        "CPU utilisation on host db-prod-1 reached 97% at 14:02 UTC. "
        "Request rate held steady at 4500 RPS throughout the spike. "
        "An alert fired at 14:02 UTC."
    )

    judge = StructuredOutputGSARJudge(
        model=OpenAIModel(model="gpt-4o-mini", max_tokens=2048),
    )
    verdict = await judge.judge(report_synthesis=report, evidence_corpus=evidence)
    partition = verdict.to_partition()

    # Pre-condition: without at least one contradicted claim the two
    # scores cannot be told apart, so skip rather than fail.
    if not partition.contradicted:
        pytest.skip(
            "judge did not produce a contradicted claim on this run; "
            "P5 ablation requires W(X) > 0 to be observable. "
            f"partition={[(b, len(getattr(partition, b))) for b in ('grounded', 'ungrounded', 'contradicted', 'complementary')]}"
        )

    # Same partition, scored with and without the contradiction penalty.
    s_no_rho = gsar_score(partition, contradiction_penalty=0.0)
    s_default = gsar_score(partition, contradiction_penalty=0.5)

    # Weak form first (tolerant of float noise), then the strict form.
    assert s_no_rho >= s_default - 1e-9, (
        f"ρ=0 produced lower score than ρ=0.5: s_default={s_default:.4f}, s_no_rho={s_no_rho:.4f}"
    )
    assert s_no_rho > s_default, (
        f"ρ=0 should strictly inflate when there's contradicted mass — "
        f"s_default={s_default:.4f}, s_no_rho={s_no_rho:.4f}, "
        f"|X|={len(partition.contradicted)}"
    )


@skip_without_openai
@pytest.mark.asyncio
async def test_gsar_cross_judge_decision_agreement() -> None:
    """Two OpenAI judge models stay on the right side of δ for clear inputs.

    Paper §11 / Table 10 claims the C₃ contradiction-penalty effect is
    judge-agnostic. This is a weaker, cheaper check: a clearly grounded
    report and a clearly ungrounded report are each judged by
    ``gpt-4o-mini`` and ``gpt-4o``. Neither model may send the grounded
    report to ``REPLAN``, and neither may send the ungrounded report to
    ``PROCEED``. Strict tier equality between the two models is not
    asserted.
    """
    from locus.models.native.openai import OpenAIModel
    from locus.reasoning.gsar import Decision, decide, gsar_score
    from locus.reasoning.gsar_judge import StructuredOutputGSARJudge

    # Report whose two sentences are each backed by a tool row.
    grounded = {
        "report": (
            "CPU utilisation on host db-prod-1 reached 97% at 14:02 UTC. "
            "Request rate dropped to 12 RPS at the same time."
        ),
        "evidence": (
            "[tool=query_metrics row=14:02:01] host=db-prod-1 cpu_pct=97.2\n"
            "[tool=query_metrics row=14:02:01] host=db-prod-1 rps=12.4\n"
        ),
    }
    # Report asserting a cause and a time that the lone signal row does
    # not mention.
    ungrounded = {
        "report": ("The outage was caused by a failed power supply at 03:15 UTC."),
        "evidence": "[signal] alert_id=A-1042 fired_at=02:48:12 metric=availability\n",
    }

    mini_judge = StructuredOutputGSARJudge(
        model=OpenAIModel(model="gpt-4o-mini", max_tokens=2048),
    )
    full_judge = StructuredOutputGSARJudge(
        model=OpenAIModel(model="gpt-4o", max_tokens=2048),
    )

    async def decision_for(judge, payload: dict[str, str]) -> Decision:
        # One judge call mapped to a decision tier: an abstaining judge
        # yields ABSTAIN, otherwise the partition is scored and passed
        # to ``decide`` with its default arguments.
        verdict = await judge.judge(
            report_synthesis=payload["report"],
            evidence_corpus=payload["evidence"],
        )
        if verdict.abstained:
            return Decision.ABSTAIN
        return decide(gsar_score(verdict.to_partition()))

    # Judged sequentially: grounded input on both models, then ungrounded.
    d_mini_g = await decision_for(mini_judge, grounded)
    d_full_g = await decision_for(full_judge, grounded)
    d_mini_u = await decision_for(mini_judge, ungrounded)
    d_full_u = await decision_for(full_judge, ungrounded)

    # Grounded report: `regenerate` is tolerated because gpt-4o-mini
    # occasionally over-flags an inference; `replan` is not.
    assert d_mini_g != Decision.REPLAN, f"gpt-4o-mini sent grounded → replan: {d_mini_g}"
    assert d_full_g != Decision.REPLAN, f"gpt-4o sent grounded → replan: {d_full_g}"
    # Ungrounded report: any tier except `proceed` is acceptable.
    assert d_mini_u != Decision.PROCEED, f"gpt-4o-mini sent ungrounded → proceed: {d_mini_u}"
    assert d_full_u != Decision.PROCEED, f"gpt-4o sent ungrounded → proceed: {d_full_u}"