diff --git a/offload/durability_soundness_test.go b/offload/durability_soundness_test.go index dcc5721..24f06e5 100644 --- a/offload/durability_soundness_test.go +++ b/offload/durability_soundness_test.go @@ -347,6 +347,42 @@ func TestOffloadPresenceSizeUnverifiedFingerprintHeldOut(t *testing.T) { mustExist(t, filepath.Join(root, "a.txt")) } +// TestOffloadGateNamesPeerProvenance: a stale component a durability pull +// asserted names the asserting peer in the gate's refusal — the audit +// trail answers "which evidence (and from which peer) gated this offload" +// (the residual of #104). A locally-verified component reads "locally +// verified" instead. +func TestOffloadGateNamesPeerProvenance(t *testing.T) { + root := t.TempDir() + writeFile(t, filepath.Join(root, "a.txt"), "alpha") + s := setupStore(t) + ctx := context.Background() + idx := indexVolume(t, s, root) + v := testVolume(t, s) + self := selfNode(t, s) + + peer, err := s.GetOrCreateOriginNode(ctx, "nas") + if err != nil { + t.Fatalf("GetOrCreateOriginNode(nas): %v", err) + } + if err := s.UpsertDestinationRunIDPulled(ctx, v.ID, "t1", self.ID, idx.RunID-1, store.VerifyMethodBlake3, peer.ID, false); err != nil { + t.Fatalf("UpsertDestinationRunIDPulled: %v", err) + } + recordPush(t, s, v.ID, "t1") + + rep, err := Offload(ctx, s, root, Options{ + Name: volName, Paths: []string{"."}, Require: []string{"t1"}, + }) + if err != nil { + t.Fatalf("Offload: %v", err) + } + res := oneResult(t, rep, "a.txt", OutcomeNotDurable) + if len(res.Reasons) != 1 || !strings.Contains(res.Reasons[0], "asserted by peer nas") { + t.Fatalf("reasons = %v, want the stale failure naming the asserting peer nas", res.Reasons) + } + mustExist(t, filepath.Join(root, "a.txt")) +} + // TestOffloadContentVerifiedMethodsGate: blake3, peer-blake3, and // kopia-verify components each gate on their own (no fingerprint needed) // once the vector and freshness conditions hold — the stricter gate does diff --git a/offload/gate.go b/offload/gate.go index 3cc20c3..3eb685b 100644 --- a/offload/gate.go +++ b/offload/gate.go @@ -10,12 +10,17 @@ import ( ) // component is one loaded durability-vector entry: the highest origin -// run covered for an origin node, plus the verification method that -// advanced it. The method lets the gate refuse a presence-only -// component that has no content verification behind it. +// run covered for an origin node, the verification method that advanced +// it, and the provenance of that advance. The method lets the gate +// refuse a presence-only component that has no content verification +// behind it; source is the asserting peer's node id when the component +// was last advanced by a durability pull, and invalid (NULL) for the +// locally-verified class — so the gate's decision names which peer's +// evidence backed it. type component struct { coveredRun int64 method string + source sql.NullInt64 } // gate is the offline durability evidence for one invocation: the self @@ -60,7 +65,7 @@ func loadGate(ctx context.Context, s *store.Store, volumeID int64, require []str } vector := make(map[int64]component, len(components)) for _, c := range components { - vector[c.OriginNodeID] = component{coveredRun: c.OriginRunID, method: c.VerifyMethod} + vector[c.OriginNodeID] = component{coveredRun: c.OriginRunID, method: c.VerifyMethod, source: c.SourceNodeID} } g.vectors[target] = vector @@ -115,7 +120,7 @@ func (g *gate) check(ctx context.Context, row store.FileRow) ([]string, error) { continue case comp.coveredRun < originRun: failures = append(failures, - fmt.Sprintf("%s: stale: have %d need %d (origin %s)", target, comp.coveredRun, originRun, g.nodeName(ctx, originNode))) + fmt.Sprintf("%s: stale: have %d need %d (origin %s, %s)", target, comp.coveredRun, originRun, g.nodeName(ctx, originNode), g.provenance(ctx, comp))) continue } if reason := g.freshnessFailure(ctx, target, row, originNode, originRun); reason != "" { @@ -128,7 +133,7 @@ func (g *gate) check(ctx context.Context, row store.FileRow) ([]string, error) { } if !verified { failures = append(failures, - fmt.Sprintf("%s: not content-verified (method %q); a verified fingerprint must back the object before offload", target, displayMethod(comp.method))) + fmt.Sprintf("%s: not content-verified (method %q, %s); a verified fingerprint must back the object before offload", target, displayMethod(comp.method), g.provenance(ctx, comp))) } } return failures, nil @@ -230,6 +235,19 @@ func (g *gate) origin(ctx context.Context, row store.FileRow) (int64, int64, err return g.self.ID, intro, nil } +// provenance renders a component's evidence class for a gate decision: +// "locally verified" for a component this node advanced itself, or +// "asserted by peer " for one a durability pull tagged. The audit +// trail thus names which peer's evidence gated (or failed to gate) an +// offload, and a peer whose assertions later prove untrustworthy is +// identifiable per decision. +func (g *gate) provenance(ctx context.Context, comp component) string { + if !comp.source.Valid { + return "locally verified" + } + return fmt.Sprintf("asserted by peer %s", g.nodeName(ctx, comp.source.Int64)) +} + // nodeName resolves an origin node id to its name for the failure // messages, cached per invocation. A lookup failure degrades to the // numeric id — the gate decision is already made, naming is cosmetic. diff --git a/offload/offload_test.go b/offload/offload_test.go index 254f1c5..5944f82 100644 --- a/offload/offload_test.go +++ b/offload/offload_test.go @@ -322,8 +322,8 @@ func TestOffloadPeerOriginContent(t *testing.T) { } oneResult(t, rep, "covered.txt", OutcomeOffloaded) res := oneResult(t, rep, "ahead.txt", OutcomeNotDurable) - if !strings.Contains(res.Reasons[0], "t1: stale: have 7 need 9 (origin peer1)") { - t.Fatalf("reasons = %v, want stale failure naming origin peer1", res.Reasons) + if !strings.Contains(res.Reasons[0], "t1: stale: have 7 need 9 (origin peer1, locally verified)") { + t.Fatalf("reasons = %v, want stale failure naming origin peer1 and local provenance", res.Reasons) } mustBeGone(t, filepath.Join(root, "covered.txt")) mustExist(t, filepath.Join(root, "ahead.txt")) diff --git a/store/destination_run_ids.go b/store/destination_run_ids.go index c8d4746..752c1f6 100644 --- a/store/destination_run_ids.go +++ b/store/destination_run_ids.go @@ -75,6 +75,12 @@ func KnownVerifyMethod(method string) bool { // runs FK. Content with origin (N, r) is durable on a destination iff // the vector's component for N is ≥ r. VerifyMethod names the comparison // that last advanced the component (empty for a pre-v19 row). +// +// SourceNodeID is the provenance class: NULL when the component was +// advanced by a transfer this node observed itself (the locally-verified +// class, via AdvanceDestinationVectorTo), and the asserting peer's node +// id when a durability pull last advanced it. The offload gate weighs a +// peer-asserted component as a distinct, revocable class. type DestinationRunID struct { VolumeID int64 Destination string @@ -82,12 +88,16 @@ type DestinationRunID struct { OriginRunID int64 UpdatedAtNs int64 VerifyMethod string + SourceNodeID sql.NullInt64 } // DestinationRunIDHistory is one row of the insert-only // destination_run_ids_history log: a single vector-component advance. // AtNs is the insertion timestamp; rows written in the same tick still // order by id. VerifyMethod records the method behind this advance. +// SourceNodeID records the advance's provenance (NULL locally-verified, +// else the asserting peer) so the audit log traces revocation, not just +// the live vector. type DestinationRunIDHistory struct { ID int64 VolumeID int64 @@ -96,6 +106,7 @@ type DestinationRunIDHistory struct { OriginRunID int64 AtNs int64 VerifyMethod string + SourceNodeID sql.NullInt64 } // DestinationRewindError carries the rejected and current vector @@ -123,7 +134,7 @@ func (e *DestinationRewindError) Unwrap() error { return ErrWatermarkRewind } // run id advances from it. func (s *Store) GetDestinationRunID(ctx context.Context, volumeID int64, destination string, originNodeID int64) (DestinationRunID, error) { row := s.db.QueryRowContext(ctx, - `SELECT volume_id, destination, origin_node_id, origin_run_id, updated_at_ns, verify_method + `SELECT volume_id, destination, origin_node_id, origin_run_id, updated_at_ns, verify_method, source_node_id FROM destination_run_ids WHERE volume_id = ? AND destination = ? AND origin_node_id = ?`, volumeID, destination, originNodeID) @@ -135,7 +146,7 @@ func (s *Store) GetDestinationRunID(ctx context.Context, volumeID int64, destina // means the destination has no recorded durability yet. func (s *Store) ListDestinationRunIDs(ctx context.Context, volumeID int64, destination string) ([]DestinationRunID, error) { return queryRows(ctx, s.db, - `SELECT volume_id, destination, origin_node_id, origin_run_id, updated_at_ns, verify_method + `SELECT volume_id, destination, origin_node_id, origin_run_id, updated_at_ns, verify_method, source_node_id FROM destination_run_ids WHERE volume_id = ? AND destination = ? ORDER BY origin_node_id`, @@ -149,7 +160,7 @@ func (s *Store) ListDestinationRunIDs(ctx context.Context, volumeID int64, desti // can see. func (s *Store) ListVolumeDestinationRunIDs(ctx context.Context, volumeID int64) ([]DestinationRunID, error) { return queryRows(ctx, s.db, - `SELECT volume_id, destination, origin_node_id, origin_run_id, updated_at_ns, verify_method + `SELECT volume_id, destination, origin_node_id, origin_run_id, updated_at_ns, verify_method, source_node_id FROM destination_run_ids WHERE volume_id = ? ORDER BY destination, origin_node_id`, @@ -159,7 +170,7 @@ func (s *Store) ListVolumeDestinationRunIDs(ctx context.Context, volumeID int64) func scanDestinationRunID(s rowScanner) (DestinationRunID, error) { var d DestinationRunID var method sql.NullString - err := s.Scan(&d.VolumeID, &d.Destination, &d.OriginNodeID, &d.OriginRunID, &d.UpdatedAtNs, &method) + err := s.Scan(&d.VolumeID, &d.Destination, &d.OriginNodeID, &d.OriginRunID, &d.UpdatedAtNs, &method, &d.SourceNodeID) d.VerifyMethod = method.String return d, err } @@ -177,7 +188,7 @@ func scanDestinationRunID(s rowScanner) (DestinationRunID, error) { // use rather than writing components directly. func (s *Store) AdvanceDestinationVectorTo(ctx context.Context, volumeID int64, destination, verifyMethod string, components []OriginComponent) error { for _, c := range components { - err := s.upsertDestinationRunID(ctx, volumeID, destination, c.OriginNodeID, c.OriginRunID, verifyMethod, false) + err := s.upsertDestinationRunID(ctx, volumeID, destination, c.OriginNodeID, c.OriginRunID, verifyMethod, sql.NullInt64{}, false) if errors.Is(err, ErrWatermarkRewind) { continue } @@ -285,19 +296,37 @@ func scanOriginComponent(s rowScanner) (OriginComponent, error) { // is unchanged, so a methodless re-confirmation (e.g. a pull from a // pre-v19 peer) never degrades a content-verified component to unknown. func (s *Store) UpsertDestinationRunID(ctx context.Context, volumeID int64, destination string, originNodeID, originRunID int64, allowRewind bool) error { - return s.upsertDestinationRunID(ctx, volumeID, destination, originNodeID, originRunID, "", allowRewind) + return s.upsertDestinationRunID(ctx, volumeID, destination, originNodeID, originRunID, "", sql.NullInt64{}, allowRewind) } -// UpsertDestinationRunIDVerified is UpsertDestinationRunID with an -// explicit verification method recorded on the component — the entry -// point the durability pull uses to carry a peer's reported method -// verbatim, so the puller's offload gate weighs a pulled component -// exactly as the responder did. +// UpsertDestinationRunIDVerified advances a component this node verified +// itself, recording verifyMethod and leaving source_node_id NULL (the +// locally-verified provenance class). The durability pull does not use +// it — peer-asserted advances go through UpsertDestinationRunIDPulled so +// the two classes stay distinguishable and a peer's assertions revocable. func (s *Store) UpsertDestinationRunIDVerified(ctx context.Context, volumeID int64, destination string, originNodeID, originRunID int64, verifyMethod string, allowRewind bool) error { - return s.upsertDestinationRunID(ctx, volumeID, destination, originNodeID, originRunID, verifyMethod, allowRewind) + return s.upsertDestinationRunID(ctx, volumeID, destination, originNodeID, originRunID, verifyMethod, sql.NullInt64{}, allowRewind) +} + +// UpsertDestinationRunIDPulled records a peer-pulled advance: it carries +// the origin's reported verify method verbatim (so the puller's offload +// gate weighs the component exactly as the responder did) and tags the +// component with sourceNodeID, the asserting peer. This is the only +// entry point that stamps non-local provenance; every locally-verified +// path leaves source_node_id NULL, so revocation can drop a peer's +// assertions without touching evidence this node observed itself. +func (s *Store) UpsertDestinationRunIDPulled(ctx context.Context, volumeID int64, destination string, originNodeID, originRunID int64, verifyMethod string, sourceNodeID int64, allowRewind bool) error { + return s.upsertDestinationRunID(ctx, volumeID, destination, originNodeID, originRunID, verifyMethod, sql.NullInt64{Int64: sourceNodeID, Valid: true}, allowRewind) } -func (s *Store) upsertDestinationRunID(ctx context.Context, volumeID int64, destination string, originNodeID, originRunID int64, verifyMethod string, allowRewind bool) error { +// upsertDestinationRunID is the shared advance. sourceNodeID carries +// provenance: an invalid (NULL) value is the locally-verified class; a +// valid value is the asserting peer. source_node_id follows the run: a +// strict advance adopts the incoming provenance, an incoming local (NULL) +// re-confirmation upgrades a peer-tagged component back to local, and a +// peer re-confirmation at the recorded run never downgrades a +// locally-verified (NULL) component to peer-asserted. +func (s *Store) upsertDestinationRunID(ctx context.Context, volumeID int64, destination string, originNodeID, originRunID int64, verifyMethod string, sourceNodeID sql.NullInt64, allowRewind bool) error { if destination == "" { return fmt.Errorf("UpsertDestinationRunID: destination must be non-empty") } @@ -310,8 +339,8 @@ func (s *Store) upsertDestinationRunID(ctx context.Context, volumeID int64, dest atNs := NowNs() method := nullableString(verifyMethod) res, err := tx.ExecContext(ctx, ` - INSERT INTO destination_run_ids (volume_id, destination, origin_node_id, origin_run_id, updated_at_ns, verify_method) - VALUES (?, ?, ?, ?, ?, ?) + INSERT INTO destination_run_ids (volume_id, destination, origin_node_id, origin_run_id, updated_at_ns, verify_method, source_node_id) + VALUES (?, ?, ?, ?, ?, ?, ?) ON CONFLICT(volume_id, destination, origin_node_id) DO UPDATE SET origin_run_id = excluded.origin_run_id, updated_at_ns = excluded.updated_at_ns, @@ -319,9 +348,14 @@ func (s *Store) upsertDestinationRunID(ctx context.Context, volumeID int64, dest WHEN excluded.verify_method IS NOT NULL THEN excluded.verify_method WHEN excluded.origin_run_id > destination_run_ids.origin_run_id THEN NULL ELSE destination_run_ids.verify_method + END, + source_node_id = CASE + WHEN excluded.origin_run_id > destination_run_ids.origin_run_id THEN excluded.source_node_id + WHEN excluded.source_node_id IS NULL THEN NULL + ELSE destination_run_ids.source_node_id END WHERE excluded.origin_run_id >= destination_run_ids.origin_run_id OR ? - `, volumeID, destination, originNodeID, originRunID, atNs, method, allowRewind) + `, volumeID, destination, originNodeID, originRunID, atNs, method, sourceNodeID, allowRewind) if err != nil { return fmt.Errorf("upsert destination_run_ids: %w", err) } @@ -337,9 +371,9 @@ func (s *Store) upsertDestinationRunID(ctx context.Context, volumeID int64, dest } if _, err := tx.ExecContext(ctx, ` INSERT INTO destination_run_ids_history - (volume_id, destination, origin_node_id, origin_run_id, at_ns, verify_method) - VALUES (?, ?, ?, ?, ?, ?) - `, volumeID, destination, originNodeID, originRunID, atNs, method); err != nil { + (volume_id, destination, origin_node_id, origin_run_id, at_ns, verify_method, source_node_id) + VALUES (?, ?, ?, ?, ?, ?, ?) + `, volumeID, destination, originNodeID, originRunID, atNs, method, sourceNodeID); err != nil { return fmt.Errorf("append destination_run_ids_history: %w", err) } if err := tx.Commit(); err != nil { @@ -388,7 +422,7 @@ func guardDestinationMonotonicTx(ctx context.Context, tx *sql.Tx, volumeID int64 // recorded advances. func (s *Store) ListDestinationRunIDHistory(ctx context.Context, volumeID int64, destination string) ([]DestinationRunIDHistory, error) { return queryRows(ctx, s.db, ` - SELECT id, volume_id, destination, origin_node_id, origin_run_id, at_ns, verify_method + SELECT id, volume_id, destination, origin_node_id, origin_run_id, at_ns, verify_method, source_node_id FROM destination_run_ids_history WHERE volume_id = ? AND destination = ? ORDER BY id @@ -398,7 +432,30 @@ func (s *Store) ListDestinationRunIDHistory(ctx context.Context, volumeID int64, func scanDestinationRunIDHistory(s rowScanner) (DestinationRunIDHistory, error) { var h DestinationRunIDHistory var method sql.NullString - err := s.Scan(&h.ID, &h.VolumeID, &h.Destination, &h.OriginNodeID, &h.OriginRunID, &h.AtNs, &method) + err := s.Scan(&h.ID, &h.VolumeID, &h.Destination, &h.OriginNodeID, &h.OriginRunID, &h.AtNs, &method, &h.SourceNodeID) h.VerifyMethod = method.String return h, err } + +// RevokeDestinationRunIDsFromSource removes the live durability-vector +// components a single peer asserted, returning how many it deleted. It is +// the operator's recovery path for a compromised or mistaken peer: pulled +// evidence (source_node_id = sourceNodeID) is dropped while +// locally-verified components (source_node_id NULL) and other peers' +// assertions stay, so the live verified vector is never rewound. The +// append-only destination_run_ids_history is untouched — the assertions +// remain in the audit trail, so revocation is a forward act, not a +// rewrite of history. A revoked component reverts to "no row" (no floor); +// a later legitimate pull or verified push re-advances it. +func (s *Store) RevokeDestinationRunIDsFromSource(ctx context.Context, sourceNodeID int64) (int64, error) { + res, err := s.db.ExecContext(ctx, + `DELETE FROM destination_run_ids WHERE source_node_id = ?`, sourceNodeID) + if err != nil { + return 0, fmt.Errorf("revoke destination_run_ids from source %d: %w", sourceNodeID, err) + } + n, err := res.RowsAffected() + if err != nil { + return 0, fmt.Errorf("revoke destination_run_ids rows: %w", err) + } + return n, nil +} diff --git a/store/destination_run_ids_test.go b/store/destination_run_ids_test.go index 79ad41c..03fc18f 100644 --- a/store/destination_run_ids_test.go +++ b/store/destination_run_ids_test.go @@ -649,3 +649,248 @@ func TestContentVerifiedMethod(t *testing.T) { } } } + +// TestMigrateV21ToV22AddsSourceNodeID builds a minimal v21 database with a +// pre-existing durability component and confirms the migration adds +// source_node_id (NULL on the carried-over row — the locally-verified +// class) without disturbing the recorded coordinate or its method. +func TestMigrateV21ToV22AddsSourceNodeID(t *testing.T) { + dsn := filepath.Join(t.TempDir(), "test.db") + rawDB, err := sql.Open("sqlite", dsn) + if err != nil { + t.Fatalf("raw sql.Open: %v", err) + } + v21DDL := []string{ + `CREATE TABLE schema_version (version INTEGER NOT NULL PRIMARY KEY)`, + `CREATE TABLE volumes (id INTEGER PRIMARY KEY, name TEXT NOT NULL UNIQUE, path TEXT NOT NULL)`, + `CREATE TABLE nodes (id INTEGER PRIMARY KEY, name TEXT NOT NULL UNIQUE, endpoint TEXT, public_key_fingerprint TEXT)`, + `CREATE TABLE contents ( + id INTEGER PRIMARY KEY, + blake3 BLOB NOT NULL UNIQUE CHECK (length(blake3) = 32), + size_bytes INTEGER NOT NULL, + origin_node_id INTEGER REFERENCES nodes(id), + origin_run_id INTEGER + )`, + `CREATE TABLE destination_run_ids ( + volume_id INTEGER NOT NULL REFERENCES volumes(id), + destination TEXT NOT NULL, + origin_node_id INTEGER NOT NULL REFERENCES nodes(id), + origin_run_id INTEGER NOT NULL, + updated_at_ns INTEGER NOT NULL, + verify_method TEXT, + PRIMARY KEY (volume_id, destination, origin_node_id) + )`, + `CREATE TABLE destination_run_ids_history ( + id INTEGER PRIMARY KEY, + volume_id INTEGER NOT NULL, + destination TEXT NOT NULL, + origin_node_id INTEGER NOT NULL, + origin_run_id INTEGER NOT NULL, + at_ns INTEGER NOT NULL, + verify_method TEXT + )`, + `INSERT INTO schema_version (version) VALUES (21)`, + `INSERT INTO volumes (id, name, path) VALUES (1, 'v', '/v')`, + `INSERT INTO nodes (id, name) VALUES (1, 'self')`, + `INSERT INTO destination_run_ids (volume_id, destination, origin_node_id, origin_run_id, updated_at_ns, verify_method) + VALUES (1, 'bucket', 1, 7, 100, 'blake3')`, + } + for _, q := range v21DDL { + if _, err := rawDB.Exec(q); err != nil { + t.Fatalf("v21 DDL %q: %v", q, err) + } + } + rawDB.Close() + + s, err := Open(dsn) + if err != nil { + t.Fatalf("Open (migrates v21→v22): %v", err) + } + defer s.Close() + ctx := context.Background() + if v, _ := s.CurrentSchemaVersion(ctx); v != SchemaVersion { + t.Fatalf("schema_version = %d, want %d", v, SchemaVersion) + } + got, err := s.GetDestinationRunID(ctx, 1, "bucket", 1) + if err != nil { + t.Fatalf("GetDestinationRunID: %v", err) + } + if got.OriginRunID != 7 || got.VerifyMethod != VerifyMethodBlake3 { + t.Fatalf("carried-over component = run %d method %q, want 7 / blake3", got.OriginRunID, got.VerifyMethod) + } + if got.SourceNodeID.Valid { + t.Fatalf("source_node_id = %d, want NULL (locally-verified backfill)", got.SourceNodeID.Int64) + } +} + +// TestUpsertDestinationRunIDPulledTagsSource: a pulled advance records the +// asserting peer on the live row and in history, while a locally-verified +// advance for a different origin stays untagged (NULL). The two classes +// are distinguishable as the residual of #104 requires. +func TestUpsertDestinationRunIDPulledTagsSource(t *testing.T) { + s := openTestStore(t) + ctx := context.Background() + vID := makeVolume(t, s, "/v") + self, err := s.GetSelfNode(ctx) + if err != nil { + t.Fatalf("GetSelfNode: %v", err) + } + peer, err := s.GetOrCreateOriginNode(ctx, "nas") + if err != nil { + t.Fatalf("GetOrCreateOriginNode(nas): %v", err) + } + origin, err := s.GetOrCreateOriginNode(ctx, "laptop") + if err != nil { + t.Fatalf("GetOrCreateOriginNode(laptop): %v", err) + } + + if err := s.UpsertDestinationRunIDVerified(ctx, vID, "offsite", self.ID, 9, VerifyMethodBlake3, false); err != nil { + t.Fatalf("local verified advance: %v", err) + } + if err := s.UpsertDestinationRunIDPulled(ctx, vID, "offsite", origin.ID, 5, VerifyMethodKopia, peer.ID, false); err != nil { + t.Fatalf("pulled advance: %v", err) + } + + local, err := s.GetDestinationRunID(ctx, vID, "offsite", self.ID) + if err != nil { + t.Fatalf("GetDestinationRunID(local): %v", err) + } + if local.SourceNodeID.Valid { + t.Fatalf("locally-verified component source = %d, want NULL", local.SourceNodeID.Int64) + } + pulled, err := s.GetDestinationRunID(ctx, vID, "offsite", origin.ID) + if err != nil { + t.Fatalf("GetDestinationRunID(pulled): %v", err) + } + if !pulled.SourceNodeID.Valid || pulled.SourceNodeID.Int64 != peer.ID { + t.Fatalf("pulled component source = %+v, want peer %d", pulled.SourceNodeID, peer.ID) + } + + history, err := s.ListDestinationRunIDHistory(ctx, vID, "offsite") + if err != nil { + t.Fatalf("ListDestinationRunIDHistory: %v", err) + } + bySource := map[int64]sql.NullInt64{} + for _, h := range history { + bySource[h.OriginNodeID] = h.SourceNodeID + } + if src := bySource[self.ID]; src.Valid { + t.Fatalf("history for local advance carries source %d, want NULL", src.Int64) + } + if src := bySource[origin.ID]; !src.Valid || src.Int64 != peer.ID { + t.Fatalf("history for pulled advance source = %+v, want peer %d", src, peer.ID) + } +} + +// TestUpsertDestinationRunIDProvenanceTransitions: a peer re-confirmation +// at the recorded run never downgrades a locally-verified (NULL) +// component to peer-asserted, and a local re-confirmation upgrades a +// peer-tagged component back to locally-verified — so a peer cannot +// launder local provenance away, and a verified push reclaims a pulled +// component. +func TestUpsertDestinationRunIDProvenanceTransitions(t *testing.T) { + s := openTestStore(t) + ctx := context.Background() + vID := makeVolume(t, s, "/v") + self, err := s.GetSelfNode(ctx) + if err != nil { + t.Fatalf("GetSelfNode: %v", err) + } + peer, err := s.GetOrCreateOriginNode(ctx, "nas") + if err != nil { + t.Fatalf("GetOrCreateOriginNode(nas): %v", err) + } + + if err := s.UpsertDestinationRunIDVerified(ctx, vID, "offsite", self.ID, 10, VerifyMethodBlake3, false); err != nil { + t.Fatalf("local advance: %v", err) + } + if err := s.UpsertDestinationRunIDPulled(ctx, vID, "offsite", self.ID, 10, VerifyMethodBlake3, peer.ID, false); err != nil { + t.Fatalf("peer re-confirm at recorded run: %v", err) + } + got, err := s.GetDestinationRunID(ctx, vID, "offsite", self.ID) + if err != nil { + t.Fatalf("GetDestinationRunID: %v", err) + } + if got.SourceNodeID.Valid { + t.Fatalf("local component downgraded to peer %d by an equal-run re-confirm", got.SourceNodeID.Int64) + } + + if err := s.UpsertDestinationRunIDPulled(ctx, vID, "offsite", self.ID, 20, VerifyMethodKopia, peer.ID, false); err != nil { + t.Fatalf("peer strict advance: %v", err) + } + got, _ = s.GetDestinationRunID(ctx, vID, "offsite", self.ID) + if !got.SourceNodeID.Valid || got.SourceNodeID.Int64 != peer.ID { + t.Fatalf("after peer strict advance source = %+v, want peer %d", got.SourceNodeID, peer.ID) + } + + if err := s.UpsertDestinationRunIDVerified(ctx, vID, "offsite", self.ID, 20, VerifyMethodBlake3, false); err != nil { + t.Fatalf("local re-confirm at peer run: %v", err) + } + got, _ = s.GetDestinationRunID(ctx, vID, "offsite", self.ID) + if got.SourceNodeID.Valid { + t.Fatalf("local re-confirm did not reclaim provenance, source = %d", got.SourceNodeID.Int64) + } +} + +// TestRevokeDestinationRunIDsFromSource: revoking a peer drops the live +// components it asserted while leaving locally-verified components and a +// second peer's assertions in place, and the append-only history is +// untouched — revocation is a forward act, not a rewrite of the audit +// trail or the verified vector. +func TestRevokeDestinationRunIDsFromSource(t *testing.T) { + s := openTestStore(t) + ctx := context.Background() + vID := makeVolume(t, s, "/v") + self, err := s.GetSelfNode(ctx) + if err != nil { + t.Fatalf("GetSelfNode: %v", err) + } + badPeer, err := s.GetOrCreateOriginNode(ctx, "nas") + if err != nil { + t.Fatalf("GetOrCreateOriginNode(nas): %v", err) + } + goodPeer, err := s.GetOrCreateOriginNode(ctx, "mirror") + if err != nil { + t.Fatalf("GetOrCreateOriginNode(mirror): %v", err) + } + originA, err := s.GetOrCreateOriginNode(ctx, "laptop") + if err != nil { + t.Fatalf("GetOrCreateOriginNode(laptop): %v", err) + } + + if err := s.UpsertDestinationRunIDVerified(ctx, vID, "offsite", self.ID, 9, VerifyMethodBlake3, false); err != nil { + t.Fatalf("local advance: %v", err) + } + if err := s.UpsertDestinationRunIDPulled(ctx, vID, "offsite", originA.ID, 5, VerifyMethodKopia, badPeer.ID, false); err != nil { + t.Fatalf("bad-peer advance: %v", err) + } + if err := s.UpsertDestinationRunIDPulled(ctx, vID, "offsite", goodPeer.ID, 3, VerifyMethodKopia, goodPeer.ID, false); err != nil { + t.Fatalf("good-peer advance: %v", err) + } + + n, err := s.RevokeDestinationRunIDsFromSource(ctx, badPeer.ID) + if err != nil { + t.Fatalf("RevokeDestinationRunIDsFromSource: %v", err) + } + if n != 1 { + t.Fatalf("revoked %d components, want 1", n) + } + + if _, err := s.GetDestinationRunID(ctx, vID, "offsite", originA.ID); !errors.Is(err, sql.ErrNoRows) { + t.Fatalf("revoked component still present (err=%v)", err) + } + if local, err := s.GetDestinationRunID(ctx, vID, "offsite", self.ID); err != nil || local.SourceNodeID.Valid { + t.Fatalf("locally-verified component disturbed by revocation: %+v err=%v", local, err) + } + if good, err := s.GetDestinationRunID(ctx, vID, "offsite", goodPeer.ID); err != nil || good.SourceNodeID.Int64 != goodPeer.ID { + t.Fatalf("other peer's component disturbed by revocation: %+v err=%v", good, err) + } + + history, err := s.ListDestinationRunIDHistory(ctx, vID, "offsite") + if err != nil { + t.Fatalf("ListDestinationRunIDHistory: %v", err) + } + if len(history) != 3 { + t.Fatalf("history rows = %d after revocation, want 3 (audit trail untouched)", len(history)) + } +} diff --git a/store/migrations.go b/store/migrations.go index 592039f..ffcc7bc 100644 --- a/store/migrations.go +++ b/store/migrations.go @@ -10,7 +10,7 @@ import ( ) // SchemaVersion is the schema version this binary writes and reads. -const SchemaVersion = 21 +const SchemaVersion = 22 // freshSchemaBaseline is the version applied to a brand-new database. The // chain in `migrations` continues from here. v1 is no longer reachable from @@ -57,6 +57,7 @@ func buildMigrations(mctx migrationCtx) []migration { {version: 19, up: migrateV18ToV19}, {version: 20, up: migrateV19ToV20}, {version: 21, up: migrateV20ToV21}, + {version: 22, up: migrateV21ToV22}, } } @@ -1869,3 +1870,44 @@ func contentsImmutableTriggers() []string { END`, } } + +// --- v21 → v22 --- + +// migrateV21ToV22 adds source_node_id provenance to the durability vector +// so a pulled (peer-asserted) component is distinguishable from a +// locally-verified one. destination_run_ids.source_node_id is the peer +// whose pull last advanced the live component; destination_run_ids_history.source_node_id +// records the same per advance so the audit log can answer "which peer's +// evidence gated this offload". +// +// Both columns are additive and nullable, FK to nodes(id). NULL is the +// locally-verified class — the value AdvanceDestinationVectorTo writes +// when a destination handler or peer-sync initiator confirms a transfer +// it observed itself; a non-NULL value names the asserting peer the +// durability pull resolved. Existing components carry over NULL, reading +// as locally-verified, which is the safe inherited interpretation: a pull +// re-stamps a component it asserts on the next pull, while a genuinely +// local row stays NULL forever. +// +// No index: source_node_id is low-cardinality (a handful of peers), and +// revocation scans the volume's vector by equality, which the existing +// (volume_id, destination, origin_node_id) primary key already narrows. +func migrateV21ToV22(ctx context.Context, db *sql.DB) error { + tx, err := db.BeginTx(ctx, nil) + if err != nil { + return err + } + defer tx.Rollback() + + stmts := []string{ + `ALTER TABLE destination_run_ids ADD COLUMN source_node_id INTEGER REFERENCES nodes(id)`, + `ALTER TABLE destination_run_ids_history ADD COLUMN source_node_id INTEGER REFERENCES nodes(id)`, + `INSERT INTO schema_version (version) VALUES (22)`, + } + for _, q := range stmts { + if _, err := tx.ExecContext(ctx, q); err != nil { + return fmt.Errorf("v21→v22: %w", err) + } + } + return tx.Commit() +} diff --git a/store/schema.sql b/store/schema.sql index af2afc2..3b37223 100644 --- a/store/schema.sql +++ b/store/schema.sql @@ -1,6 +1,6 @@ -- Generated by `go test ./store -update-schema` — DO NOT EDIT. -- --- Flattened snapshot of the squirrel index schema at version 21, for humans +-- Flattened snapshot of the squirrel index schema at version 22, for humans -- and agents who want the current shape without replaying the migration -- chain in migrations.go. It is NOT used to create or migrate databases — -- a fresh DB is built by applyV5 plus the migration registry. The golden @@ -41,7 +41,7 @@ CREATE TABLE destination_run_ids ( destination TEXT NOT NULL, origin_node_id INTEGER NOT NULL REFERENCES nodes(id), origin_run_id INTEGER NOT NULL, - updated_at_ns INTEGER NOT NULL, verify_method TEXT, + updated_at_ns INTEGER NOT NULL, verify_method TEXT, source_node_id INTEGER REFERENCES nodes(id), PRIMARY KEY (volume_id, destination, origin_node_id) ); @@ -52,7 +52,7 @@ CREATE TABLE destination_run_ids_history ( origin_node_id INTEGER NOT NULL, origin_run_id INTEGER NOT NULL, at_ns INTEGER NOT NULL - , verify_method TEXT); + , verify_method TEXT, source_node_id INTEGER REFERENCES nodes(id)); CREATE INDEX idx_destination_run_ids_history ON destination_run_ids_history(volume_id, destination); diff --git a/sync/durability.go b/sync/durability.go index 265db81..78a70cb 100644 --- a/sync/durability.go +++ b/sync/durability.go @@ -136,6 +136,10 @@ func pullDurability(ctx context.Context, s *store.Store, client *nodeClient, vol if err != nil { return rep, err } + sourceNode, err := s.GetOrCreateOriginNode(ctx, peerName) + if err != nil { + return rep, fmt.Errorf("resolve source peer %q: %w", peerName, err) + } rep.Fetched = len(resp.Components) + len(resp.Freshness) originIDs := make(map[string]int64, 4) resolveOrigin := func(name string) (int64, error) { @@ -170,7 +174,7 @@ func pullDurability(ctx context.Context, s *store.Store, client *nodeClient, vol if err != nil { return rep, err } - err = s.UpsertDestinationRunIDVerified(ctx, volumeID, c.Destination, nodeID, c.OriginRun, c.VerifyMethod, allowRewind) + err = s.UpsertDestinationRunIDPulled(ctx, volumeID, c.Destination, nodeID, c.OriginRun, c.VerifyMethod, sourceNode.ID, allowRewind) var rewind *store.DestinationRewindError if errors.As(err, &rewind) { rep.Rewinds = append(rep.Rewinds, DurabilityRewind{ diff --git a/sync/durability_test.go b/sync/durability_test.go index b9557a7..817881e 100644 --- a/sync/durability_test.go +++ b/sync/durability_test.go @@ -138,6 +138,58 @@ func TestPullDurabilityCopiesComponents(t *testing.T) { } } +// TestPullDurabilityTagsSourcePeer: the pull tags every merged component +// with the asserting peer's local node id (the residual of #104), so the +// offload gate can weigh peer-asserted evidence as a distinct, revocable +// class. Locally-verified components written here directly stay untagged. +func TestPullDurabilityTagsSourcePeer(t *testing.T) { + f := setupNodeFixtureNoRclone(t) + ctx := context.Background() + f.initVol.OffloadRequires = []string{"offsite-a"} + f.initVol.SyncTo = nil + originName := seedReceiverDurability(t, f, map[string]int64{"offsite-a": 12}) + + v, err := f.initStore.CreateVolume(ctx, f.initVol.Name, f.initVol.Path) + if err != nil { + t.Fatalf("CreateVolume on initiator: %v", err) + } + self, err := f.initStore.GetSelfNode(ctx) + if err != nil { + t.Fatalf("GetSelfNode: %v", err) + } + if err := f.initStore.UpsertDestinationRunIDVerified(ctx, v.ID, "offsite-a", self.ID, 4, store.VerifyMethodBlake3, false); err != nil { + t.Fatalf("seed local verified component: %v", err) + } + + if _, err := PullDurability(ctx, f.initStore, f.initVol, f.node, false); err != nil { + t.Fatalf("PullDurability: %v", err) + } + + peer, err := f.initStore.GetNodeByName(ctx, f.node.Name) + if err != nil { + t.Fatalf("source peer %q not resolved locally: %v", f.node.Name, err) + } + origin, err := f.initStore.GetNodeByName(ctx, originName) + if err != nil { + t.Fatalf("origin node %q not created locally: %v", originName, err) + } + + pulled, err := f.initStore.GetDestinationRunID(ctx, v.ID, "offsite-a", origin.ID) + if err != nil { + t.Fatalf("GetDestinationRunID(pulled): %v", err) + } + if !pulled.SourceNodeID.Valid || pulled.SourceNodeID.Int64 != peer.ID { + t.Fatalf("pulled component source = %+v, want peer %q (id %d)", pulled.SourceNodeID, f.node.Name, peer.ID) + } + local, err := f.initStore.GetDestinationRunID(ctx, v.ID, "offsite-a", self.ID) + if err != nil { + t.Fatalf("GetDestinationRunID(local): %v", err) + } + if local.SourceNodeID.Valid { + t.Fatalf("locally-verified component tagged with source %d, want NULL", local.SourceNodeID.Int64) + } +} + // TestPullDurabilityDropsUnconfiguredDestinations: the pull merges // components for destinations the volume references (one via // offload_requires, one via sync_to) and drops one for an unconfigured