diff --git a/cmd/squirrel/peer_sync_pull_durability.go b/cmd/squirrel/peer_sync_pull_durability.go index eddae9b..31679fc 100644 --- a/cmd/squirrel/peer_sync_pull_durability.go +++ b/cmd/squirrel/peer_sync_pull_durability.go @@ -62,9 +62,15 @@ func runPeerSyncPullDurability(cmd *cobra.Command, volumeName, peerName string, } func printDurabilityPull(w io.Writer, rep sync.DurabilityPullReport) { - fmt.Fprintf(w, "%s ← %s fetched=%d applied=%d\n", - rep.Volume, rep.Peer, rep.Fetched, rep.Applied) + fmt.Fprintf(w, "%s ← %s fetched=%d applied=%d dropped=%d\n", + rep.Volume, rep.Peer, rep.Fetched, rep.Applied, rep.Dropped) for _, rw := range rep.Rewinds { fmt.Fprintf(w, " refused rewind: %s\n", rw) } + for _, dr := range rep.Drops { + fmt.Fprintf(w, " dropped %s\n", dr) + } + if more := rep.Dropped - len(rep.Drops); more > 0 { + fmt.Fprintf(w, " … and %d more dropped\n", more) + } } diff --git a/cmd/squirrel/peer_sync_pull_durability_test.go b/cmd/squirrel/peer_sync_pull_durability_test.go index 99a564d..51f6fec 100644 --- a/cmd/squirrel/peer_sync_pull_durability_test.go +++ b/cmd/squirrel/peer_sync_pull_durability_test.go @@ -73,6 +73,7 @@ func newPullDurabilityFixture(t *testing.T) pullDurabilityFixture { [volumes.pics] path = %q +offload_requires = ["offsite-a"] [nodes.nas] endpoint = %q diff --git a/cmd/squirrel/sync.go b/cmd/squirrel/sync.go index 6985a3f..5f95595 100644 --- a/cmd/squirrel/sync.go +++ b/cmd/squirrel/sync.go @@ -192,8 +192,13 @@ func printSyncReport(w io.Writer, rep sync.Report, runErr error) { fmt.Fprintf(w, " mismatched %s: expected %s, actual %s\n", m.Path, m.ExpectedHex, m.ActualHex) } if rep.DurabilityPull.Fetched > 0 { - fmt.Fprintf(w, " durability: applied %d/%d peer components\n", + fmt.Fprintf(w, " durability: applied %d/%d peer entries", rep.DurabilityPull.Applied, rep.DurabilityPull.Fetched) + if rep.DurabilityPull.Dropped > 0 { + fmt.Fprintf(w, " (dropped %d for unconfigured destinations)", + rep.DurabilityPull.Dropped) + } + fmt.Fprintln(w) } } for _, c := range rep.NodeConflicts { diff --git a/sync/durability.go b/sync/durability.go index 6f61e72..2e77e60 100644 --- a/sync/durability.go +++ b/sync/durability.go @@ -10,17 +10,55 @@ import ( "github.com/mbertschler/squirrel/syncproto" ) +// maxDurabilityDropSamples caps how many dropped entries the report +// retains as detail. The Dropped count stays exact; only the sampled +// Drops slice is bounded, so an adversarial peer flooding out-of-scope +// destinations cannot blow up the report or the output that renders it. +const maxDurabilityDropSamples = 16 + // DurabilityPullReport summarises one durability metadata pull from a -// peer: how many vector components were fetched, how many landed in -// the local destination_run_ids (advanced or re-confirmed), and which -// were refused as rewinds. Every fetched component lands in exactly -// one of the two buckets. +// peer: how many entries (vector components and freshness coordinates) +// were fetched, how many components landed in the local +// destination_run_ids (advanced or re-confirmed), how many were refused +// as rewinds, and how many entries were dropped because the peer named a +// destination outside this volume's accepted target set. Every fetched +// entry lands in exactly one of the applied / rewind / dropped buckets +// (a merged freshness coordinate counts as applied). type DurabilityPullReport struct { Volume string Peer string Fetched int Applied int + Dropped int Rewinds []DurabilityRewind + // Drops samples the dropped entries up to maxDurabilityDropSamples; + // Dropped is the exact total. + Drops []DurabilityDrop +} + +// recordDrop counts a dropped entry and samples it into Drops up to the +// cap, so the exact total survives while the detail stays bounded. +func (r *DurabilityPullReport) recordDrop(d DurabilityDrop) { + r.Dropped++ + if len(r.Drops) < maxDurabilityDropSamples { + r.Drops = append(r.Drops, d) + } +} + +// DurabilityDrop is one pulled entry the merge discarded because its +// destination falls outside the volume's accepted target set +// (offload_requires ∪ sync_to). Drops are counted and sampled so a peer +// asserting evidence for destinations this node uses for neither offload +// nor sync stays observable. +type DurabilityDrop struct { + Destination string + OriginNode string + Kind string // "component" or "freshness" +} + +func (d DurabilityDrop) String() string { + return fmt.Sprintf("%s for unconfigured destination %s origin %s", + d.Kind, d.Destination, d.OriginNode) } // DurabilityRewind is one component the pull refused because the peer @@ -47,6 +85,12 @@ func (r DurabilityRewind) String() string { // watermark store, refused rewinds are reported on the result (not // applied), and allowRewind is the explicit recovery override. // +// The merge is scoped to destinations this volume actually references +// (offload_requires ∪ sync_to); evidence for any other destination is +// dropped, so a buggy or compromised peer cannot pollute the local +// vector with rows for destinations this node neither requires for +// offload nor syncs to. +// // The standalone `peer-sync pull-durability` command and the automatic // post-close pull share this implementation. func PullDurability(ctx context.Context, s *store.Store, vol *config.Volume, node *config.Node, allowRewind bool) (DurabilityPullReport, error) { @@ -57,18 +101,35 @@ func PullDurability(ctx context.Context, s *store.Store, vol *config.Volume, nod } return DurabilityPullReport{}, fmt.Errorf("lookup volume %q: %w", vol.Name, err) } - return pullDurability(ctx, s, newNodeClient(node), vol.Name, v.ID, node.Name, allowRewind) + return pullDurability(ctx, s, newNodeClient(node), vol.Name, v.ID, node.Name, acceptedDestinations(vol), allowRewind) +} + +// acceptedDestinations is the set of destination names this volume +// references: the union of its offload_requires and sync_to entries. +// A pulled durability entry for any name outside this set has no bearing +// on the volume's local decisions and is dropped by the pull. +func acceptedDestinations(vol *config.Volume) map[string]struct{} { + accepted := make(map[string]struct{}, len(vol.OffloadRequires)+len(vol.SyncTo)) + for _, name := range vol.OffloadRequires { + accepted[name] = struct{}{} + } + for _, name := range vol.SyncTo { + accepted[name] = struct{}{} + } + return accepted } // pullDurability is the transport-injected body of PullDurability, // shared with the node-sync driver (which already holds a client). -func pullDurability(ctx context.Context, s *store.Store, client *nodeClient, volumeName string, volumeID int64, peerName string, allowRewind bool) (DurabilityPullReport, error) { +// accepted scopes which destinations the merge will store (see +// acceptedDestinations). +func pullDurability(ctx context.Context, s *store.Store, client *nodeClient, volumeName string, volumeID int64, peerName string, accepted map[string]struct{}, allowRewind bool) (DurabilityPullReport, error) { rep := DurabilityPullReport{Volume: volumeName, Peer: peerName} resp, err := client.durability(ctx, syncproto.DurabilityRequest{Volume: volumeName}) if err != nil { return rep, err } - rep.Fetched = len(resp.Components) + rep.Fetched = len(resp.Components) + len(resp.Freshness) originIDs := make(map[string]int64, 4) resolveOrigin := func(name string) (int64, error) { if id, ok := originIDs[name]; ok { @@ -82,6 +143,10 @@ func pullDurability(ctx context.Context, s *store.Store, client *nodeClient, vol return node.ID, nil } for _, c := range resp.Components { + if _, ok := accepted[c.Destination]; !ok { + rep.recordDrop(DurabilityDrop{Destination: c.Destination, OriginNode: c.OriginNode, Kind: "component"}) + continue + } if err := validateComponent(c); err != nil { return rep, fmt.Errorf("component %+v: %w", c, err) } @@ -106,6 +171,10 @@ func pullDurability(ctx context.Context, s *store.Store, client *nodeClient, vol rep.Applied++ } for _, f := range resp.Freshness { + if _, ok := accepted[f.Destination]; !ok { + rep.recordDrop(DurabilityDrop{Destination: f.Destination, OriginNode: f.OriginNode, Kind: "freshness"}) + continue + } if err := validateFreshness(f); err != nil { return rep, fmt.Errorf("freshness %+v: %w", f, err) } @@ -116,6 +185,7 @@ func pullDurability(ctx context.Context, s *store.Store, client *nodeClient, vol if err := s.MergeDestinationPushFreshness(ctx, volumeID, f.Destination, nodeID, f.OriginRun); err != nil { return rep, fmt.Errorf("apply freshness for destination %q origin %q: %w", f.Destination, f.OriginNode, err) } + rep.Applied++ } return rep, nil } diff --git a/sync/durability_test.go b/sync/durability_test.go index be34cbf..4f162d2 100644 --- a/sync/durability_test.go +++ b/sync/durability_test.go @@ -2,6 +2,7 @@ package sync import ( "context" + "fmt" "strings" "testing" ) @@ -58,6 +59,8 @@ func seedReceiverFreshness(t *testing.T, f *nodeFixture, coords map[string]int64 func TestPullDurabilityMergesFreshness(t *testing.T) { f := setupNodeFixtureNoRclone(t) ctx := context.Background() + f.initVol.OffloadRequires = []string{"offsite-a"} + f.initVol.SyncTo = []string{"offsite-b"} originName := seedReceiverFreshness(t, f, map[string]int64{ "offsite-a": 12, "offsite-b": 5, @@ -98,6 +101,8 @@ func TestPullDurabilityMergesFreshness(t *testing.T) { func TestPullDurabilityCopiesComponents(t *testing.T) { f := setupNodeFixtureNoRclone(t) ctx := context.Background() + f.initVol.OffloadRequires = []string{"offsite-a"} + f.initVol.SyncTo = []string{"offsite-b"} originName := seedReceiverDurability(t, f, map[string]int64{ "offsite-a": 12, "offsite-b": 5, @@ -130,12 +135,123 @@ func TestPullDurabilityCopiesComponents(t *testing.T) { } } +// TestPullDurabilityDropsUnconfiguredDestinations: the pull merges +// components for destinations the volume references (one via +// offload_requires, one via sync_to) and drops one for an unconfigured +// destination — counted and reported, never stored, and without +// aborting the merge of the legitimate components. +func TestPullDurabilityDropsUnconfiguredDestinations(t *testing.T) { + f := setupNodeFixtureNoRclone(t) + ctx := context.Background() + f.initVol.OffloadRequires = []string{"offload-target"} + f.initVol.SyncTo = []string{"sync-target"} + originName := seedReceiverDurability(t, f, map[string]int64{ + "offload-target": 12, + "sync-target": 5, + "junk": 99, + }) + + v, err := f.initStore.CreateVolume(ctx, f.initVol.Name, f.initVol.Path) + if err != nil { + t.Fatalf("CreateVolume on initiator: %v", err) + } + rep, err := PullDurability(ctx, f.initStore, f.initVol, f.node, false) + if err != nil { + t.Fatalf("PullDurability: %v", err) + } + if rep.Fetched != 3 || rep.Applied != 2 || rep.Dropped != 1 || len(rep.Rewinds) != 0 { + t.Fatalf("report = %+v, want fetched=3 applied=2 dropped=1 no rewinds", rep) + } + if len(rep.Drops) != 1 || rep.Drops[0].Destination != "junk" || rep.Drops[0].Kind != "component" { + t.Fatalf("drops = %+v, want one component drop for junk", rep.Drops) + } + + origin, err := f.initStore.GetNodeByName(ctx, originName) + if err != nil { + t.Fatalf("origin node %q was not created locally: %v", originName, err) + } + for dest, want := range map[string]int64{"offload-target": 12, "sync-target": 5} { + got, err := f.initStore.GetDestinationRunID(ctx, v.ID, dest, origin.ID) + if err != nil { + t.Fatalf("GetDestinationRunID %s: %v", dest, err) + } + if got.OriginRunID != want { + t.Fatalf("%s component = %d, want %d", dest, got.OriginRunID, want) + } + } + if _, err := f.initStore.GetDestinationRunID(ctx, v.ID, "junk", origin.ID); err == nil { + t.Fatal("junk component was stored, want it dropped") + } +} + +// TestPullDurabilityDropsUnconfiguredFreshness: a freshness coordinate +// for a destination outside the volume's accepted set is dropped and +// counted (Kind "freshness") just like a stray vector component, so a +// peer can't seed push-freshness for a destination this node never uses. +func TestPullDurabilityDropsUnconfiguredFreshness(t *testing.T) { + f := setupNodeFixtureNoRclone(t) + ctx := context.Background() + f.initVol.OffloadRequires = []string{"offsite-a"} + f.initVol.SyncTo = nil + seedReceiverFreshness(t, f, map[string]int64{ + "offsite-a": 12, + "junk": 99, + }) + + if _, err := f.initStore.CreateVolume(ctx, f.initVol.Name, f.initVol.Path); err != nil { + t.Fatalf("CreateVolume on initiator: %v", err) + } + rep, err := PullDurability(ctx, f.initStore, f.initVol, f.node, false) + if err != nil { + t.Fatalf("PullDurability: %v", err) + } + if rep.Dropped != 1 || len(rep.Drops) != 1 { + t.Fatalf("report = %+v, want exactly one drop", rep) + } + if rep.Drops[0].Destination != "junk" || rep.Drops[0].Kind != "freshness" { + t.Fatalf("drop = %+v, want a freshness drop for junk", rep.Drops[0]) + } + if rep.Fetched < 1 || rep.Applied < 1 { + t.Fatalf("report = %+v, want the accepted offsite-a freshness still applied and counted", rep) + } +} + +// TestPullDurabilityCapsDropSamples: a peer flooding many out-of-scope +// destinations keeps the exact Dropped count but bounds the sampled +// Drops slice, so neither the report nor the output it feeds can grow +// unbounded under an adversarial peer. +func TestPullDurabilityCapsDropSamples(t *testing.T) { + f := setupNodeFixtureNoRclone(t) + ctx := context.Background() + junk := make(map[string]int64, 50) + for i := range 50 { + junk[fmt.Sprintf("junk-%02d", i)] = 1 + } + seedReceiverDurability(t, f, junk) + + if _, err := f.initStore.CreateVolume(ctx, f.initVol.Name, f.initVol.Path); err != nil { + t.Fatalf("CreateVolume on initiator: %v", err) + } + rep, err := PullDurability(ctx, f.initStore, f.initVol, f.node, false) + if err != nil { + t.Fatalf("PullDurability: %v", err) + } + if rep.Fetched != 50 || rep.Applied != 0 || rep.Dropped != 50 { + t.Fatalf("report = fetched=%d applied=%d dropped=%d, want 50/0/50", rep.Fetched, rep.Applied, rep.Dropped) + } + if len(rep.Drops) > 16 { + t.Fatalf("len(Drops) = %d, want capped at 16", len(rep.Drops)) + } +} + // TestPullDurabilityRefusesRewind: a peer component below the locally // recorded value is refused and reported, leaving the local value in // place; the allow-rewind opt-in accepts it. func TestPullDurabilityRefusesRewind(t *testing.T) { f := setupNodeFixtureNoRclone(t) ctx := context.Background() + f.initVol.OffloadRequires = []string{"offsite-a"} + f.initVol.SyncTo = []string{"offsite-b"} originName := seedReceiverDurability(t, f, map[string]int64{ "offsite-a": 12, "offsite-b": 5, diff --git a/sync/node.go b/sync/node.go index 1b5665b..ba92d79 100644 --- a/sync/node.go +++ b/sync/node.go @@ -677,12 +677,13 @@ func (d *nodeSyncDriver) phaseClose() error { } // pullPeerDurability fetches the peer's destination vectors and merges -// them into the local store. Failures and refused rewinds surface as -// report warnings rather than failing the run: the sync itself -// succeeded, and the pull can be retried any time via the standalone -// `peer-sync pull-durability` command. +// them into the local store. Failures, refused rewinds, and components +// dropped for unconfigured destinations surface as report warnings +// rather than failing the run: the sync itself succeeded, and the pull +// can be retried any time via the standalone `peer-sync pull-durability` +// command. func (d *nodeSyncDriver) pullPeerDurability() { - rep, err := pullDurability(d.ctx, d.store, d.client, d.vol.Name, d.volID, d.node.Name, false) + rep, err := pullDurability(d.ctx, d.store, d.client, d.vol.Name, d.volID, d.node.Name, acceptedDestinations(d.vol), false) d.report.DurabilityPull = rep if err != nil { d.report.Warnings = append(d.report.Warnings, @@ -693,6 +694,33 @@ func (d *nodeSyncDriver) pullPeerDurability() { d.report.Warnings = append(d.report.Warnings, fmt.Sprintf("durability pull from %s refused rewind: %s", d.node.Name, rw)) } + if rep.Dropped > 0 { + d.report.Warnings = append(d.report.Warnings, + fmt.Sprintf("durability pull from %s dropped %d entr%s for unconfigured destinations (e.g. %s)", + d.node.Name, rep.Dropped, plural(rep.Dropped, "y", "ies"), dropSample(rep.Drops))) + } +} + +func plural(n int, one, many string) string { + if n == 1 { + return one + } + return many +} + +// dropSample renders the sampled destinations from a drop list as a +// compact, deduplicated, comma-separated string for one summary line. +func dropSample(drops []DurabilityDrop) string { + seen := make(map[string]struct{}, len(drops)) + var names []string + for _, d := range drops { + if _, ok := seen[d.Destination]; ok { + continue + } + seen[d.Destination] = struct{}{} + names = append(names, d.Destination) + } + return strings.Join(names, ", ") } func (d *nodeSyncDriver) abortWithError(phase string, err error) error { diff --git a/sync/node_origin_test.go b/sync/node_origin_test.go index 582e126..4abac66 100644 --- a/sync/node_origin_test.go +++ b/sync/node_origin_test.go @@ -238,7 +238,9 @@ func TestNodeSyncAdvancesVectorAndPullsDurabilityAtClose(t *testing.T) { f := setupNodeFixture(t) ctx := context.Background() - // The peer knows about a destination only it can see. + // The peer knows about a destination only it can see; the volume + // requires it for offload, so the pull accepts the peer's evidence. + f.initVol.OffloadRequires = []string{"offsite-x"} recvSelfName := seedReceiverDurability(t, f, map[string]int64{"offsite-x": 7}) // Initiator: one forwarded-origin file seeded before indexing (a