From 3514f0375bd28171ef93768fa336f649ac10211a Mon Sep 17 00:00:00 2001 From: Alan Buscaglia Date: Sun, 14 Jun 2026 11:53:31 +0200 Subject: [PATCH 1/4] fix(store): backfill relation sync mutations so relations reach cloud Relations in memory_relations could lack a sync_mutations row and never replicate to Engram Cloud (journal-based), silently. Cloud-journal sibling of #353 (local chunk path). - Add backfillRelationSyncMutationsTx mirroring the observation backfill (two-phase collect-then-insert, non-orphaned, NOT EXISTS guard). - Call it from backfillProjectSyncMutationsTx (startup repair, enroll, rename, merge). - Count non-orphaned relations missing a mutation in projectNeedsBackfill. - Enqueue from JudgeBySemantic on the enrolled path (mem_compare/ScanProject verdicts previously never enqueued). Closes #496 --- internal/mcp/mcp_conflict_loop_test.go | 44 ++- internal/store/relation_backfill_test.go | 427 +++++++++++++++++++++++ internal/store/relations.go | 49 ++- internal/store/store.go | 110 +++++- 4 files changed, 607 insertions(+), 23 deletions(-) create mode 100644 internal/store/relation_backfill_test.go diff --git a/internal/mcp/mcp_conflict_loop_test.go b/internal/mcp/mcp_conflict_loop_test.go index 4ce7b468..d29ec7a5 100644 --- a/internal/mcp/mcp_conflict_loop_test.go +++ b/internal/mcp/mcp_conflict_loop_test.go @@ -421,15 +421,21 @@ func TestConflictLoop_Orphaning(t *testing.T) { } } -// ─── G.4 — Sync regression test ───────────────────────────────────────────── +// ─── G.4 — Sync enrollment-gate regression test ───────────────────────────── // -// Verifies that memory_relations rows are NOT inserted into sync_mutations -// (REQ-009: local-only relations in Phase 1). This is the regression guard. +// Verifies that an UNENROLLED project never enqueues relation sync mutations. +// The enqueue paths in JudgeRelation and JudgeBySemantic are guarded by an +// enrollment check — this test is the regression guard for that gate. 
// -// The assertion is entity-level, not count-level: no row in sync_mutations -// should have entity = 'memory_relations' or 'relation' after any conflict -// operation. Sessions and observations still produce their own sync mutations -// as expected. +// Note: relation cloud sync IS intentional for enrolled projects (#313/#379/ +// #383 enabled it; #496 extends it with backfill). This test uses an unenrolled +// store (newMCPTestStore does not call EnrollProject), so the relation count +// must remain zero — not because relations are local-only, but because the +// enrollment gate must hold. +// +// The assertion is entity-level: no row in sync_mutations should have +// entity = 'relation' after conflict operations on an unenrolled project. +// Sessions and observations still produce their own sync mutations as expected. func TestConflictLoop_SyncRegression(t *testing.T) { s := newMCPTestStore(t) saveH := handleSave(s, MCPConfig{}, NewSessionActivity(10*time.Minute)) @@ -479,13 +485,14 @@ func TestConflictLoop_SyncRegression(t *testing.T) { t.Fatalf("G.4 mem_judge: err=%v isError=%v", err, judgeRes.IsError) } } else { - t.Logf("G.4: no candidates returned after save B (FTS similarity below floor); judge step skipped — relation-entity assertion still covers REQ-009") + t.Logf("G.4: no candidates returned after save B (FTS similarity below floor); judge step skipped — enrollment-gate assertion still covers the unenrolled guard") } - // ── Step 3: Assert no 'memory_relations' entity in sync_mutations. ──────── - // REQ-009: memory_relations must not appear in the sync wire at all. - // Sessions ('session') and observations ('observation') are expected; - // any other entity type for relations would be a violation. + // ── Step 3: Assert no relation entity in sync_mutations (enrollment gate). ── + // This store is UNENROLLED — the enrollment gate in JudgeRelation / + // JudgeBySemantic must prevent any relation sync_mutations row from being + // written. 
Sessions ('session') and observations ('observation') are expected; + // a relation-entity row here would mean the enrollment guard was bypassed. assertNoRelationSyncMutations(t, s) // ── Step 4: Verify observation sync payloads exclude decay fields. ──────── @@ -619,9 +626,14 @@ func parseEnvelope(t *testing.T, label string, res *mcppkg.CallToolResult) map[s } // assertNoRelationSyncMutations verifies that NO sync_mutations rows reference -// relation entities. REQ-009: memory_relations are local-only in Phase 1. -// Sessions ('session') and observations ('observation') are expected entities; -// any relation-entity row would be a regression. +// relation entities on an UNENROLLED store. The enrollment gate in +// JudgeRelation / JudgeBySemantic must prevent any relation row from being +// written when the project is not enrolled. Sessions ('session') and +// observations ('observation') are expected entities; a relation-entity row +// in an unenrolled context means the enrollment guard was bypassed. +// +// Note: relation sync mutations ARE valid for enrolled projects (#313/#379/ +// #383/#496). This helper is intentionally scoped to the unenrolled test in G.4. 
func assertNoRelationSyncMutations(t *testing.T, s *store.Store) { t.Helper() count, err := s.CountRelationSyncMutations() @@ -629,7 +641,7 @@ func assertNoRelationSyncMutations(t *testing.T, s *store.Store) { t.Fatalf("G.4 CountRelationSyncMutations: %v", err) } if count > 0 { - t.Errorf("G.4 REQ-009 violated: found %d sync_mutations row(s) with relation entity — memory_relations must NOT sync in Phase 1", count) + t.Errorf("G.4 enrollment-gate violated: found %d sync_mutations row(s) with relation entity on unenrolled project — enrollment gate must prevent relation sync mutations", count) } } diff --git a/internal/store/relation_backfill_test.go b/internal/store/relation_backfill_test.go new file mode 100644 index 00000000..91a8eea2 --- /dev/null +++ b/internal/store/relation_backfill_test.go @@ -0,0 +1,427 @@ +package store + +// Tests for cloud-relation-backfill (#496). +// +// Three behaviors verified: +// 1. backfillRelationSyncMutationsTx: non-orphaned relations missing a +// sync_mutations row get one created; orphaned relations are skipped. +// 2. JudgeBySemantic: enrolled project → enqueues; non-enrolled → does not. +// 3. projectNeedsBackfill: returns true when a relation lacks its mutation row. + +import ( + "database/sql" + "testing" +) + +// ─── Helpers ────────────────────────────────────────────────────────────────── + +// setupBackfillStore creates a store with: +// - session "ses-bf" / project "proj-bf" +// - two observations (srcSyncID, tgtSyncID) +// - project enrolled in sync_enrolled_projects +// +// Returns the store plus sync_ids of the two observations. 
+func setupBackfillStore(t *testing.T) (s *Store, srcSyncID, tgtSyncID string) { + t.Helper() + s = newTestStore(t) + if err := s.CreateSession("ses-bf", "proj-bf", "/tmp/bf"); err != nil { + t.Fatalf("CreateSession: %v", err) + } + if err := s.EnrollProject("proj-bf"); err != nil { + t.Fatalf("EnrollProject: %v", err) + } + _, srcSyncID = addTestObsSession(t, s, "ses-bf", "Backfill source obs", "decision", "proj-bf", "project") + _, tgtSyncID = addTestObsSession(t, s, "ses-bf", "Backfill target obs", "decision", "proj-bf", "project") + return +} + +// insertRelationDirect inserts a memory_relations row bypassing the normal +// SaveRelation / JudgeRelation path so there is no corresponding sync_mutations +// row. Used to simulate the pre-backfill gap. +func insertRelationDirect(t *testing.T, s *Store, syncID, sourceID, targetID, judgmentStatus string) { + t.Helper() + if _, err := s.db.Exec(` + INSERT INTO memory_relations + (sync_id, source_id, target_id, relation, judgment_status, created_at, updated_at) + VALUES (?, ?, ?, 'related', ?, datetime('now'), datetime('now')) + `, syncID, sourceID, targetID, judgmentStatus); err != nil { + t.Fatalf("insertRelationDirect: %v", err) + } +} + +// countRelationSyncMutationsByKey returns the count of sync_mutations rows for a +// specific relation entity_key. +func countRelationSyncMutationsByKey(t *testing.T, s *Store, relSyncID string) int { + t.Helper() + var n int + if err := s.db.QueryRow( + `SELECT count(*) FROM sync_mutations WHERE entity = ? AND entity_key = ? 
AND source = ?`, + SyncEntityRelation, relSyncID, SyncSourceLocal, + ).Scan(&n); err != nil { + t.Fatalf("countRelationSyncMutationsByKey(%q): %v", relSyncID, err) + } + return n +} + +// ─── Test 1: backfillRelationSyncMutationsTx ───────────────────────────────── + +// TestBackfillRelationSyncMutations_CreatesRowForNonOrphaned verifies that a +// non-orphaned relation that already exists in memory_relations with NO +// sync_mutations row gets a sync_mutations row after backfill runs. +// +// This tests gap #1 from issue #496: backfillProjectSyncMutationsTx calls no +// relation backfill, so pre-existing relations never replicate to the cloud. +func TestBackfillRelationSyncMutations_CreatesRowForNonOrphaned(t *testing.T) { + s, srcSyncID, tgtSyncID := setupBackfillStore(t) + + // Insert a judged relation directly — no sync_mutations row exists yet. + relSyncID := newSyncID("rel-bf-judged") + insertRelationDirect(t, s, relSyncID, srcSyncID, tgtSyncID, JudgmentStatusJudged) + + // Precondition: zero mutation rows for this relation. + if n := countRelationSyncMutationsByKey(t, s, relSyncID); n != 0 { + t.Fatalf("precondition: expected 0 sync_mutations for relation, got %d", n) + } + + // Run backfill through the public entry point (same path used on startup). + if err := s.repairEnrolledProjectSyncMutations(); err != nil { + t.Fatalf("repairEnrolledProjectSyncMutations: %v", err) + } + + // Postcondition: one sync_mutations row must exist now. + if n := countRelationSyncMutationsByKey(t, s, relSyncID); n != 1 { + t.Errorf("expected 1 sync_mutations row after backfill, got %d", n) + } +} + +// TestBackfillRelationSyncMutations_SkipsOrphaned verifies that orphaned +// relations are NOT backfilled — their status signals the endpoints are gone, +// so syncing them would produce a useless cloud row. +func TestBackfillRelationSyncMutations_SkipsOrphaned(t *testing.T) { + s, srcSyncID, tgtSyncID := setupBackfillStore(t) + + // Insert an orphaned relation directly. 
+ relOrphanedSyncID := newSyncID("rel-bf-orphaned") + insertRelationDirect(t, s, relOrphanedSyncID, srcSyncID, tgtSyncID, JudgmentStatusOrphaned) + + // Run backfill. + if err := s.repairEnrolledProjectSyncMutations(); err != nil { + t.Fatalf("repairEnrolledProjectSyncMutations: %v", err) + } + + // Orphaned relation must NOT have a sync_mutations row. + if n := countRelationSyncMutationsByKey(t, s, relOrphanedSyncID); n != 0 { + t.Errorf("orphaned relation must NOT be backfilled, got %d sync_mutations rows", n) + } +} + +// TestBackfillRelationSyncMutations_SkipsAlreadyEnqueued verifies idempotency: +// running backfill again when a mutation already exists does not create a +// duplicate. +func TestBackfillRelationSyncMutations_SkipsAlreadyEnqueued(t *testing.T) { + s, srcSyncID, tgtSyncID := setupBackfillStore(t) + + relSyncID := newSyncID("rel-bf-already") + insertRelationDirect(t, s, relSyncID, srcSyncID, tgtSyncID, JudgmentStatusJudged) + + // Run backfill twice. + if err := s.repairEnrolledProjectSyncMutations(); err != nil { + t.Fatalf("first repairEnrolledProjectSyncMutations: %v", err) + } + if err := s.repairEnrolledProjectSyncMutations(); err != nil { + t.Fatalf("second repairEnrolledProjectSyncMutations: %v", err) + } + + // Must still be exactly one row — idempotent. + if n := countRelationSyncMutationsByKey(t, s, relSyncID); n != 1 { + t.Errorf("expected exactly 1 sync_mutations row after two backfill runs, got %d", n) + } +} + +// ─── Test 2: JudgeBySemantic enqueue ───────────────────────────────────────── + +// TestJudgeBySemantic_EnqueuesSyncMutation_WhenEnrolled verifies that calling +// JudgeBySemantic on an enrolled project produces a sync_mutations row for the +// resulting relation. +// +// This tests gap #2 from issue #496: JudgeBySemantic never called +// enqueueSyncMutationTx, so every semantic verdict produced no journal row. 
+func TestJudgeBySemantic_EnqueuesSyncMutation_WhenEnrolled(t *testing.T) { + s, srcSyncID, tgtSyncID := setupBackfillStore(t) + + before := countRelationMutations(t, s, SyncEntityRelation, "proj-bf") + + syncID, err := s.JudgeBySemantic(JudgeBySemanticParams{ + SourceID: srcSyncID, + TargetID: tgtSyncID, + Relation: RelationConflictsWith, + Reasoning: "they conflict semantically", + Model: "test-model", + }) + if err != nil { + t.Fatalf("JudgeBySemantic: %v", err) + } + if syncID == "" { + t.Fatal("JudgeBySemantic: expected non-empty syncID") + } + + after := countRelationMutations(t, s, SyncEntityRelation, "proj-bf") + if after <= before { + t.Errorf("expected sync_mutations to gain a row after JudgeBySemantic on enrolled project; before=%d after=%d", before, after) + } + + // Verify the mutation references the correct relation sync_id. + if n := countRelationSyncMutationsByKey(t, s, syncID); n != 1 { + t.Errorf("expected 1 sync_mutations row for relation %q, got %d", syncID, n) + } +} + +// TestJudgeBySemantic_DoesNotEnqueue_WhenNotEnrolled verifies that calling +// JudgeBySemantic on a non-enrolled project does NOT produce a sync_mutations +// row (enrollment guard — backfill covers it post-enrollment). 
+func TestJudgeBySemantic_DoesNotEnqueue_WhenNotEnrolled(t *testing.T) { + s := newTestStore(t) + if err := s.CreateSession("ses-unenrolled", "proj-unenrolled", "/tmp/unenrolled"); err != nil { + t.Fatalf("CreateSession: %v", err) + } + _, srcID := addTestObsSession(t, s, "ses-unenrolled", "Source unenrolled", "decision", "proj-unenrolled", "project") + _, tgtID := addTestObsSession(t, s, "ses-unenrolled", "Target unenrolled", "decision", "proj-unenrolled", "project") + + syncID, err := s.JudgeBySemantic(JudgeBySemanticParams{ + SourceID: srcID, + TargetID: tgtID, + Relation: RelationRelated, + Reasoning: "semantically related", + Model: "test-model", + }) + if err != nil { + t.Fatalf("JudgeBySemantic: %v", err) + } + if syncID == "" { + t.Fatal("JudgeBySemantic: expected non-empty syncID even for unenrolled project") + } + + // No mutation must exist for the unenrolled project. + if n := countRelationSyncMutationsByKey(t, s, syncID); n != 0 { + t.Errorf("unenrolled project: expected 0 sync_mutations rows for relation, got %d", n) + } +} + +// TestJudgeBySemantic_UpdateEnqueues_WhenEnrolled verifies that updating an +// existing relation via JudgeBySemantic (UPSERT path) also enqueues a +// sync_mutations row. +func TestJudgeBySemantic_UpdateEnqueues_WhenEnrolled(t *testing.T) { + s, srcSyncID, tgtSyncID := setupBackfillStore(t) + + // First call: insert. + syncID, err := s.JudgeBySemantic(JudgeBySemanticParams{ + SourceID: srcSyncID, + TargetID: tgtSyncID, + Relation: RelationRelated, + Reasoning: "initial verdict", + Model: "model-v1", + }) + if err != nil { + t.Fatalf("JudgeBySemantic insert: %v", err) + } + + beforeUpdate := countRelationMutations(t, s, SyncEntityRelation, "proj-bf") + + // Second call: update (same pair, different relation). 
+ syncID2, err := s.JudgeBySemantic(JudgeBySemanticParams{ + SourceID: srcSyncID, + TargetID: tgtSyncID, + Relation: RelationConflictsWith, + Reasoning: "revised verdict — conflicts", + Model: "model-v2", + }) + if err != nil { + t.Fatalf("JudgeBySemantic update: %v", err) + } + // Same pair → same canonical sync_id returned. + if syncID2 != syncID { + t.Errorf("UPSERT should return same sync_id; want %q, got %q", syncID, syncID2) + } + + afterUpdate := countRelationMutations(t, s, SyncEntityRelation, "proj-bf") + if afterUpdate <= beforeUpdate { + t.Errorf("expected additional sync_mutations row after JudgeBySemantic update; before=%d after=%d", beforeUpdate, afterUpdate) + } +} + +// ─── Test 3: projectNeedsBackfill detects missing relation mutations ────────── + +// TestProjectNeedsBackfill_TrueWhenRelationMissingMutation verifies that +// projectNeedsBackfill returns true when a relation exists in memory_relations +// with no corresponding sync_mutations row. +// +// This tests gap #3 from issue #496: the fast-path skip in +// repairEnrolledProjectSyncMutations would silently skip projects that only +// have unsynced relations (observations were already backfilled). +func TestProjectNeedsBackfill_TrueWhenRelationMissingMutation(t *testing.T) { + s := newTestStore(t) + if err := s.CreateSession("ses-bf2", "proj-bf2", "/tmp/bf2"); err != nil { + t.Fatalf("CreateSession: %v", err) + } + + _, src2 := addTestObsSession(t, s, "ses-bf2", "NeedsBF src", "decision", "proj-bf2", "project") + _, tgt2 := addTestObsSession(t, s, "ses-bf2", "NeedsBF tgt", "decision", "proj-bf2", "project") + + // Enroll — this creates sync_mutations for session + observations. + if err := s.EnrollProject("proj-bf2"); err != nil { + t.Fatalf("EnrollProject: %v", err) + } + + // Now insert a relation WITHOUT a sync_mutations row (simulates the gap). 
+ relSyncID := newSyncID("rel-needs-bf") + insertRelationDirect(t, s, relSyncID, src2, tgt2, JudgmentStatusJudged) + + // projectNeedsBackfill must return true because the relation lacks a mutation. + needs, err := s.projectNeedsBackfill("proj-bf2") + if err != nil { + t.Fatalf("projectNeedsBackfill after relation insert: %v", err) + } + if !needs { + t.Errorf("projectNeedsBackfill must return true when a non-orphaned relation has no sync_mutations row, got false") + } +} + +// TestProjectNeedsBackfill_FalseWhenRelationHasMutation verifies that +// projectNeedsBackfill returns false when all relations already have +// sync_mutations rows (and sessions/observations are also covered). +func TestProjectNeedsBackfill_FalseWhenRelationHasMutation(t *testing.T) { + s, srcSyncID, tgtSyncID := setupBackfillStore(t) + + // JudgeRelation — enrolled, so it enqueues a mutation automatically. + relSyncID := newSyncID("rel-has-mut") + if _, err := s.SaveRelation(SaveRelationParams{ + SyncID: relSyncID, + SourceID: srcSyncID, + TargetID: tgtSyncID, + }); err != nil { + t.Fatalf("SaveRelation: %v", err) + } + if _, err := s.JudgeRelation(JudgeRelationParams{ + JudgmentID: relSyncID, + Relation: RelationConflictsWith, + MarkedByActor: "test-actor", + MarkedByKind: "agent", + }); err != nil { + t.Fatalf("JudgeRelation: %v", err) + } + + // projectNeedsBackfill should return false: relation already has a mutation. + needs, err := s.projectNeedsBackfill("proj-bf") + if err != nil { + t.Fatalf("projectNeedsBackfill: %v", err) + } + if needs { + t.Errorf("expected projectNeedsBackfill=false when relation already has a sync_mutations row, got true") + } +} + +// TestProjectNeedsBackfill_OrphanedRelationDoesNotTrigger verifies that an +// orphaned relation (without a sync_mutations row) does NOT cause +// projectNeedsBackfill to return true — orphaned relations are intentionally +// excluded from sync. 
+func TestProjectNeedsBackfill_OrphanedRelationDoesNotTrigger(t *testing.T) { + // Create a clean store with only enrolled session/obs/relation mutations + // satisfied, then add an orphaned relation. + s2 := newTestStore(t) + if err := s2.CreateSession("ses-orph-bf", "proj-orph", "/tmp/orph"); err != nil { + t.Fatalf("CreateSession: %v", err) + } + _, src := addTestObsSession(t, s2, "ses-orph-bf", "Orphan src", "decision", "proj-orph", "project") + _, tgt := addTestObsSession(t, s2, "ses-orph-bf", "Orphan tgt", "decision", "proj-orph", "project") + + if err := s2.EnrollProject("proj-orph"); err != nil { + t.Fatalf("EnrollProject: %v", err) + } + + // Insert an orphaned relation (no sync_mutations row for it). + orphRelSyncID := newSyncID("rel-orph-check") + insertRelationDirect(t, s2, orphRelSyncID, src, tgt, JudgmentStatusOrphaned) + + // projectNeedsBackfill must NOT be triggered by the orphaned relation. + // (It may still be true because of the sessions/obs from EnrollProject + // — but the orphaned relation itself must not contribute.) + // Verify the relation-specific count is zero. + if err := s2.withTx(func(tx *sql.Tx) error { + // Manually confirm orphaned relation is excluded from the backfill query. + var n int + err := tx.QueryRow(` + SELECT COUNT(*) FROM memory_relations r + JOIN observations src ON src.sync_id = r.source_id AND src.deleted_at IS NULL + JOIN observations tgt ON tgt.sync_id = r.target_id AND tgt.deleted_at IS NULL + WHERE r.judgment_status != ? + AND NOT EXISTS ( + SELECT 1 FROM sync_mutations sm + WHERE sm.target_key = ? AND sm.entity = ? AND sm.entity_key = r.sync_id AND sm.source = ? 
+ ) + `, JudgmentStatusOrphaned, DefaultSyncTargetKey, SyncEntityRelation, SyncSourceLocal).Scan(&n) + if err != nil { + return err + } + if n != 0 { + t.Errorf("orphaned relation must NOT be counted in backfill check, got count=%d", n) + } + return nil + }); err != nil { + t.Fatalf("withTx: %v", err) + } +} + +// ─── Test 4: pre-enrollment relations are backfilled on EnrollProject ───────── + +// TestEnrollProject_BackfillsPreExistingRelations verifies the core #496 +// trigger end-to-end through the real code path: +// +// 1. Create a session and two observations on an UNENROLLED store. +// 2. Call JudgeBySemantic — enrollment gate prevents any sync_mutations row. +// 3. Call EnrollProject — backfillProjectSyncMutationsTx runs, which calls +// backfillRelationSyncMutationsTx internally. +// 4. Assert the relation NOW has a sync_mutations row (backfill succeeded). +// +// This proves that relations created before enrollment are replicated when the +// project is later enrolled. +func TestEnrollProject_BackfillsPreExistingRelations(t *testing.T) { + s := newTestStore(t) + if err := s.CreateSession("ses-enroll-bf", "proj-enroll-bf", "/tmp/enroll-bf"); err != nil { + t.Fatalf("CreateSession: %v", err) + } + + _, srcSyncID := addTestObsSession(t, s, "ses-enroll-bf", "Pre-enroll src obs", "decision", "proj-enroll-bf", "project") + _, tgtSyncID := addTestObsSession(t, s, "ses-enroll-bf", "Pre-enroll tgt obs", "decision", "proj-enroll-bf", "project") + + // Create a relation via the real JudgeBySemantic path on an UNENROLLED project. + // The enrollment gate must prevent any sync_mutations row from being written. 
+ relSyncID, err := s.JudgeBySemantic(JudgeBySemanticParams{ + SourceID: srcSyncID, + TargetID: tgtSyncID, + Relation: RelationRelated, + Reasoning: "pre-enrollment semantic verdict", + Model: "test-model", + }) + if err != nil { + t.Fatalf("JudgeBySemantic (unenrolled): %v", err) + } + if relSyncID == "" { + t.Fatal("JudgeBySemantic: expected non-empty relSyncID") + } + + // Pre-enrollment: enrollment gate must have prevented any relation mutation. + if n := countRelationSyncMutationsByKey(t, s, relSyncID); n != 0 { + t.Fatalf("precondition: expected 0 sync_mutations for relation before enrollment, got %d", n) + } + + // Enroll the project — this triggers backfillProjectSyncMutationsTx, which + // calls backfillRelationSyncMutationsTx to cover the pre-existing relation. + if err := s.EnrollProject("proj-enroll-bf"); err != nil { + t.Fatalf("EnrollProject: %v", err) + } + + // Post-enrollment: the relation must now have a sync_mutations row. + if n := countRelationSyncMutationsByKey(t, s, relSyncID); n != 1 { + t.Errorf("expected 1 sync_mutations row for relation after EnrollProject backfill, got %d", n) + } +} diff --git a/internal/store/relations.go b/internal/store/relations.go index 1adb6ad1..f099ae08 100644 --- a/internal/store/relations.go +++ b/internal/store/relations.go @@ -815,7 +815,54 @@ func (s *Store) JudgeBySemantic(p JudgeBySemanticParams) (string, error) { } resultSyncID = existingSyncID - return nil + + // ── Enqueue sync mutation when project is enrolled ───────────────────── + // Derive source project for the enrollment check (mirrors JudgeRelation). 
+ var srcProject, tgtProject string + _ = tx.QueryRow( + `SELECT ifnull(project,'') FROM observations WHERE sync_id = ?`, p.SourceID, + ).Scan(&srcProject) + _ = tx.QueryRow( + `SELECT ifnull(project,'') FROM observations WHERE sync_id = ?`, p.TargetID, + ).Scan(&tgtProject) + + enrollCheckProject := srcProject + if enrollCheckProject == "" { + enrollCheckProject = tgtProject + } + var enrolled int + if err := tx.QueryRow( + `SELECT 1 FROM sync_enrolled_projects WHERE project = ? LIMIT 1`, enrollCheckProject, + ).Scan(&enrolled); err != nil && err != sql.ErrNoRows { + return fmt.Errorf("JudgeBySemantic: check enrollment: %w", err) + } + if enrolled == 0 { + return nil // not enrolled — backfill will cover it on enrollment + } + + // Build payload from the freshly-written row. + rel, err := s.getRelationTx(tx, existingSyncID) + if err != nil { + return fmt.Errorf("JudgeBySemantic: read relation for enqueue: %w", err) + } + payload := syncRelationPayload{ + SyncID: rel.SyncID, + SourceID: rel.SourceID, + TargetID: rel.TargetID, + Relation: rel.Relation, + Reason: rel.Reason, + Evidence: rel.Evidence, + Confidence: rel.Confidence, + JudgmentStatus: rel.JudgmentStatus, + MarkedByActor: rel.MarkedByActor, + MarkedByKind: rel.MarkedByKind, + MarkedByModel: rel.MarkedByModel, + SessionID: rel.SessionID, + Project: srcProject, + CreatedAt: rel.CreatedAt, + UpdatedAt: rel.UpdatedAt, + } + return s.enqueueSyncMutationTx(tx, SyncEntityRelation, rel.SyncID, SyncOpUpsert, payload) }); err != nil { return "", err } diff --git a/internal/store/store.go b/internal/store/store.go index 7084d668..0141542e 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -5000,7 +5000,10 @@ func (s *Store) backfillProjectSyncMutationsTx(tx *sql.Tx, project string) error if err := s.backfillObservationSyncMutationsTx(tx, project); err != nil { return err } - return s.backfillPromptSyncMutationsTx(tx, project) + if err := s.backfillPromptSyncMutationsTx(tx, project); err != nil { + 
return err + } + return s.backfillRelationSyncMutationsTx(tx, project) } // projectNeedsBackfill returns true when a project has any sessions, live observations, @@ -5042,6 +5045,23 @@ func (s *Store) projectNeedsBackfill(project string) (bool, error) { )`, args: []any{project, project, DefaultSyncTargetKey, SyncEntityPrompt, SyncSourceLocal}, }, + { + // Count non-orphaned relations whose source and target observations are + // locally available and that have no local upsert sync_mutations row. + // Mirrors the SELECT in backfillRelationSyncMutationsTx. + q: `SELECT COUNT(*) + FROM memory_relations r + JOIN observations src ON src.sync_id = r.source_id AND src.deleted_at IS NULL + JOIN observations tgt ON tgt.sync_id = r.target_id AND tgt.deleted_at IS NULL + LEFT JOIN sessions src_s ON src_s.id = src.session_id + WHERE r.judgment_status != ? + AND coalesce(nullif(src.project, ''), src_s.project, '') = ? + AND NOT EXISTS ( + SELECT 1 FROM sync_mutations sm + WHERE sm.target_key = ? AND sm.entity = ? AND sm.entity_key = r.sync_id AND sm.source = ? + )`, + args: []any{JudgmentStatusOrphaned, project, DefaultSyncTargetKey, SyncEntityRelation, SyncSourceLocal}, + }, } for _, cq := range queries { var n int @@ -5361,6 +5381,80 @@ func (s *Store) backfillPromptSyncMutationsTx(tx *sql.Tx, project string) error return nil } +// backfillRelationSyncMutationsTx creates sync_mutations rows for non-orphaned +// relations that have no corresponding local sync_mutations row. +// +// This fills the cloud-journal gap described in issue #496: a relation can exist +// in memory_relations with no sync_mutations row and therefore never replicates. +// +// Design mirrors backfillObservationSyncMutationsTx exactly: +// - Phase 1: collect all missing rows into a slice (close cursor first). +// - Phase 2: insert, avoiding the SQLite cursor-open-during-write busy loop. 
+// +// The SELECT mirrors ExportRelationMutations' join/orphan-filter structure +// (join both observations, exclude orphaned status, exclude rows that already +// have a local upsert mutation), but scopes by source-observation project only. +// ExportRelationMutations additionally filters by tgt.project; the backfill +// intentionally omits that filter to avoid skipping cross-project edges where +// only the source belongs to this project. +func (s *Store) backfillRelationSyncMutationsTx(tx *sql.Tx, project string) error { + rows, err := s.queryItHook(tx, ` + SELECT r.sync_id, r.source_id, r.target_id, r.relation, r.reason, r.evidence, r.confidence, + r.judgment_status, r.marked_by_actor, r.marked_by_kind, r.marked_by_model, + r.session_id, + coalesce(nullif(src.project, ''), src_s.project, ''), + r.created_at, r.updated_at + FROM memory_relations r + JOIN observations src ON src.sync_id = r.source_id AND src.deleted_at IS NULL + JOIN observations tgt ON tgt.sync_id = r.target_id AND tgt.deleted_at IS NULL + LEFT JOIN sessions src_s ON src_s.id = src.session_id + WHERE r.judgment_status != ? + AND coalesce(nullif(src.project, ''), src_s.project, '') = ? + AND NOT EXISTS ( + SELECT 1 FROM sync_mutations sm + WHERE sm.target_key = ? + AND sm.entity = ? + AND sm.entity_key = r.sync_id + AND sm.source = ? + ) + ORDER BY r.created_at ASC, r.sync_id ASC`, + JudgmentStatusOrphaned, + project, + DefaultSyncTargetKey, SyncEntityRelation, SyncSourceLocal, + ) + if err != nil { + return err + } + + // Phase 1: collect into memory before any INSERT to avoid cursor-open-during-write. 
+ var pending []syncRelationPayload + for rows.Next() { + var p syncRelationPayload + if err := rows.Scan( + &p.SyncID, &p.SourceID, &p.TargetID, &p.Relation, &p.Reason, &p.Evidence, &p.Confidence, + &p.JudgmentStatus, &p.MarkedByActor, &p.MarkedByKind, &p.MarkedByModel, + &p.SessionID, &p.Project, &p.CreatedAt, &p.UpdatedAt, + ); err != nil { + return closeRowsWithError(rows, err) + } + pending = append(pending, p) + } + if err := rows.Close(); err != nil { + return err + } + if err := rows.Err(); err != nil { + return err + } + + // Phase 2: insert now that the read cursor is closed. + for _, p := range pending { + if err := s.enqueueSyncMutationTx(tx, SyncEntityRelation, p.SyncID, SyncOpUpsert, p); err != nil { + return err + } + } + return nil +} + func (s *Store) enqueueSyncMutationTx(tx *sql.Tx, entity, entityKey, op string, payload any) error { encoded, err := json.Marshal(payload) if err != nil { @@ -6635,12 +6729,16 @@ func Now() string { // ─── Test-accessor helpers (REQ-009 / Phase G integration tests) ────────────── // CountRelationSyncMutations returns the number of sync_mutations rows whose -// entity is NOT 'session', 'observation', or 'prompt' — i.e., any entity that -// would indicate memory_relations data leaking into sync. Used by integration -// tests to assert that relation operations never enqueue sync mutations (REQ-009). +// entity is NOT 'session', 'observation', or 'prompt'. Used by integration +// tests to verify the enrollment gate: an UNENROLLED project must never enqueue +// relation sync mutations (the enqueue in JudgeBySemantic/JudgeRelation is +// guarded by an enrollment check). The test that calls this uses an unenrolled +// store, so the count must remain zero. // -// In Phase 1, only 'session', 'observation', and 'prompt' are valid entities. -// Any other entity (e.g. 'relation', 'memory_relation') would be a regression. 
+// Note: relation sync mutations ARE valid for enrolled projects (#313/#379/#383 +// enabled cloud relation sync; #496 extends it with backfill). This function +// is not a blanket "relations are local-only" check — it is an enrollment-gate +// regression guard scoped to the unenrolled test context that uses it. func (s *Store) CountRelationSyncMutations() (int, error) { var count int err := s.db.QueryRow(` From e83d2d08d5dffffbabe48e0d192815398eedc3ce Mon Sep 17 00:00:00 2001 From: Alan Buscaglia Date: Sun, 14 Jun 2026 12:14:59 +0200 Subject: [PATCH 2/4] fix(store): restrict relation backfill to judged rows and resolve project via session - Tighten backfillRelationSyncMutationsTx and projectNeedsBackfill to exclude pending/orphaned rows and any row missing marked_by_actor or marked_by_kind. Both predicates are now identical to prevent the fast-path check from desyncing with the write path. - JudgeBySemantic now derives srcProject via session-fallback (coalesce(obs.project, session.project)) matching the backfill SQL, so enqueued payloads carry a non-empty project when obs.project is blank. - Add REQ-011 WARNING log in JudgeBySemantic when srcProject is empty, matching the wording in JudgeRelation. - Update insertRelationDirect fixture usages to insertJudgedRelationDirect (with marked_by_actor/kind) for judged-relation test cases. - Add tests: pending row not backfilled, projectNeedsBackfill not triggered by pending row, JudgeBySemantic session-fallback populates payload project. 
--- internal/store/relation_backfill_test.go | 134 ++++++++++++++++++++++- internal/store/relations.go | 23 +++- internal/store/store.go | 28 +++-- 3 files changed, 169 insertions(+), 16 deletions(-) diff --git a/internal/store/relation_backfill_test.go b/internal/store/relation_backfill_test.go index 91a8eea2..50e77077 100644 --- a/internal/store/relation_backfill_test.go +++ b/internal/store/relation_backfill_test.go @@ -37,7 +37,8 @@ func setupBackfillStore(t *testing.T) (s *Store, srcSyncID, tgtSyncID string) { // insertRelationDirect inserts a memory_relations row bypassing the normal // SaveRelation / JudgeRelation path so there is no corresponding sync_mutations -// row. Used to simulate the pre-backfill gap. +// row. Used to simulate the pre-backfill gap for rows that legitimately lack +// marked_by_* fields (e.g. orphaned, pending). func insertRelationDirect(t *testing.T, s *Store, syncID, sourceID, targetID, judgmentStatus string) { t.Helper() if _, err := s.db.Exec(` @@ -49,6 +50,23 @@ func insertRelationDirect(t *testing.T, s *Store, syncID, sourceID, targetID, ju } } +// insertJudgedRelationDirect inserts a fully-judged memory_relations row with +// marked_by_actor and marked_by_kind populated, mirroring what JudgeRelation +// and JudgeBySemantic produce. Use this for fixtures that must be picked up +// by the backfill (the tightened predicate excludes rows without marked_by_*). 
+func insertJudgedRelationDirect(t *testing.T, s *Store, syncID, sourceID, targetID string) { + t.Helper() + if _, err := s.db.Exec(` + INSERT INTO memory_relations + (sync_id, source_id, target_id, relation, judgment_status, + marked_by_actor, marked_by_kind, + created_at, updated_at) + VALUES (?, ?, ?, 'related', 'judged', 'test-actor', 'agent', datetime('now'), datetime('now')) + `, syncID, sourceID, targetID); err != nil { + t.Fatalf("insertJudgedRelationDirect: %v", err) + } +} + // countRelationSyncMutationsByKey returns the count of sync_mutations rows for a // specific relation entity_key. func countRelationSyncMutationsByKey(t *testing.T, s *Store, relSyncID string) int { @@ -74,9 +92,11 @@ func countRelationSyncMutationsByKey(t *testing.T, s *Store, relSyncID string) i func TestBackfillRelationSyncMutations_CreatesRowForNonOrphaned(t *testing.T) { s, srcSyncID, tgtSyncID := setupBackfillStore(t) - // Insert a judged relation directly — no sync_mutations row exists yet. + // Insert a judged relation directly (with marked_by_* fields populated) — + // no sync_mutations row exists yet. Uses insertJudgedRelationDirect because + // the tightened backfill predicate excludes rows without marked_by fields. relSyncID := newSyncID("rel-bf-judged") - insertRelationDirect(t, s, relSyncID, srcSyncID, tgtSyncID, JudgmentStatusJudged) + insertJudgedRelationDirect(t, s, relSyncID, srcSyncID, tgtSyncID) // Precondition: zero mutation rows for this relation. if n := countRelationSyncMutationsByKey(t, s, relSyncID); n != 0 { @@ -122,7 +142,7 @@ func TestBackfillRelationSyncMutations_SkipsAlreadyEnqueued(t *testing.T) { s, srcSyncID, tgtSyncID := setupBackfillStore(t) relSyncID := newSyncID("rel-bf-already") - insertRelationDirect(t, s, relSyncID, srcSyncID, tgtSyncID, JudgmentStatusJudged) + insertJudgedRelationDirect(t, s, relSyncID, srcSyncID, tgtSyncID) // Run backfill twice. 
if err := s.repairEnrolledProjectSyncMutations(); err != nil { @@ -272,9 +292,11 @@ func TestProjectNeedsBackfill_TrueWhenRelationMissingMutation(t *testing.T) { t.Fatalf("EnrollProject: %v", err) } - // Now insert a relation WITHOUT a sync_mutations row (simulates the gap). + // Now insert a judged relation WITHOUT a sync_mutations row (simulates the gap). + // Must use insertJudgedRelationDirect so marked_by_* fields are populated; + // the tightened predicate in projectNeedsBackfill requires them. relSyncID := newSyncID("rel-needs-bf") - insertRelationDirect(t, s, relSyncID, src2, tgt2, JudgmentStatusJudged) + insertJudgedRelationDirect(t, s, relSyncID, src2, tgt2) // projectNeedsBackfill must return true because the relation lacks a mutation. needs, err := s.projectNeedsBackfill("proj-bf2") @@ -425,3 +447,103 @@ func TestEnrollProject_BackfillsPreExistingRelations(t *testing.T) { t.Errorf("expected 1 sync_mutations row for relation after EnrollProject backfill, got %d", n) } } + +// ─── Test 5: pending rows are not backfilled ────────────────────────────────── + +// TestBackfillRelationSyncMutations_SkipsPending verifies that a pending +// (unjudged) relation — which lacks marked_by_actor/marked_by_kind — is NOT +// selected by the backfill and does NOT cause projectNeedsBackfill to return +// true. Enqueueing pending rows would produce cloud mutations that are +// hard-rejected by server validation (HTTP 400), potentially blocking the +// entire sync batch. +func TestBackfillRelationSyncMutations_SkipsPending(t *testing.T) { + s, srcSyncID, tgtSyncID := setupBackfillStore(t) + + // Insert a pending relation without marked_by_* fields (simulates a + // FindCandidates/SaveRelation row that has not been judged yet). + pendingRelSyncID := newSyncID("rel-bf-pending") + insertRelationDirect(t, s, pendingRelSyncID, srcSyncID, tgtSyncID, JudgmentStatusPending) + + // Run backfill — the pending relation must be skipped. 
+ if err := s.repairEnrolledProjectSyncMutations(); err != nil { + t.Fatalf("repairEnrolledProjectSyncMutations: %v", err) + } + + // Backfill must NOT have created a sync_mutations row for the pending relation. + if n := countRelationSyncMutationsByKey(t, s, pendingRelSyncID); n != 0 { + t.Errorf("pending relation must NOT be backfilled (would fail cloud validation), got %d sync_mutations rows", n) + } + + // projectNeedsBackfill must NOT return true because of the pending relation. + // (All sessions/observations were enrolled, so they are covered.) + needs, err := s.projectNeedsBackfill("proj-bf") + if err != nil { + t.Fatalf("projectNeedsBackfill: %v", err) + } + if needs { + t.Errorf("projectNeedsBackfill must NOT return true for a pending relation without marked_by_* fields") + } +} + +// ─── Test 6: JudgeBySemantic uses session-fallback for project derivation ───── + +// TestJudgeBySemantic_UsesSessionFallback_ForProject verifies that +// JudgeBySemantic derives the Project for the enqueued sync mutation via the +// session-fallback (coalesce(obs.project, session.project)), not from +// observations.project alone. +// +// When observations.project is blank but the session carries the project, the +// payload's Project field must be non-empty so cloud validation passes. +func TestJudgeBySemantic_UsesSessionFallback_ForProject(t *testing.T) { + s := newTestStore(t) + + // Create a session with a project and enroll it. + if err := s.CreateSession("ses-sf", "proj-sf", "/tmp/sf"); err != nil { + t.Fatalf("CreateSession: %v", err) + } + if err := s.EnrollProject("proj-sf"); err != nil { + t.Fatalf("EnrollProject: %v", err) + } + + // Add observations with blank project — only the session carries the project. + // This simulates observations ingested before the project column was populated. 
+ _, srcSyncID := addTestObsSession(t, s, "ses-sf", "SF source obs", "decision", "", "project") + _, tgtSyncID := addTestObsSession(t, s, "ses-sf", "SF target obs", "decision", "", "project") + + // JudgeBySemantic must derive project via session fallback. + relSyncID, err := s.JudgeBySemantic(JudgeBySemanticParams{ + SourceID: srcSyncID, + TargetID: tgtSyncID, + Relation: RelationRelated, + Reasoning: "session-fallback test verdict", + Model: "test-model", + }) + if err != nil { + t.Fatalf("JudgeBySemantic: %v", err) + } + if relSyncID == "" { + t.Fatal("JudgeBySemantic: expected non-empty relSyncID") + } + + // A sync_mutations row must exist (project is enrolled). + if n := countRelationSyncMutationsByKey(t, s, relSyncID); n != 1 { + t.Fatalf("expected 1 sync_mutations row, got %d", n) + } + + // The enqueued payload's project must be non-empty (resolved via session). + var payloadProject string + if err := s.db.QueryRow( + `SELECT ifnull(json_extract(payload, '$.project'), '') + FROM sync_mutations + WHERE entity = ? AND entity_key = ? AND source = ?`, + SyncEntityRelation, relSyncID, SyncSourceLocal, + ).Scan(&payloadProject); err != nil { + t.Fatalf("reading payload project: %v", err) + } + if payloadProject == "" { + t.Errorf("JudgeBySemantic enqueued payload has empty project; session fallback must populate it (got %q)", payloadProject) + } + if payloadProject != "proj-sf" { + t.Errorf("expected payload project %q, got %q", "proj-sf", payloadProject) + } +} diff --git a/internal/store/relations.go b/internal/store/relations.go index f099ae08..6975694c 100644 --- a/internal/store/relations.go +++ b/internal/store/relations.go @@ -817,19 +817,36 @@ func (s *Store) JudgeBySemantic(p JudgeBySemanticParams) (string, error) { resultSyncID = existingSyncID // ── Enqueue sync mutation when project is enrolled ───────────────────── - // Derive source project for the enrollment check (mirrors JudgeRelation). 
+ // Derive source project using the same session-fallback as the backfill + // SELECT: coalesce(nullif(obs.project,''), session.project, ''). + // This prevents an empty Project in the enqueued payload when the + // observation's own project column is blank but the session carries it. var srcProject, tgtProject string _ = tx.QueryRow( - `SELECT ifnull(project,'') FROM observations WHERE sync_id = ?`, p.SourceID, + `SELECT coalesce(nullif(o.project,''), s.project, '') + FROM observations o + LEFT JOIN sessions s ON s.id = o.session_id + WHERE o.sync_id = ?`, p.SourceID, ).Scan(&srcProject) _ = tx.QueryRow( - `SELECT ifnull(project,'') FROM observations WHERE sync_id = ?`, p.TargetID, + `SELECT coalesce(nullif(o.project,''), s.project, '') + FROM observations o + LEFT JOIN sessions s ON s.id = o.session_id + WHERE o.sync_id = ?`, p.TargetID, ).Scan(&tgtProject) enrollCheckProject := srcProject if enrollCheckProject == "" { enrollCheckProject = tgtProject } + + // REQ-011: log at WARNING level when source observation is missing locally + // (project='' race condition). The server will reject with 400; this log + // is the local breadcrumb so the gap is not silently swallowed. + if srcProject == "" { + log.Printf("[store] WARNING: JudgeBySemantic enqueueing relation %s with project='' (source observation missing locally); server will reject", existingSyncID) + } + var enrolled int if err := tx.QueryRow( `SELECT 1 FROM sync_enrolled_projects WHERE project = ? LIMIT 1`, enrollCheckProject, diff --git a/internal/store/store.go b/internal/store/store.go index 0141542e..f4e8a32a 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -5046,21 +5046,28 @@ func (s *Store) projectNeedsBackfill(project string) (bool, error) { args: []any{project, project, DefaultSyncTargetKey, SyncEntityPrompt, SyncSourceLocal}, }, { - // Count non-orphaned relations whose source and target observations are - // locally available and that have no local upsert sync_mutations row. 
- // Mirrors the SELECT in backfillRelationSyncMutationsTx. + // Count only fully-judged relations (not orphaned, not pending, with + // marked_by_actor/kind populated) whose source and target observations + // are locally available and that have no local upsert sync_mutations row. + // Mirrors the SELECT in backfillRelationSyncMutationsTx exactly — any + // divergence causes the fast-path skip to desync from the write path. + // Pending/unmarked rows lack marked_by_* and would be rejected by cloud + // validation (HTTP 400), so we exclude them from both the count and the + // backfill to avoid polluting the sync journal with undeliverable mutations. q: `SELECT COUNT(*) FROM memory_relations r JOIN observations src ON src.sync_id = r.source_id AND src.deleted_at IS NULL JOIN observations tgt ON tgt.sync_id = r.target_id AND tgt.deleted_at IS NULL LEFT JOIN sessions src_s ON src_s.id = src.session_id - WHERE r.judgment_status != ? + WHERE r.judgment_status NOT IN (?, ?) + AND ifnull(r.marked_by_actor, '') != '' + AND ifnull(r.marked_by_kind, '') != '' AND coalesce(nullif(src.project, ''), src_s.project, '') = ? AND NOT EXISTS ( SELECT 1 FROM sync_mutations sm WHERE sm.target_key = ? AND sm.entity = ? AND sm.entity_key = r.sync_id AND sm.source = ? )`, - args: []any{JudgmentStatusOrphaned, project, DefaultSyncTargetKey, SyncEntityRelation, SyncSourceLocal}, + args: []any{JudgmentStatusOrphaned, JudgmentStatusPending, project, DefaultSyncTargetKey, SyncEntityRelation, SyncSourceLocal}, }, } for _, cq := range queries { @@ -5398,6 +5405,11 @@ func (s *Store) backfillPromptSyncMutationsTx(tx *sql.Tx, project string) error // intentionally omits that filter to avoid skipping cross-project edges where // only the source belongs to this project. func (s *Store) backfillRelationSyncMutationsTx(tx *sql.Tx, project string) error { + // Only backfill fully-judged relations: exclude orphaned/pending and any row + // that is missing marked_by_actor or marked_by_kind. 
Cloud validation + // (chunkcodec + server) hard-rejects mutations without those fields (HTTP 400), + // so enqueueing them would block the entire sync batch. + // This predicate must stay identical to the COUNT in projectNeedsBackfill. rows, err := s.queryItHook(tx, ` SELECT r.sync_id, r.source_id, r.target_id, r.relation, r.reason, r.evidence, r.confidence, r.judgment_status, r.marked_by_actor, r.marked_by_kind, r.marked_by_model, @@ -5408,7 +5420,9 @@ func (s *Store) backfillRelationSyncMutationsTx(tx *sql.Tx, project string) erro JOIN observations src ON src.sync_id = r.source_id AND src.deleted_at IS NULL JOIN observations tgt ON tgt.sync_id = r.target_id AND tgt.deleted_at IS NULL LEFT JOIN sessions src_s ON src_s.id = src.session_id - WHERE r.judgment_status != ? + WHERE r.judgment_status NOT IN (?, ?) + AND ifnull(r.marked_by_actor, '') != '' + AND ifnull(r.marked_by_kind, '') != '' AND coalesce(nullif(src.project, ''), src_s.project, '') = ? AND NOT EXISTS ( SELECT 1 FROM sync_mutations sm @@ -5418,7 +5432,7 @@ func (s *Store) backfillRelationSyncMutationsTx(tx *sql.Tx, project string) erro AND sm.source = ? 
) ORDER BY r.created_at ASC, r.sync_id ASC`, - JudgmentStatusOrphaned, + JudgmentStatusOrphaned, JudgmentStatusPending, project, DefaultSyncTargetKey, SyncEntityRelation, SyncSourceLocal, ) From e1ed602880895a32a490492df7b15825f3f9abc8 Mon Sep 17 00:00:00 2001 From: Alan Buscaglia Date: Sun, 14 Jun 2026 12:25:46 +0200 Subject: [PATCH 3/4] fix(store): apply session-fallback in cross-project guard and fix REQ-011 log placement - validateCrossProjectGuard now uses coalesce(nullif(o.project,''), s.project, '') with LEFT JOIN sessions for both source and target, matching the JudgeBySemantic project-derivation query so blank observations.project cannot bypass the guard when session projects differ (closes cross-project guard bypass via session fallback) - Move REQ-011 warning log in JudgeBySemantic to after the enrollment gate so it only fires when a mutation will actually be enqueued, matching JudgeRelation ordering - Add regression tests: TestCrossProjectGuard_SessionFallback_RejectsDifferentSessionProjects and TestCrossProjectGuard_SessionFallback_AllowsSameSessionProject --- internal/store/relation_backfill_test.go | 92 ++++++++++++++++++++++++ internal/store/relations.go | 24 ++++--- 2 files changed, 107 insertions(+), 9 deletions(-) diff --git a/internal/store/relation_backfill_test.go b/internal/store/relation_backfill_test.go index 50e77077..12ac478f 100644 --- a/internal/store/relation_backfill_test.go +++ b/internal/store/relation_backfill_test.go @@ -10,6 +10,7 @@ package store import ( "database/sql" + "errors" "testing" ) @@ -547,3 +548,94 @@ func TestJudgeBySemantic_UsesSessionFallback_ForProject(t *testing.T) { t.Errorf("expected payload project %q, got %q", "proj-sf", payloadProject) } } + +// ─── Test 7: cross-project guard uses session fallback ──────────────────────── + +// TestCrossProjectGuard_SessionFallback_RejectsDifferentSessionProjects verifies +// that validateCrossProjectGuard (used by both JudgeBySemantic and JudgeRelation) +// rejects a 
relation whose observations have blank observations.project but whose +// SESSIONS belong to DIFFERENT projects. +// +// This is the regression test for the bypass: without the session-fallback query +// in validateCrossProjectGuard, the guard sees "" == "" and allows the relation, +// even though the two observations live in different session-projects. +func TestCrossProjectGuard_SessionFallback_RejectsDifferentSessionProjects(t *testing.T) { + s := newTestStore(t) + + // Two sessions in DIFFERENT projects. + if err := s.CreateSession("ses-guard-p", "proj-guard-p", "/tmp/guard-p"); err != nil { + t.Fatalf("CreateSession p: %v", err) + } + if err := s.CreateSession("ses-guard-q", "proj-guard-q", "/tmp/guard-q"); err != nil { + t.Fatalf("CreateSession q: %v", err) + } + + // Observations with blank observations.project — project lives only in session. + _, srcSyncID := addTestObsSession(t, s, "ses-guard-p", "Guard source obs", "decision", "", "project") + _, tgtSyncID := addTestObsSession(t, s, "ses-guard-q", "Guard target obs", "decision", "", "project") + + // JudgeBySemantic must detect cross-project via session fallback and reject. + _, err := s.JudgeBySemantic(JudgeBySemanticParams{ + SourceID: srcSyncID, + TargetID: tgtSyncID, + Relation: RelationRelated, + Reasoning: "cross-project session-fallback guard test", + Model: "test-model", + }) + if !errors.Is(err, ErrCrossProjectRelation) { + t.Errorf("JudgeBySemantic: expected ErrCrossProjectRelation; got %v", err) + } + + // JudgeRelation must also reject for the same reason. 
+ relSyncID := newSyncID("rel") + if _, err2 := s.SaveRelation(SaveRelationParams{ + SyncID: relSyncID, + SourceID: srcSyncID, + TargetID: tgtSyncID, + }); err2 != nil { + t.Fatalf("SaveRelation: %v", err2) + } + _, err = s.JudgeRelation(JudgeRelationParams{ + JudgmentID: relSyncID, + Relation: RelationRelated, + MarkedByActor: "agent:test", + MarkedByKind: "agent", + }) + if !errors.Is(err, ErrCrossProjectRelation) { + t.Errorf("JudgeRelation: expected ErrCrossProjectRelation; got %v", err) + } +} + +// TestCrossProjectGuard_SessionFallback_AllowsSameSessionProject verifies that +// the guard does NOT reject observations that share the same session project, +// even when observations.project is blank. This guards against over-tightening. +func TestCrossProjectGuard_SessionFallback_AllowsSameSessionProject(t *testing.T) { + s := newTestStore(t) + + // Both observations in the SAME session (same project). + if err := s.CreateSession("ses-guard-same", "proj-guard-same", "/tmp/guard-same"); err != nil { + t.Fatalf("CreateSession: %v", err) + } + if err := s.EnrollProject("proj-guard-same"); err != nil { + t.Fatalf("EnrollProject: %v", err) + } + + // Both observations have blank observations.project but same session. + _, srcSyncID := addTestObsSession(t, s, "ses-guard-same", "Guard same src", "decision", "", "project") + _, tgtSyncID := addTestObsSession(t, s, "ses-guard-same", "Guard same tgt", "decision", "", "project") + + // Must NOT return ErrCrossProjectRelation. 
+ relSyncID, err := s.JudgeBySemantic(JudgeBySemanticParams{ + SourceID: srcSyncID, + TargetID: tgtSyncID, + Relation: RelationRelated, + Reasoning: "same session project guard test", + Model: "test-model", + }) + if err != nil { + t.Errorf("JudgeBySemantic: unexpected error for same-session-project pair: %v", err) + } + if relSyncID == "" { + t.Error("JudgeBySemantic: expected non-empty relSyncID for same-session-project pair") + } +} diff --git a/internal/store/relations.go b/internal/store/relations.go index 6975694c..788b21de 100644 --- a/internal/store/relations.go +++ b/internal/store/relations.go @@ -701,10 +701,16 @@ func (s *Store) JudgeRelation(p JudgeRelationParams) (*Relation, error) { func validateCrossProjectGuard(tx *sql.Tx, sourceID, targetID string) error { var srcProject, tgtProject string _ = tx.QueryRow( - `SELECT ifnull(project,'') FROM observations WHERE sync_id = ?`, sourceID, + `SELECT coalesce(nullif(o.project,''), s.project, '') + FROM observations o + LEFT JOIN sessions s ON s.id = o.session_id + WHERE o.sync_id = ?`, sourceID, ).Scan(&srcProject) _ = tx.QueryRow( - `SELECT ifnull(project,'') FROM observations WHERE sync_id = ?`, targetID, + `SELECT coalesce(nullif(o.project,''), s.project, '') + FROM observations o + LEFT JOIN sessions s ON s.id = o.session_id + WHERE o.sync_id = ?`, targetID, ).Scan(&tgtProject) if srcProject != "" && tgtProject != "" && srcProject != tgtProject { @@ -840,13 +846,6 @@ func (s *Store) JudgeBySemantic(p JudgeBySemanticParams) (string, error) { enrollCheckProject = tgtProject } - // REQ-011: log at WARNING level when source observation is missing locally - // (project='' race condition). The server will reject with 400; this log - // is the local breadcrumb so the gap is not silently swallowed. 
- if srcProject == "" { - log.Printf("[store] WARNING: JudgeBySemantic enqueueing relation %s with project='' (source observation missing locally); server will reject", existingSyncID) - } - var enrolled int if err := tx.QueryRow( `SELECT 1 FROM sync_enrolled_projects WHERE project = ? LIMIT 1`, enrollCheckProject, @@ -857,6 +856,13 @@ func (s *Store) JudgeBySemantic(p JudgeBySemanticParams) (string, error) { return nil // not enrolled — backfill will cover it on enrollment } + // REQ-011: log at WARNING level when source observation is missing locally + // (project='' race condition). The server will reject with 400; this log + // is the local breadcrumb so the gap is not silently swallowed. + if srcProject == "" { + log.Printf("[store] WARNING: JudgeBySemantic enqueueing relation %s with project='' (source observation missing locally); server will reject", existingSyncID) + } + // Build payload from the freshly-written row. rel, err := s.getRelationTx(tx, existingSyncID) if err != nil { From 9225c573897ea7c42999f1febccdadf3c052e1b3 Mon Sep 17 00:00:00 2001 From: Alan Buscaglia Date: Sun, 14 Jun 2026 12:35:10 +0200 Subject: [PATCH 4/4] fix(store): use session-fallback project in JudgeRelation enqueue MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the plain `SELECT ifnull(project,'') FROM observations` lookups for srcProject/tgtProject in JudgeRelation with the session-fallback form already used by JudgeBySemantic: SELECT coalesce(nullif(o.project,''), s.project, '') FROM observations o LEFT JOIN sessions s ON s.id = o.session_id WHERE o.sync_id = ? Without this, enrolled-project relations whose observations carry a blank observations.project (project lives only on the session) resolve srcProject="" → enrollCheckProject="" → enrolled=0 → return nil with no sync_mutations row, recreating the #496 replication gap for the JudgeRelation path. 
The fix makes enqueue behaviour consistent with JudgeBySemantic and ensures the payload Project is non-empty. Adds TestJudgeRelation_UsesSessionFallback_ForProject (TDD: red before fix, green after) which covers both the missing mutation row and the empty payload.project assertions. --- internal/store/relation_backfill_test.go | 81 ++++++++++++++++++++++++ internal/store/relations.go | 15 ++++- 2 files changed, 93 insertions(+), 3 deletions(-) diff --git a/internal/store/relation_backfill_test.go b/internal/store/relation_backfill_test.go index 12ac478f..80e2170b 100644 --- a/internal/store/relation_backfill_test.go +++ b/internal/store/relation_backfill_test.go @@ -639,3 +639,84 @@ func TestCrossProjectGuard_SessionFallback_AllowsSameSessionProject(t *testing.T t.Error("JudgeBySemantic: expected non-empty relSyncID for same-session-project pair") } } + +// ─── Test 8: JudgeRelation uses session-fallback for project derivation ──────── + +// TestJudgeRelation_UsesSessionFallback_ForProject verifies that JudgeRelation +// derives the Project for the enqueued sync mutation via the session-fallback +// (coalesce(obs.project, session.project)), not from observations.project alone. +// +// Scenario: project P is enrolled; a session exists in P; two observations are +// created in that session with BLANK observations.project (project lives only on +// the session). A pending relation is saved between them. Calling JudgeRelation +// must enqueue a sync_mutations row whose payload.project is "P", not "". +// +// Without the fix, srcProject="" → enrollCheckProject="" → enrolled=0 → return nil +// (no mutation row). After the fix, the session-fallback resolves srcProject="P", +// enrollCheckProject="P", enrolled=1, and the mutation is written immediately. +func TestJudgeRelation_UsesSessionFallback_ForProject(t *testing.T) { + s := newTestStore(t) + + // Create a session with an explicit project and enroll it. 
+ if err := s.CreateSession("ses-jr-sf", "proj-jr-sf", "/tmp/jr-sf"); err != nil { + t.Fatalf("CreateSession: %v", err) + } + if err := s.EnrollProject("proj-jr-sf"); err != nil { + t.Fatalf("EnrollProject: %v", err) + } + + // Create observations with BLANK observations.project — project lives only on + // the session. This simulates observations ingested before the project column + // was populated, which is the exact gap described in the CodeRabbit finding. + _, srcSyncID := addTestObsSession(t, s, "ses-jr-sf", "JR SF source obs", "decision", "", "project") + _, tgtSyncID := addTestObsSession(t, s, "ses-jr-sf", "JR SF target obs", "decision", "", "project") + + // Save a pending relation (the precursor step before JudgeRelation). + relSyncID := newSyncID("rel-jr-sf") + if _, err := s.SaveRelation(SaveRelationParams{ + SyncID: relSyncID, + SourceID: srcSyncID, + TargetID: tgtSyncID, + }); err != nil { + t.Fatalf("SaveRelation: %v", err) + } + + // Pre-condition: no mutation row yet (SaveRelation does not enqueue). + if n := countRelationSyncMutationsByKey(t, s, relSyncID); n != 0 { + t.Fatalf("precondition: expected 0 sync_mutations before JudgeRelation, got %d", n) + } + + // Call JudgeRelation — this is the code path being fixed. + if _, err := s.JudgeRelation(JudgeRelationParams{ + JudgmentID: relSyncID, + Relation: RelationRelated, + MarkedByActor: "test-actor", + MarkedByKind: "agent", + }); err != nil { + t.Fatalf("JudgeRelation: %v", err) + } + + // Post-condition 1: a sync_mutations row must exist immediately after + // JudgeRelation (no waiting for startup backfill). + if n := countRelationSyncMutationsByKey(t, s, relSyncID); n != 1 { + t.Errorf("expected 1 sync_mutations row after JudgeRelation on enrolled project (session-fallback); got %d", n) + } + + // Post-condition 2: the enqueued payload's project must be "proj-jr-sf", + // not "" — so cloud validation does not reject it. 
+ var payloadProject string + if err := s.db.QueryRow( + `SELECT ifnull(json_extract(payload, '$.project'), '') + FROM sync_mutations + WHERE entity = ? AND entity_key = ? AND source = ?`, + SyncEntityRelation, relSyncID, SyncSourceLocal, + ).Scan(&payloadProject); err != nil { + t.Fatalf("reading payload project from sync_mutations: %v", err) + } + if payloadProject == "" { + t.Errorf("JudgeRelation enqueued payload has empty project; session-fallback must populate it") + } + if payloadProject != "proj-jr-sf" { + t.Errorf("expected payload project %q, got %q", "proj-jr-sf", payloadProject) + } +} diff --git a/internal/store/relations.go b/internal/store/relations.go index 788b21de..1797d269 100644 --- a/internal/store/relations.go +++ b/internal/store/relations.go @@ -591,13 +591,22 @@ func (s *Store) JudgeRelation(p JudgeRelationParams) (*Relation, error) { if err := s.withTx(func(tx *sql.Tx) error { // ── Cross-project guard (Phase 2, REQ-003) ───────────────────────── // Derive source and target project for enrollment checks and the guard. - // Missing observation → empty string (REQ-011 edge). + // Use the same session-fallback form as JudgeBySemantic so that enrolled + // projects whose observations have a blank project column (but whose session + // carries the project) are resolved correctly. Missing observation → empty + // string (REQ-011 edge) because no observation row matches and the Scan error is ignored.
var srcProject, tgtProject string _ = tx.QueryRow( - `SELECT ifnull(project,'') FROM observations WHERE sync_id = ?`, sourceID, + `SELECT coalesce(nullif(o.project,''), s.project, '') + FROM observations o + LEFT JOIN sessions s ON s.id = o.session_id + WHERE o.sync_id = ?`, sourceID, ).Scan(&srcProject) _ = tx.QueryRow( - `SELECT ifnull(project,'') FROM observations WHERE sync_id = ?`, targetID, + `SELECT coalesce(nullif(o.project,''), s.project, '') + FROM observations o + LEFT JOIN sessions s ON s.id = o.session_id + WHERE o.sync_id = ?`, targetID, ).Scan(&tgtProject) // Delegate to shared helper; reject cross-project pairs.