diff --git a/README.md b/README.md index f913793..f84ce99 100644 --- a/README.md +++ b/README.md @@ -203,13 +203,16 @@ To replay: process segments in ascending run id; each line with status `present` ```toml [volumes.pictures] -path = "~/Pictures" -sync_to = ["nas", "offsite"] -offload_requires = ["nas", "offsite"] +path = "~/Pictures" +sync_to = ["nas", "offsite"] +offload_requires = ["nas", "offsite"] +offload_max_evidence_age = "720h" # optional; default disabled ``` `offload_requires` is the explicit per-volume policy: every named target's recorded durability must cover a file's content before its bytes may go, and a volume without the key refuses to offload entirely. The names share the flat destination/node namespace that `sync_to` uses. They may also name targets only a *peer* pushes to: evidence about those arrives through the peer durability pull (`squirrel peer-sync pull-durability`), and a name with no recorded evidence simply keeps the gate closed. +`offload_max_evidence_age` is an optional, opt-in defence-in-depth knob: when set, a target whose durability evidence was last *re-verified* longer ago than this (in wall-clock time) is treated as stale and the offload is refused, even when its version-vector coverage is sound. This stops the gate from trusting a destination that has been dead or unreachable for months on the strength of a claim never since re-confirmed. It is fail-closed — evidence with an unknown verification time (a row migrated from before the column existed, or one only ever touched without re-verification) is refused too. Re-verification (a fresh push, or a durability pull from the asserting peer) re-stamps the evidence and clears the staleness. The default is disabled (no maximum age), so existing configs are unaffected. An equal-value re-confirmation that carries no verification never refreshes the clock, so the age tracks genuine checks rather than no-op touches. 
+ ``` squirrel offload pictures 2019/ # a subtree squirrel offload pictures --older-than 90d # by age (indexed mtime) diff --git a/cmd/squirrel/offload.go b/cmd/squirrel/offload.go index 0f5a5e7..9c59ec4 100644 --- a/cmd/squirrel/offload.go +++ b/cmd/squirrel/offload.go @@ -61,11 +61,12 @@ func runOffload(cmd *cobra.Command, volumeName string, paths []string, olderThan defer s.Close() rep, err := offload.Offload(cmd.Context(), s, vol.Path, offload.Options{ - Name: volumeName, - Paths: paths, - OlderThan: olderThan, - Require: vol.OffloadRequires, - DryRun: dryRun, + Name: volumeName, + Paths: paths, + OlderThan: olderThan, + Require: vol.OffloadRequires, + MaxEvidenceAge: vol.OffloadMaxEvidenceAge, + DryRun: dryRun, }) printOffloadReport(cmd.OutOrStdout(), cmd.ErrOrStderr(), rep, dryRun) if err != nil { diff --git a/config/config.go b/config/config.go index d6f2431..aebc335 100644 --- a/config/config.go +++ b/config/config.go @@ -80,6 +80,16 @@ type Volume struct { // durability pull about targets only that peer reaches; a name with // no recorded evidence keeps the gate closed. OffloadRequires []string + // OffloadMaxEvidenceAge bounds how old a required target's durability + // evidence may be (in wall-clock time since it was last re-verified) + // before `squirrel offload` refuses to delete on its strength. Zero — + // the default — disables the time-based staleness policy, leaving the + // version-vector coverage gate as the sole durability check, so the + // knob is opt-in and existing configs are unaffected. When set, + // evidence whose last verification is unknown or older than this is + // refused (fail-closed); it pairs with periodic re-verification + // (`squirrel verify`, scrub) that re-stamps fresh evidence. + OffloadMaxEvidenceAge time.Duration // SyncEvery is the agent-scheduler cadence for full syncs of this // volume. 
Zero means "no scheduled sync" — the agent never auto- // triggers a sync for this volume; manual `squirrel sync` still @@ -266,12 +276,13 @@ type rawConfig struct { } type rawVolume struct { - Path string `toml:"path"` - SyncTo []string `toml:"sync_to"` - OffloadRequires []string `toml:"offload_requires"` - SyncEvery string `toml:"sync_every"` - IndexEvery string `toml:"index_every"` - Hook *rawVolumeHook `toml:"hook"` + Path string `toml:"path"` + SyncTo []string `toml:"sync_to"` + OffloadRequires []string `toml:"offload_requires"` + OffloadMaxEvidenceAge string `toml:"offload_max_evidence_age"` + SyncEvery string `toml:"sync_every"` + IndexEvery string `toml:"index_every"` + Hook *rawVolumeHook `toml:"hook"` } type rawVolumeHook struct { @@ -365,6 +376,10 @@ func resolveVolume(name string, raw rawVolume, dests map[string]*Destination, no if err := validateOffloadRequires(raw.OffloadRequires); err != nil { return nil, err } + offloadMaxEvidenceAge, err := parseOffloadMaxEvidenceAge(raw.OffloadMaxEvidenceAge) + if err != nil { + return nil, err + } syncEvery, err := parseVolumeCadence("sync_every", raw.SyncEvery) if err != nil { return nil, err @@ -385,16 +400,37 @@ func resolveVolume(name string, raw rawVolume, dests map[string]*Destination, no return nil, err } return &Volume{ - Name: name, - Path: abs, - SyncTo: raw.SyncTo, - OffloadRequires: raw.OffloadRequires, - SyncEvery: syncEvery, - IndexEvery: indexEvery, - Hook: hook, + Name: name, + Path: abs, + SyncTo: raw.SyncTo, + OffloadRequires: raw.OffloadRequires, + OffloadMaxEvidenceAge: offloadMaxEvidenceAge, + SyncEvery: syncEvery, + IndexEvery: indexEvery, + Hook: hook, }, nil } +// parseOffloadMaxEvidenceAge parses the optional offload_max_evidence_age +// knob. Empty stays zero — the time-based staleness policy is disabled, +// its opt-in default. Non-empty must parse as a strictly positive +// time.Duration; a sub-second value is almost certainly a missing unit +// suffix (e.g. 
`30` where `720h` or `30d`-style was meant) for a knob +// whose sensible values are days to months, so it is rejected. +func parseOffloadMaxEvidenceAge(raw string) (time.Duration, error) { + if raw == "" { + return 0, nil + } + dur, err := time.ParseDuration(raw) + if err != nil { + return 0, fmt.Errorf("offload_max_evidence_age %q: %w", raw, err) + } + if dur < time.Second { + return 0, fmt.Errorf("offload_max_evidence_age must be at least %s, got %s", time.Second, raw) + } + return dur, nil +} + // validateOffloadRequires checks the offload policy entries // syntactically: each must be a well-formed target name and appear // once. Membership in this config's destinations/nodes is deliberately diff --git a/config/config_test.go b/config/config_test.go index cc1ef3c..e44f4d4 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -286,6 +286,52 @@ offload_requires = ["nas", "nas"] } } +// TestLoadOffloadMaxEvidenceAge: the optional staleness knob parses as a +// duration; absent it defaults to zero (the time-based policy disabled). 
+func TestLoadOffloadMaxEvidenceAge(t *testing.T) { + p := writeConfig(t, ` +[volumes.pictures] +path = "/tmp/pictures" +offload_requires = ["nas"] +offload_max_evidence_age = "720h" + +[volumes.docs] +path = "/tmp/docs" +offload_requires = ["nas"] +`) + cfg, err := Load(p) + if err != nil { + t.Fatalf("Load: %v", err) + } + if got := cfg.Volumes["pictures"].OffloadMaxEvidenceAge; got != 720*time.Hour { + t.Fatalf("OffloadMaxEvidenceAge = %s, want 720h", got) + } + if got := cfg.Volumes["docs"].OffloadMaxEvidenceAge; got != 0 { + t.Fatalf("OffloadMaxEvidenceAge = %s, want 0 (disabled by default)", got) + } +} + +func TestLoadRejectsBadOffloadMaxEvidenceAge(t *testing.T) { + cases := []struct{ name, value string }{ + {"garbage", "soon"}, + {"sub_second", "500ms"}, + {"unitless", "30"}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + p := writeConfig(t, ` +[volumes.pictures] +path = "/tmp/pictures" +offload_requires = ["nas"] +offload_max_evidence_age = "`+c.value+`" +`) + if _, err := Load(p); err == nil || !strings.Contains(err.Error(), "offload_max_evidence_age") { + t.Fatalf("expected offload_max_evidence_age error, got %v", err) + } + }) + } +} + func TestLoadRejectsInvalidName(t *testing.T) { // Names that wouldn't survive being a filesystem subfolder or an // rclone.conf section are rejected at load time. diff --git a/offload/gate.go b/offload/gate.go index 3eb685b..4ca952d 100644 --- a/offload/gate.go +++ b/offload/gate.go @@ -5,6 +5,7 @@ import ( "database/sql" "errors" "fmt" + "time" "github.com/mbertschler/squirrel/store" ) @@ -16,11 +17,14 @@ import ( // behind it; source is the asserting peer's node id when the component // was last advanced by a durability pull, and invalid (NULL) for the // locally-verified class — so the gate's decision names which peer's -// evidence backed it. +// evidence backed it. 
verifiedAt is the wall-clock of the last write +// backed by genuine re-verification (NULL when unknown), the basis for +// the time-based staleness policy. type component struct { coveredRun int64 method string source sql.NullInt64 + verifiedAt sql.NullInt64 } // gate is the offline durability evidence for one invocation: the self @@ -41,22 +45,33 @@ type gate struct { lastPush map[string]int64 // target → last whole-volume push run (local space) freshness map[string]map[int64]int64 // target → origin node id → pulled push-freshness origin run nodeNames map[int64]string + // nowNs is the invocation's wall-clock, captured once so every + // candidate's staleness is judged against one instant and tests can + // inject a fixed time. + nowNs int64 + // maxEvidenceAge, when positive, refuses a component whose + // verified_at_ns is older than nowNs − maxEvidenceAge (or unknown). + // Zero disables the time-based policy — the opt-in default, so the + // version-vector gate behaves exactly as before. 
+ maxEvidenceAge time.Duration } -func loadGate(ctx context.Context, s *store.Store, volumeID int64, require []string) (*gate, error) { +func loadGate(ctx context.Context, s *store.Store, volumeID int64, require []string, nowNs int64, maxEvidenceAge time.Duration) (*gate, error) { self, err := s.GetSelfNode(ctx) if err != nil { return nil, fmt.Errorf("lookup self node: %w", err) } g := &gate{ - store: s, - volumeID: volumeID, - self: self, - require: require, - vectors: make(map[string]map[int64]component, len(require)), - lastPush: make(map[string]int64, len(require)), - freshness: make(map[string]map[int64]int64, len(require)), - nodeNames: map[int64]string{self.ID: self.Name}, + store: s, + volumeID: volumeID, + self: self, + require: require, + vectors: make(map[string]map[int64]component, len(require)), + lastPush: make(map[string]int64, len(require)), + freshness: make(map[string]map[int64]int64, len(require)), + nodeNames: map[int64]string{self.ID: self.Name}, + nowNs: nowNs, + maxEvidenceAge: maxEvidenceAge, } for _, target := range require { components, err := s.ListDestinationRunIDs(ctx, volumeID, target) @@ -65,7 +80,7 @@ func loadGate(ctx context.Context, s *store.Store, volumeID int64, require []str } vector := make(map[int64]component, len(components)) for _, c := range components { - vector[c.OriginNodeID] = component{coveredRun: c.OriginRunID, method: c.VerifyMethod, source: c.SourceNodeID} + vector[c.OriginNodeID] = component{coveredRun: c.OriginRunID, method: c.VerifyMethod, source: c.SourceNodeID, verifiedAt: c.VerifiedAtNs} } g.vectors[target] = vector @@ -89,9 +104,13 @@ func loadGate(ctx context.Context, s *store.Store, volumeID int64, require []str } // check evaluates the gate for one present row. 
Content with origin -// (N, r) is durable on a target only when all three conditions hold: +// (N, r) is durable on a target only when all four conditions hold: // // - origin vector: the target's component for N covers r; +// - evidence age: when a max evidence age is configured, the +// component's verified_at_ns is within it — defence-in-depth against +// a destination that was once durable but has not had its coverage +// re-confirmed in wall-clock time; // - freshness: a successful whole-volume push covers the run in which // the path last became present, so a path re-acquired after the last // push is held until a fresh push covers it. For a target this node @@ -102,7 +121,7 @@ func loadGate(ctx context.Context, s *store.Store, volumeID int64, require []str // only content-addressed component — a verified scan-back // fingerprint backs the gated object. // -// The file passes only when every required target satisfies all three. +// The file passes only when every required target satisfies all four. // The returned failures name each failing target and reason; an empty // slice means the gate passed. 
func (g *gate) check(ctx context.Context, row store.FileRow) ([]string, error) { @@ -123,6 +142,10 @@ func (g *gate) check(ctx context.Context, row store.FileRow) ([]string, error) { fmt.Sprintf("%s: stale: have %d need %d (origin %s, %s)", target, comp.coveredRun, originRun, g.nodeName(ctx, originNode), g.provenance(ctx, comp))) continue } + if reason := g.staleEvidenceFailure(ctx, target, comp); reason != "" { + failures = append(failures, reason) + continue + } if reason := g.freshnessFailure(ctx, target, row, originNode, originRun); reason != "" { failures = append(failures, reason) continue @@ -139,6 +162,39 @@ func (g *gate) check(ctx context.Context, row store.FileRow) ([]string, error) { return failures, nil } +// staleEvidenceFailure refuses the target when a max evidence age is +// configured and the component's verification is older than that age in +// wall-clock time — the time-based staleness policy (issue #131), a +// complement to the version-vector coverage check above. It is +// fail-closed: a component whose verified_at_ns is unknown (NULL — a +// pre-v23 row, or one only ever advanced methodlessly at its recorded +// run) is treated as infinitely stale and refused, since the gate cannot +// show the coverage was recently re-confirmed. +// +// For a peer-asserted component (a durability pull tagged it) the +// verified_at_ns this node holds is when it last pulled a fresh +// assertion from that peer — the peer's own verification instant never +// travels the wire — so the policy bounds how long this node trusts +// relayed evidence without hearing from the peer again, the dead-peer +// defence the issue calls for. A zero maxEvidenceAge disables the policy +// entirely. 
+func (g *gate) staleEvidenceFailure(ctx context.Context, target string, comp component) string { + if g.maxEvidenceAge <= 0 { + return "" + } + cutoff := g.nowNs - int64(g.maxEvidenceAge) + if !comp.verifiedAt.Valid { + return fmt.Sprintf("%s: stale evidence: never re-verified (max age %s, %s)", + target, g.maxEvidenceAge, g.provenance(ctx, comp)) + } + if comp.verifiedAt.Int64 < cutoff { + age := time.Duration(g.nowNs - comp.verifiedAt.Int64) + return fmt.Sprintf("%s: stale evidence: last verified %s ago > max age %s (%s)", + target, age.Round(time.Second), g.maxEvidenceAge, g.provenance(ctx, comp)) + } + return "" +} + // freshnessFailure refuses the target when no successful whole-volume // push covers the run in which the path last became present, closing the // re-acquisition hole: a path deleted, re-introduced, and re-indexed must diff --git a/offload/offload.go b/offload/offload.go index 129e9e1..61d05d9 100644 --- a/offload/offload.go +++ b/offload/offload.go @@ -41,6 +41,12 @@ type Options struct { // policy is an explicit precondition, there is no default target // set. Require []string + // MaxEvidenceAge, when positive, refuses any required target whose + // durability evidence was last re-verified longer ago than this in + // wall-clock time (issue #131). Zero disables the time-based + // staleness policy — the opt-in default, so the version-vector gate + // behaves exactly as before. + MaxEvidenceAge time.Duration // DryRun evaluates and reports the per-file gate decisions from the // index alone: no runs row, no file reads, no deletions, no status // flips. 
Disk-drift checks only happen on a real run, immediately @@ -129,7 +135,8 @@ func Offload(ctx context.Context, s *store.Store, root string, opts Options) (re if err != nil { return Report{}, err } - g, err := loadGate(ctx, s, vol.ID, opts.Require) + now := time.Now() + g, err := loadGate(ctx, s, vol.ID, opts.Require, now.UnixNano(), opts.MaxEvidenceAge) if err != nil { return Report{}, err } @@ -137,7 +144,7 @@ func Offload(ctx context.Context, s *store.Store, root string, opts Options) (re if err != nil { return Report{}, err } - candidates, misses := selectCandidates(rows, selectors, opts.OlderThan) + candidates, misses := selectCandidates(rows, selectors, opts.OlderThan, now) report.SelectorMisses = misses if opts.DryRun { @@ -248,11 +255,11 @@ func underReservedSubtree(p string) bool { // is applied after selector-hit tracking so a selector that only // matched younger files still counts as matched. Candidates come back // in path order for deterministic reports. -func selectCandidates(rows map[string]store.FileRow, selectors []string, olderThan time.Duration) ([]store.FileRow, []string) { +func selectCandidates(rows map[string]store.FileRow, selectors []string, olderThan time.Duration, now time.Time) ([]store.FileRow, []string) { ageFiltered := olderThan > 0 var cutoffNs int64 if ageFiltered { - cutoffNs = time.Now().Add(-olderThan).UnixNano() + cutoffNs = now.Add(-olderThan).UnixNano() } hit := make(map[string]bool, len(selectors)) var out []store.FileRow diff --git a/offload/staleness_test.go b/offload/staleness_test.go new file mode 100644 index 0000000..21a16d8 --- /dev/null +++ b/offload/staleness_test.go @@ -0,0 +1,119 @@ +package offload + +import ( + "context" + "database/sql" + "path/filepath" + "strings" + "testing" + "time" +) + +// fixedNow is an arbitrary, well-past-epoch wall clock the staleness tests +// reason against so evidence ages are exact and deterministic. 
+var fixedNow = time.Date(2026, 6, 21, 12, 0, 0, 0, time.UTC).UnixNano() + +const maxAge = 30 * 24 * time.Hour + +func verifiedAt(ns int64) sql.NullInt64 { return sql.NullInt64{Int64: ns, Valid: true} } + +// staleGate builds a gate with no store, populated only with the fields +// staleEvidenceFailure reads — the injected now, the max age, and a +// node-name cache for provenance rendering. The pure staleness logic needs +// no database. +func staleGate(maxEvidenceAge time.Duration, names map[int64]string) *gate { + return &gate{nowNs: fixedNow, maxEvidenceAge: maxEvidenceAge, nodeNames: names} +} + +// TestStaleEvidenceRefusesOffload: a locally-verified component last +// verified longer ago than the configured max age is refused, naming the +// age and provenance — even though its version-vector coverage is sound. +func TestStaleEvidenceRefusesOffload(t *testing.T) { + g := staleGate(maxAge, nil) + comp := component{coveredRun: 5, method: "blake3", verifiedAt: verifiedAt(fixedNow - int64(90*24*time.Hour))} + reason := g.staleEvidenceFailure(context.Background(), "t1", comp) + if !strings.Contains(reason, "stale evidence") || !strings.Contains(reason, "locally verified") { + t.Fatalf("reason = %q, want a stale-evidence refusal naming local provenance", reason) + } + if !strings.Contains(reason, "max age 720h0m0s") { + t.Fatalf("reason = %q, want it to name the configured max age", reason) + } +} + +// TestFreshEvidencePasses: a component verified within the max age clears +// the policy, and the policy is a no-op when disabled (zero max age) even +// for evidence verified far in the past. 
+func TestFreshEvidencePasses(t *testing.T) { + g := staleGate(maxAge, nil) + fresh := component{coveredRun: 5, method: "blake3", verifiedAt: verifiedAt(fixedNow - int64(24*time.Hour))} + if reason := g.staleEvidenceFailure(context.Background(), "t1", fresh); reason != "" { + t.Fatalf("reason = %q, want none for fresh evidence", reason) + } + + disabled := staleGate(0, nil) + ancient := component{coveredRun: 5, method: "blake3", verifiedAt: verifiedAt(fixedNow - int64(365*24*time.Hour))} + if reason := disabled.staleEvidenceFailure(context.Background(), "t1", ancient); reason != "" { + t.Fatalf("reason = %q, want none when the staleness policy is disabled", reason) + } +} + +// TestNullVerifiedAtRefusesUnderMaxAge: a component whose verified_at_ns is +// unknown (a pre-v23 row, or one only ever advanced methodlessly at its +// recorded run) is fail-closed — refused as never re-verified when a max +// age is set, but unaffected when the policy is disabled. +func TestNullVerifiedAtRefusesUnderMaxAge(t *testing.T) { + comp := component{coveredRun: 5, method: "blake3"} + if reason := staleGate(maxAge, nil).staleEvidenceFailure(context.Background(), "t1", comp); !strings.Contains(reason, "never re-verified") { + t.Fatalf("reason = %q, want a never-re-verified refusal", reason) + } + if reason := staleGate(0, nil).staleEvidenceFailure(context.Background(), "t1", comp); reason != "" { + t.Fatalf("reason = %q, want none when the staleness policy is disabled", reason) + } +} + +// TestPeerRelayedEvidenceStaleness: a peer-asserted component's +// verified_at_ns records when this node last pulled a fresh assertion. A +// recent pull is trusted; a peer gone silent past the max age ages out, +// and the refusal names the asserting peer, not the local node. 
+func TestPeerRelayedEvidenceStaleness(t *testing.T) { + const peerID = int64(42) + g := staleGate(maxAge, map[int64]string{peerID: "nas"}) + source := sql.NullInt64{Int64: peerID, Valid: true} + + recent := component{coveredRun: 5, method: "blake3", source: source, verifiedAt: verifiedAt(fixedNow - int64(24*time.Hour))} + if reason := g.staleEvidenceFailure(context.Background(), "t1", recent); reason != "" { + t.Fatalf("reason = %q, want none for a recently pulled peer assertion", reason) + } + + silent := component{coveredRun: 5, method: "blake3", source: source, verifiedAt: verifiedAt(fixedNow - int64(90*24*time.Hour))} + reason := g.staleEvidenceFailure(context.Background(), "t1", silent) + if !strings.Contains(reason, "stale evidence") || !strings.Contains(reason, "asserted by peer nas") { + t.Fatalf("reason = %q, want a stale-evidence refusal naming peer nas", reason) + } +} + +// TestOffloadEndToEndFreshEvidencePasses drives the full Offload path with +// a configured max age: a freshly verified component (seedVector stamps +// verified_at_ns at the write, moments before the gate's now) is within +// any sane max age, so the file is deleted. This proves the +// Options→gate wiring carries the knob and a freshly pushed copy is not +// collateral of the new policy. 
+func TestOffloadEndToEndFreshEvidencePasses(t *testing.T) { + root := t.TempDir() + writeFile(t, filepath.Join(root, "a.txt"), "alpha") + s := setupStore(t) + idx := indexVolume(t, s, root) + v := testVolume(t, s) + self := selfNode(t, s) + seedVector(t, s, v.ID, "t1", self.ID, idx.RunID) + + rep, err := Offload(context.Background(), s, root, Options{ + Name: volName, Paths: []string{"."}, Require: []string{"t1"}, + MaxEvidenceAge: maxAge, + }) + if err != nil { + t.Fatalf("Offload: %v", err) + } + oneResult(t, rep, "a.txt", OutcomeOffloaded) + mustBeGone(t, filepath.Join(root, "a.txt")) +} diff --git a/store/destination_run_ids.go b/store/destination_run_ids.go index 752c1f6..0606a77 100644 --- a/store/destination_run_ids.go +++ b/store/destination_run_ids.go @@ -81,6 +81,18 @@ func KnownVerifyMethod(method string) bool { // class, via AdvanceDestinationVectorTo), and the asserting peer's node // id when a durability pull last advanced it. The offload gate weighs a // peer-asserted component as a distinct, revocable class. +// +// UpdatedAtNs is the wall-clock of the last applied write — bumped even +// by an equal-value re-confirmation. VerifiedAtNs is the narrower +// freshness signal the offload gate's time-based staleness policy reads: +// it advances only on a write backed by genuine re-verification (a +// content-verified method, or a strict run advance), so a no-op touch +// never makes evidence look freshly checked. NULL means the verification +// time is unknown (a pre-v23 row, or a methodless advance that never +// carried verification) — the gate treats that as infinitely stale and +// refuses when a max age is configured. For a peer-asserted component it +// records when this node last pulled a fresh assertion, not the peer's +// own verification instant. 
type DestinationRunID struct { VolumeID int64 Destination string @@ -89,6 +101,7 @@ type DestinationRunID struct { UpdatedAtNs int64 VerifyMethod string SourceNodeID sql.NullInt64 + VerifiedAtNs sql.NullInt64 } // DestinationRunIDHistory is one row of the insert-only @@ -134,7 +147,7 @@ func (e *DestinationRewindError) Unwrap() error { return ErrWatermarkRewind } // run id advances from it. func (s *Store) GetDestinationRunID(ctx context.Context, volumeID int64, destination string, originNodeID int64) (DestinationRunID, error) { row := s.db.QueryRowContext(ctx, - `SELECT volume_id, destination, origin_node_id, origin_run_id, updated_at_ns, verify_method, source_node_id + `SELECT volume_id, destination, origin_node_id, origin_run_id, updated_at_ns, verify_method, source_node_id, verified_at_ns FROM destination_run_ids WHERE volume_id = ? AND destination = ? AND origin_node_id = ?`, volumeID, destination, originNodeID) @@ -146,7 +159,7 @@ func (s *Store) GetDestinationRunID(ctx context.Context, volumeID int64, destina // means the destination has no recorded durability yet. func (s *Store) ListDestinationRunIDs(ctx context.Context, volumeID int64, destination string) ([]DestinationRunID, error) { return queryRows(ctx, s.db, - `SELECT volume_id, destination, origin_node_id, origin_run_id, updated_at_ns, verify_method, source_node_id + `SELECT volume_id, destination, origin_node_id, origin_run_id, updated_at_ns, verify_method, source_node_id, verified_at_ns FROM destination_run_ids WHERE volume_id = ? AND destination = ? ORDER BY origin_node_id`, @@ -160,7 +173,7 @@ func (s *Store) ListDestinationRunIDs(ctx context.Context, volumeID int64, desti // can see. 
func (s *Store) ListVolumeDestinationRunIDs(ctx context.Context, volumeID int64) ([]DestinationRunID, error) { return queryRows(ctx, s.db, - `SELECT volume_id, destination, origin_node_id, origin_run_id, updated_at_ns, verify_method, source_node_id + `SELECT volume_id, destination, origin_node_id, origin_run_id, updated_at_ns, verify_method, source_node_id, verified_at_ns FROM destination_run_ids WHERE volume_id = ? ORDER BY destination, origin_node_id`, @@ -170,7 +183,7 @@ func (s *Store) ListVolumeDestinationRunIDs(ctx context.Context, volumeID int64) func scanDestinationRunID(s rowScanner) (DestinationRunID, error) { var d DestinationRunID var method sql.NullString - err := s.Scan(&d.VolumeID, &d.Destination, &d.OriginNodeID, &d.OriginRunID, &d.UpdatedAtNs, &method, &d.SourceNodeID) + err := s.Scan(&d.VolumeID, &d.Destination, &d.OriginNodeID, &d.OriginRunID, &d.UpdatedAtNs, &method, &d.SourceNodeID, &d.VerifiedAtNs) d.VerifyMethod = method.String return d, err } @@ -326,6 +339,15 @@ func (s *Store) UpsertDestinationRunIDPulled(ctx context.Context, volumeID int64 // re-confirmation upgrades a peer-tagged component back to local, and a // peer re-confirmation at the recorded run never downgrades a // locally-verified (NULL) component to peer-asserted. +// +// updated_at_ns is set to now on every applied write. verified_at_ns — +// the offload gate's time-based freshness signal — is set to now only +// when the write carries genuine re-verification: a strict run advance +// (new coverage proven) or a non-empty verify method (an advance or +// re-confirmation that named a verification). A methodless re-confirmation +// at the recorded run (a no-op touch, or a pull from a pre-v19 peer) +// preserves the prior verified_at_ns, so a stale component cannot be made +// to look freshly checked without genuine evidence. 
func (s *Store) upsertDestinationRunID(ctx context.Context, volumeID int64, destination string, originNodeID, originRunID int64, verifyMethod string, sourceNodeID sql.NullInt64, allowRewind bool) error { if destination == "" { return fmt.Errorf("UpsertDestinationRunID: destination must be non-empty") @@ -339,8 +361,8 @@ func (s *Store) upsertDestinationRunID(ctx context.Context, volumeID int64, dest atNs := NowNs() method := nullableString(verifyMethod) res, err := tx.ExecContext(ctx, ` - INSERT INTO destination_run_ids (volume_id, destination, origin_node_id, origin_run_id, updated_at_ns, verify_method, source_node_id) - VALUES (?, ?, ?, ?, ?, ?, ?) + INSERT INTO destination_run_ids (volume_id, destination, origin_node_id, origin_run_id, updated_at_ns, verify_method, source_node_id, verified_at_ns) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(volume_id, destination, origin_node_id) DO UPDATE SET origin_run_id = excluded.origin_run_id, updated_at_ns = excluded.updated_at_ns, @@ -353,9 +375,14 @@ func (s *Store) upsertDestinationRunID(ctx context.Context, volumeID int64, dest WHEN excluded.origin_run_id > destination_run_ids.origin_run_id THEN excluded.source_node_id WHEN excluded.source_node_id IS NULL THEN NULL ELSE destination_run_ids.source_node_id + END, + verified_at_ns = CASE + WHEN excluded.origin_run_id > destination_run_ids.origin_run_id THEN excluded.verified_at_ns + WHEN excluded.verify_method IS NOT NULL THEN excluded.verified_at_ns + ELSE destination_run_ids.verified_at_ns END WHERE excluded.origin_run_id >= destination_run_ids.origin_run_id OR ? 
- `, volumeID, destination, originNodeID, originRunID, atNs, method, sourceNodeID, allowRewind) + `, volumeID, destination, originNodeID, originRunID, atNs, method, sourceNodeID, atNs, allowRewind) if err != nil { return fmt.Errorf("upsert destination_run_ids: %w", err) } diff --git a/store/destination_run_ids_test.go b/store/destination_run_ids_test.go index 03fc18f..c4b26ba 100644 --- a/store/destination_run_ids_test.go +++ b/store/destination_run_ids_test.go @@ -81,6 +81,82 @@ func TestMigrateV18ToV19AddsVerifyMethod(t *testing.T) { } } +// TestMigrateV22ToV23AddsVerifiedAt builds a minimal v22 database with a +// pre-existing durability component and confirms the migration adds +// verified_at_ns (NULL on the carried-over row) without disturbing the +// recorded coordinate. A NULL verified_at_ns is the fail-closed signal: +// the gate reads it as "never re-verified" when a max age is configured. +func TestMigrateV22ToV23AddsVerifiedAt(t *testing.T) { + dsn := filepath.Join(t.TempDir(), "test.db") + rawDB, err := sql.Open("sqlite", dsn) + if err != nil { + t.Fatalf("raw sql.Open: %v", err) + } + v22DDL := []string{ + `CREATE TABLE schema_version (version INTEGER NOT NULL PRIMARY KEY)`, + `CREATE TABLE volumes (id INTEGER PRIMARY KEY, name TEXT NOT NULL UNIQUE, path TEXT NOT NULL)`, + `CREATE TABLE nodes (id INTEGER PRIMARY KEY, name TEXT NOT NULL UNIQUE, endpoint TEXT, public_key_fingerprint TEXT)`, + `CREATE TABLE contents ( + id INTEGER PRIMARY KEY, + blake3 BLOB NOT NULL UNIQUE CHECK (length(blake3) = 32), + size_bytes INTEGER NOT NULL, + origin_node_id INTEGER REFERENCES nodes(id), + origin_run_id INTEGER + )`, + `CREATE TABLE destination_run_ids ( + volume_id INTEGER NOT NULL REFERENCES volumes(id), + destination TEXT NOT NULL, + origin_node_id INTEGER NOT NULL REFERENCES nodes(id), + origin_run_id INTEGER NOT NULL, + updated_at_ns INTEGER NOT NULL, + verify_method TEXT, + source_node_id INTEGER REFERENCES nodes(id), + PRIMARY KEY (volume_id, destination, 
origin_node_id) + )`, + `CREATE TABLE destination_run_ids_history ( + id INTEGER PRIMARY KEY, + volume_id INTEGER NOT NULL, + destination TEXT NOT NULL, + origin_node_id INTEGER NOT NULL, + origin_run_id INTEGER NOT NULL, + at_ns INTEGER NOT NULL, + verify_method TEXT, + source_node_id INTEGER REFERENCES nodes(id) + )`, + `INSERT INTO schema_version (version) VALUES (22)`, + `INSERT INTO volumes (id, name, path) VALUES (1, 'v', '/v')`, + `INSERT INTO nodes (id, name) VALUES (1, 'self')`, + `INSERT INTO destination_run_ids (volume_id, destination, origin_node_id, origin_run_id, updated_at_ns, verify_method) + VALUES (1, 'bucket', 1, 7, 100, 'blake3')`, + } + for _, q := range v22DDL { + if _, err := rawDB.Exec(q); err != nil { + t.Fatalf("v22 DDL %q: %v", q, err) + } + } + rawDB.Close() + + s, err := Open(dsn) + if err != nil { + t.Fatalf("Open (migrates v22→v23): %v", err) + } + defer s.Close() + ctx := context.Background() + if v, _ := s.CurrentSchemaVersion(ctx); v != SchemaVersion { + t.Fatalf("schema_version = %d, want %d", v, SchemaVersion) + } + got, err := s.GetDestinationRunID(ctx, 1, "bucket", 1) + if err != nil { + t.Fatalf("GetDestinationRunID: %v", err) + } + if got.OriginRunID != 7 || got.VerifyMethod != VerifyMethodBlake3 { + t.Fatalf("got run=%d method=%q, want 7 and %q (carried over)", got.OriginRunID, got.VerifyMethod, VerifyMethodBlake3) + } + if got.VerifiedAtNs.Valid { + t.Fatalf("verified_at_ns = %v, want NULL on the carried-over row", got.VerifiedAtNs) + } +} + // TestUpsertDestinationRunIDWritesHistory: every successful advance // appends one destination_run_ids_history row alongside updating the // live vector component — the same append-only contract the peer-sync @@ -601,6 +677,87 @@ func TestUpsertDestinationRunIDPreservesMethodOnMethodlessReconfirm(t *testing.T } } +// TestUpsertDestinationRunIDVerifiedStampsVerifiedAt: a content-verified +// advance records verified_at_ns alongside updated_at_ns, the freshness +// signal the offload 
gate's time-based staleness policy reads. +func TestUpsertDestinationRunIDVerifiedStampsVerifiedAt(t *testing.T) { + s := openTestStore(t) + ctx := context.Background() + vID := makeVolume(t, s, "/v") + node, err := s.GetSelfNode(ctx) + if err != nil { + t.Fatalf("GetSelfNode: %v", err) + } + + before := NowNs() + if err := s.UpsertDestinationRunIDVerified(ctx, vID, "bucket", node.ID, 5, VerifyMethodBlake3, false); err != nil { + t.Fatalf("UpsertDestinationRunIDVerified: %v", err) + } + after := NowNs() + got, err := s.GetDestinationRunID(ctx, vID, "bucket", node.ID) + if err != nil { + t.Fatalf("GetDestinationRunID: %v", err) + } + if !got.VerifiedAtNs.Valid { + t.Fatalf("verified_at_ns is NULL, want a stamp from the verified advance") + } + if got.VerifiedAtNs.Int64 < before || got.VerifiedAtNs.Int64 > after { + t.Fatalf("verified_at_ns = %d, want within [%d, %d]", got.VerifiedAtNs.Int64, before, after) + } +} + +// TestUpsertDestinationRunIDVerifiedAtNotBumpedByMethodlessReconfirm: an +// equal-value methodless re-confirmation (a no-op touch, or a pull from a +// pre-v19 peer) refreshes updated_at_ns but must NOT advance +// verified_at_ns — the timestamp the gate trusts tracks genuine +// re-verification, not no-op touches (issue #131). +func TestUpsertDestinationRunIDVerifiedAtNotBumpedByMethodlessReconfirm(t *testing.T) { + s := openTestStore(t) + ctx := context.Background() + vID := makeVolume(t, s, "/v") + node, err := s.GetSelfNode(ctx) + if err != nil { + t.Fatalf("GetSelfNode: %v", err) + } + + // Seed a component whose verified_at_ns is deliberately far in the + // past, so a re-confirm that wrongly bumped it would be obvious. + const seededVerifiedAt = int64(1000) + if _, err := s.db.ExecContext(ctx, ` + INSERT INTO destination_run_ids (volume_id, destination, origin_node_id, origin_run_id, updated_at_ns, verify_method, verified_at_ns) + VALUES (?, ?, ?, ?, ?, ?, ?) 
+ `, vID, "bucket", node.ID, 5, seededVerifiedAt, VerifyMethodBlake3, seededVerifiedAt); err != nil { + t.Fatalf("seed component: %v", err) + } + + if err := s.UpsertDestinationRunID(ctx, vID, "bucket", node.ID, 5, false); err != nil { + t.Fatalf("methodless reconfirm: %v", err) + } + got, err := s.GetDestinationRunID(ctx, vID, "bucket", node.ID) + if err != nil { + t.Fatalf("GetDestinationRunID: %v", err) + } + if !got.VerifiedAtNs.Valid || got.VerifiedAtNs.Int64 != seededVerifiedAt { + t.Fatalf("verified_at_ns = %v, want preserved at %d", got.VerifiedAtNs, seededVerifiedAt) + } + if got.UpdatedAtNs <= seededVerifiedAt { + t.Fatalf("updated_at_ns = %d, want bumped above the seed %d", got.UpdatedAtNs, seededVerifiedAt) + } + + // A strict advance, even methodless, is new coverage proven, so it + // re-stamps verified_at_ns. + if err := s.UpsertDestinationRunID(ctx, vID, "bucket", node.ID, 9, false); err != nil { + t.Fatalf("methodless advance: %v", err) + } + got, err = s.GetDestinationRunID(ctx, vID, "bucket", node.ID) + if err != nil { + t.Fatalf("GetDestinationRunID: %v", err) + } + if !got.VerifiedAtNs.Valid || got.VerifiedAtNs.Int64 <= seededVerifiedAt { + t.Fatalf("verified_at_ns = %v, want re-stamped on the strict advance", got.VerifiedAtNs) + } +} + // TestDestinationRunIDNullVerifyMethodReadsUnverified pins the v19 // backfill contract: a component with a NULL verify_method (a pre-v19 // row, or a legacy upsert) scans back as an empty method, which diff --git a/store/migrations.go b/store/migrations.go index ffcc7bc..ce1f49a 100644 --- a/store/migrations.go +++ b/store/migrations.go @@ -10,7 +10,7 @@ import ( ) // SchemaVersion is the schema version this binary writes and reads. -const SchemaVersion = 22 +const SchemaVersion = 23 // freshSchemaBaseline is the version applied to a brand-new database. The // chain in `migrations` continues from here. 
v1 is no longer reachable from @@ -58,6 +58,7 @@ func buildMigrations(mctx migrationCtx) []migration { {version: 20, up: migrateV19ToV20}, {version: 21, up: migrateV20ToV21}, {version: 22, up: migrateV21ToV22}, + {version: 23, up: migrateV22ToV23}, } } @@ -1911,3 +1912,42 @@ func migrateV21ToV22(ctx context.Context, db *sql.DB) error { } return tx.Commit() } + +// --- v22 → v23 --- + +// migrateV22ToV23 adds verified_at_ns to the live durability vector so the +// offload gate can enforce a time-based staleness policy in addition to the +// version-vector coverage it already checks (issue #131). updated_at_ns +// keeps its prior meaning (the wall-clock of the last applied write, bumped +// even by an equal-value re-confirmation), while verified_at_ns records only +// the last write backed by genuine re-verification — a content-verified +// method or a strict run advance — so a no-op touch never makes evidence +// look freshly checked. +// +// The column is additive and nullable. Existing rows carry over NULL: their +// last verification time is unknown, so the gate treats a NULL as +// infinitely stale and refuses whenever a max-evidence-age is configured +// (fail-closed) — a re-verification re-stamps it on the next push or pull. +// The history table is unchanged: it already records each advance's at_ns, +// and the gate reads only the live vector. +// +// No index: the gate loads the whole vector for a target and filters in Go; +// no query selects by verified_at_ns. 
+func migrateV22ToV23(ctx context.Context, db *sql.DB) error { + tx, err := db.BeginTx(ctx, nil) + if err != nil { + return err + } + defer tx.Rollback() + + stmts := []string{ + `ALTER TABLE destination_run_ids ADD COLUMN verified_at_ns INTEGER`, + `INSERT INTO schema_version (version) VALUES (23)`, + } + for _, q := range stmts { + if _, err := tx.ExecContext(ctx, q); err != nil { + return fmt.Errorf("v22→v23: %w", err) + } + } + return tx.Commit() +} diff --git a/store/schema.sql b/store/schema.sql index 3b37223..7bc6523 100644 --- a/store/schema.sql +++ b/store/schema.sql @@ -1,6 +1,6 @@ -- Generated by `go test ./store -update-schema` — DO NOT EDIT. -- --- Flattened snapshot of the squirrel index schema at version 22, for humans +-- Flattened snapshot of the squirrel index schema at version 23, for humans -- and agents who want the current shape without replaying the migration -- chain in migrations.go. It is NOT used to create or migrate databases — -- a fresh DB is built by applyV5 plus the migration registry. The golden @@ -41,7 +41,7 @@ CREATE TABLE destination_run_ids ( destination TEXT NOT NULL, origin_node_id INTEGER NOT NULL REFERENCES nodes(id), origin_run_id INTEGER NOT NULL, - updated_at_ns INTEGER NOT NULL, verify_method TEXT, source_node_id INTEGER REFERENCES nodes(id), + updated_at_ns INTEGER NOT NULL, verify_method TEXT, source_node_id INTEGER REFERENCES nodes(id), verified_at_ns INTEGER, PRIMARY KEY (volume_id, destination, origin_node_id) );