From d1b5a8c02d2acb6e513d043759b02bec85b54951 Mon Sep 17 00:00:00 2001 From: Martin Bertschler Date: Thu, 11 Jun 2026 05:42:29 +0200 Subject: [PATCH 1/4] sync: scope durability pull to the volume's accepted destinations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The durability pull merged peer-asserted components and freshness coordinates for any destination name a peer sent, so a buggy or compromised peer could pollute the local destination_run_ids with rows for destinations this node neither requires for offload nor syncs to. Scope the merge to the volume's offload_requires ∪ sync_to set: an entry for any other destination is dropped (not an error that aborts the pull, so one junk entry cannot deny a legitimate sync its durability exchange). Drops are counted and listed on the report, and surfaced as sync warnings, so the filtering is observable rather than silent. The monotonic merge and rewind refusal for accepted components are unchanged. Addresses #104 (pull-scoping hygiene; the exploitable gating concern was closed by #120). --- sync/durability.go | 71 +++++++++++++++++++++++++++++++++++++--- sync/durability_test.go | 55 +++++++++++++++++++++++++++++++ sync/node.go | 6 +++- sync/node_origin_test.go | 4 ++- 4 files changed, 129 insertions(+), 7 deletions(-) diff --git a/sync/durability.go b/sync/durability.go index 6f61e72..178845e 100644 --- a/sync/durability.go +++ b/sync/durability.go @@ -12,15 +12,35 @@ import ( // DurabilityPullReport summarises one durability metadata pull from a // peer: how many vector components were fetched, how many landed in -// the local destination_run_ids (advanced or re-confirmed), and which -// were refused as rewinds. Every fetched component lands in exactly -// one of the two buckets. +// the local destination_run_ids (advanced or re-confirmed), how many +// were refused as rewinds, and how many were dropped because the peer +// named a destination outside this volume's accepted target set. Every +// fetched component lands in exactly one of the applied / rewind / +// dropped buckets. type DurabilityPullReport struct { Volume string Peer string Fetched int Applied int + Dropped int Rewinds []DurabilityRewind + Drops []DurabilityDrop +} + +// DurabilityDrop is one pulled entry the merge discarded because its +// destination falls outside the volume's accepted target set +// (offload_requires ∪ sync_to). Drops are counted and listed so a peer +// asserting evidence for destinations this node uses for neither offload +// nor sync stays observable. +type DurabilityDrop struct { + Destination string + OriginNode string + Kind string // "component" or "freshness" +} + +func (d DurabilityDrop) String() string { + return fmt.Sprintf("%s for unconfigured destination %s origin %s", + d.Kind, d.Destination, d.OriginNode) } // DurabilityRewind is one component the pull refused because the peer @@ -47,6 +67,12 @@ func (r DurabilityRewind) String() string { // watermark store, refused rewinds are reported on the result (not // applied), and allowRewind is the explicit recovery override. // +// The merge is scoped to destinations this volume actually references +// (offload_requires ∪ sync_to); evidence for any other destination is +// dropped, so a buggy or compromised peer cannot pollute the local +// vector with rows for destinations this node neither requires for +// offload nor syncs to. +// // The standalone `peer-sync pull-durability` command and the automatic // post-close pull share this implementation. func PullDurability(ctx context.Context, s *store.Store, vol *config.Volume, node *config.Node, allowRewind bool) (DurabilityPullReport, error) { @@ -57,12 +83,29 @@ func PullDurability(ctx context.Context, s *store.Store, vol *config.Volume, nod } return DurabilityPullReport{}, fmt.Errorf("lookup volume %q: %w", vol.Name, err) } - return pullDurability(ctx, s, newNodeClient(node), vol.Name, v.ID, node.Name, allowRewind) + return pullDurability(ctx, s, newNodeClient(node), vol.Name, v.ID, node.Name, acceptedDestinations(vol), allowRewind) +} + +// acceptedDestinations is the set of destination names this volume +// references: the union of its offload_requires and sync_to entries. +// A pulled durability entry for any name outside this set has no bearing +// on the volume's local decisions and is dropped by the pull. +func acceptedDestinations(vol *config.Volume) map[string]struct{} { + accepted := make(map[string]struct{}, len(vol.OffloadRequires)+len(vol.SyncTo)) + for _, name := range vol.OffloadRequires { + accepted[name] = struct{}{} + } + for _, name := range vol.SyncTo { + accepted[name] = struct{}{} + } + return accepted } // pullDurability is the transport-injected body of PullDurability, // shared with the node-sync driver (which already holds a client). -func pullDurability(ctx context.Context, s *store.Store, client *nodeClient, volumeName string, volumeID int64, peerName string, allowRewind bool) (DurabilityPullReport, error) { +// accepted scopes which destinations the merge will store (see +// acceptedDestinations). +func pullDurability(ctx context.Context, s *store.Store, client *nodeClient, volumeName string, volumeID int64, peerName string, accepted map[string]struct{}, allowRewind bool) (DurabilityPullReport, error) { rep := DurabilityPullReport{Volume: volumeName, Peer: peerName} resp, err := client.durability(ctx, syncproto.DurabilityRequest{Volume: volumeName}) if err != nil { @@ -85,6 +128,15 @@ func pullDurability(ctx context.Context, s *store.Store, client *nodeClient, vol if err := validateComponent(c); err != nil { return rep, fmt.Errorf("component %+v: %w", c, err) } + if _, ok := accepted[c.Destination]; !ok { + rep.Dropped++ + rep.Drops = append(rep.Drops, DurabilityDrop{ + Destination: c.Destination, + OriginNode: c.OriginNode, + Kind: "component", + }) + continue + } nodeID, err := resolveOrigin(c.OriginNode) if err != nil { return rep, err @@ -109,6 +161,15 @@ func pullDurability(ctx context.Context, s *store.Store, client *nodeClient, vol if err := validateFreshness(f); err != nil { return rep, fmt.Errorf("freshness %+v: %w", f, err) } + if _, ok := accepted[f.Destination]; !ok { + rep.Dropped++ + rep.Drops = append(rep.Drops, DurabilityDrop{ + Destination: f.Destination, + OriginNode: f.OriginNode, + Kind: "freshness", + }) + continue + } nodeID, err := resolveOrigin(f.OriginNode) if err != nil { return rep, err diff --git a/sync/durability_test.go b/sync/durability_test.go index be34cbf..99cf309 100644 --- a/sync/durability_test.go +++ b/sync/durability_test.go @@ -58,6 +58,8 @@ func seedReceiverFreshness(t *testing.T, f *nodeFixture, coords map[string]int64 func TestPullDurabilityMergesFreshness(t *testing.T) { f := setupNodeFixtureNoRclone(t) ctx := context.Background() + f.initVol.OffloadRequires = []string{"offsite-a"} + f.initVol.SyncTo = []string{"offsite-b"} originName := seedReceiverFreshness(t, f, map[string]int64{ "offsite-a": 12, "offsite-b": 5, @@ -98,6 +100,8 @@ func TestPullDurabilityMergesFreshness(t *testing.T) { func TestPullDurabilityCopiesComponents(t *testing.T) { f := setupNodeFixtureNoRclone(t) ctx := context.Background() + f.initVol.OffloadRequires = []string{"offsite-a"} + f.initVol.SyncTo = []string{"offsite-b"} originName := seedReceiverDurability(t, f, map[string]int64{ "offsite-a": 12, "offsite-b": 5, @@ -130,12 +134,63 @@ func TestPullDurabilityCopiesComponents(t *testing.T) { } } +// TestPullDurabilityDropsUnconfiguredDestinations: the pull merges +// components for destinations the volume references (one via +// offload_requires, one via sync_to) and drops one for an unconfigured +// destination — counted and reported, never stored, and without +// aborting the merge of the legitimate components. +func TestPullDurabilityDropsUnconfiguredDestinations(t *testing.T) { + f := setupNodeFixtureNoRclone(t) + ctx := context.Background() + f.initVol.OffloadRequires = []string{"offload-target"} + f.initVol.SyncTo = []string{"sync-target"} + originName := seedReceiverDurability(t, f, map[string]int64{ + "offload-target": 12, + "sync-target": 5, + "junk": 99, + }) + + v, err := f.initStore.CreateVolume(ctx, f.initVol.Name, f.initVol.Path) + if err != nil { + t.Fatalf("CreateVolume on initiator: %v", err) + } + rep, err := PullDurability(ctx, f.initStore, f.initVol, f.node, false) + if err != nil { + t.Fatalf("PullDurability: %v", err) + } + if rep.Fetched != 3 || rep.Applied != 2 || rep.Dropped != 1 || len(rep.Rewinds) != 0 { + t.Fatalf("report = %+v, want fetched=3 applied=2 dropped=1 no rewinds", rep) + } + if len(rep.Drops) != 1 || rep.Drops[0].Destination != "junk" || rep.Drops[0].Kind != "component" { + t.Fatalf("drops = %+v, want one component drop for junk", rep.Drops) + } + + origin, err := f.initStore.GetNodeByName(ctx, originName) + if err != nil { + t.Fatalf("origin node %q was not created locally: %v", originName, err) + } + for dest, want := range map[string]int64{"offload-target": 12, "sync-target": 5} { + got, err := f.initStore.GetDestinationRunID(ctx, v.ID, dest, origin.ID) + if err != nil { + t.Fatalf("GetDestinationRunID %s: %v", dest, err) + } + if got.OriginRunID != want { + t.Fatalf("%s component = %d, want %d", dest, got.OriginRunID, want) + } + } + if _, err := f.initStore.GetDestinationRunID(ctx, v.ID, "junk", origin.ID); err == nil { + t.Fatal("junk component was stored, want it dropped") + } +} + // TestPullDurabilityRefusesRewind: a peer component below the locally // recorded value is refused and reported, leaving the local value in // place; the allow-rewind opt-in accepts it. func TestPullDurabilityRefusesRewind(t *testing.T) { f := setupNodeFixtureNoRclone(t) ctx := context.Background() + f.initVol.OffloadRequires = []string{"offsite-a"} + f.initVol.SyncTo = []string{"offsite-b"} originName := seedReceiverDurability(t, f, map[string]int64{ "offsite-a": 12, "offsite-b": 5, diff --git a/sync/node.go b/sync/node.go index 1b5665b..7005b7e 100644 --- a/sync/node.go +++ b/sync/node.go @@ -682,7 +682,7 @@ func (d *nodeSyncDriver) phaseClose() error { // succeeded, and the pull can be retried any time via the standalone // `peer-sync pull-durability` command. func (d *nodeSyncDriver) pullPeerDurability() { - rep, err := pullDurability(d.ctx, d.store, d.client, d.vol.Name, d.volID, d.node.Name, false) + rep, err := pullDurability(d.ctx, d.store, d.client, d.vol.Name, d.volID, d.node.Name, acceptedDestinations(d.vol), false) d.report.DurabilityPull = rep if err != nil { d.report.Warnings = append(d.report.Warnings, @@ -693,6 +693,10 @@ func (d *nodeSyncDriver) pullPeerDurability() { d.report.Warnings = append(d.report.Warnings, fmt.Sprintf("durability pull from %s refused rewind: %s", d.node.Name, rw)) } + for _, dr := range rep.Drops { + d.report.Warnings = append(d.report.Warnings, + fmt.Sprintf("durability pull from %s dropped %s", d.node.Name, dr)) + } } func (d *nodeSyncDriver) abortWithError(phase string, err error) error { diff --git a/sync/node_origin_test.go b/sync/node_origin_test.go index 582e126..4abac66 100644 --- a/sync/node_origin_test.go +++ b/sync/node_origin_test.go @@ -238,7 +238,9 @@ func TestNodeSyncAdvancesVectorAndPullsDurabilityAtClose(t *testing.T) { f := setupNodeFixture(t) ctx := context.Background() - // The peer knows about a destination only it can see. + // The peer knows about a destination only it can see; the volume + // requires it for offload, so the pull accepts the peer's evidence. + f.initVol.OffloadRequires = []string{"offsite-x"} recvSelfName := seedReceiverDurability(t, f, map[string]int64{"offsite-x": 7}) // Initiator: one forwarded-origin file seeded before indexing (a From 3b66c8a547b189047e188033a95a15738fe7235b Mon Sep 17 00:00:00 2001 From: Martin Bertschler Date: Thu, 11 Jun 2026 05:42:36 +0200 Subject: [PATCH 2/4] cmd: report dropped durability entries in pull output Surface the new dropped count: the standalone peer-sync pull-durability summary adds dropped=N with a per-drop detail line, and the sync output's durability line notes drops for unconfigured destinations. The CLI pull fixture's volume declares offload_requires so its seeded component is accepted under the scoped merge. --- cmd/squirrel/peer_sync_pull_durability.go | 7 +++++-- cmd/squirrel/peer_sync_pull_durability_test.go | 1 + cmd/squirrel/sync.go | 7 ++++++- 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/cmd/squirrel/peer_sync_pull_durability.go b/cmd/squirrel/peer_sync_pull_durability.go index eddae9b..7fe7f9d 100644 --- a/cmd/squirrel/peer_sync_pull_durability.go +++ b/cmd/squirrel/peer_sync_pull_durability.go @@ -62,9 +62,12 @@ func runPeerSyncPullDurability(cmd *cobra.Command, volumeName, peerName string, } func printDurabilityPull(w io.Writer, rep sync.DurabilityPullReport) { - fmt.Fprintf(w, "%s ← %s fetched=%d applied=%d\n", - rep.Volume, rep.Peer, rep.Fetched, rep.Applied) + fmt.Fprintf(w, "%s ← %s fetched=%d applied=%d dropped=%d\n", + rep.Volume, rep.Peer, rep.Fetched, rep.Applied, rep.Dropped) for _, rw := range rep.Rewinds { fmt.Fprintf(w, " refused rewind: %s\n", rw) } + for _, dr := range rep.Drops { + fmt.Fprintf(w, " dropped %s\n", dr) + } } diff --git a/cmd/squirrel/peer_sync_pull_durability_test.go b/cmd/squirrel/peer_sync_pull_durability_test.go index 99a564d..51f6fec 100644 --- a/cmd/squirrel/peer_sync_pull_durability_test.go +++ b/cmd/squirrel/peer_sync_pull_durability_test.go @@ -73,6 +73,7 @@ func newPullDurabilityFixture(t *testing.T) pullDurabilityFixture { [volumes.pics] path = %q +offload_requires = ["offsite-a"] [nodes.nas] endpoint = %q diff --git a/cmd/squirrel/sync.go b/cmd/squirrel/sync.go index 6985a3f..fefce17 100644 --- a/cmd/squirrel/sync.go +++ b/cmd/squirrel/sync.go @@ -192,8 +192,13 @@ func printSyncReport(w io.Writer, rep sync.Report, runErr error) { fmt.Fprintf(w, " mismatched %s: expected %s, actual %s\n", m.Path, m.ExpectedHex, m.ActualHex) } if rep.DurabilityPull.Fetched > 0 { - fmt.Fprintf(w, " durability: applied %d/%d peer components\n", + fmt.Fprintf(w, " durability: applied %d/%d peer components", rep.DurabilityPull.Applied, rep.DurabilityPull.Fetched) + if rep.DurabilityPull.Dropped > 0 { + fmt.Fprintf(w, " (dropped %d for unconfigured destinations)", + rep.DurabilityPull.Dropped) + } + fmt.Fprintln(w) } } for _, c := range rep.NodeConflicts { From e89b10f041e658bcd709cb8be7705c3446f3d1ab Mon Sep 17 00:00:00 2001 From: Martin Bertschler Date: Thu, 11 Jun 2026 05:51:50 +0200 Subject: [PATCH 3/4] sync: test that an unconfigured freshness coordinate is dropped and counted --- sync/durability_test.go | 61 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/sync/durability_test.go b/sync/durability_test.go index 99cf309..4f162d2 100644 --- a/sync/durability_test.go +++ b/sync/durability_test.go @@ -2,6 +2,7 @@ package sync import ( "context" + "fmt" "strings" "testing" ) @@ -183,6 +184,66 @@ func TestPullDurabilityDropsUnconfiguredDestinations(t *testing.T) { } } +// TestPullDurabilityDropsUnconfiguredFreshness: a freshness coordinate +// for a destination outside the volume's accepted set is dropped and +// counted (Kind "freshness") just like a stray vector component, so a +// peer can't seed push-freshness for a destination this node never uses. +func TestPullDurabilityDropsUnconfiguredFreshness(t *testing.T) { + f := setupNodeFixtureNoRclone(t) + ctx := context.Background() + f.initVol.OffloadRequires = []string{"offsite-a"} + f.initVol.SyncTo = nil + seedReceiverFreshness(t, f, map[string]int64{ + "offsite-a": 12, + "junk": 99, + }) + + if _, err := f.initStore.CreateVolume(ctx, f.initVol.Name, f.initVol.Path); err != nil { + t.Fatalf("CreateVolume on initiator: %v", err) + } + rep, err := PullDurability(ctx, f.initStore, f.initVol, f.node, false) + if err != nil { + t.Fatalf("PullDurability: %v", err) + } + if rep.Dropped != 1 || len(rep.Drops) != 1 { + t.Fatalf("report = %+v, want exactly one drop", rep) + } + if rep.Drops[0].Destination != "junk" || rep.Drops[0].Kind != "freshness" { + t.Fatalf("drop = %+v, want a freshness drop for junk", rep.Drops[0]) + } + if rep.Fetched < 1 || rep.Applied < 1 { + t.Fatalf("report = %+v, want the accepted offsite-a freshness still applied and counted", rep) + } +} + +// TestPullDurabilityCapsDropSamples: a peer flooding many out-of-scope +// destinations keeps the exact Dropped count but bounds the sampled +// Drops slice, so neither the report nor the output it feeds can grow +// unbounded under an adversarial peer. +func TestPullDurabilityCapsDropSamples(t *testing.T) { + f := setupNodeFixtureNoRclone(t) + ctx := context.Background() + junk := make(map[string]int64, 50) + for i := range 50 { + junk[fmt.Sprintf("junk-%02d", i)] = 1 + } + seedReceiverDurability(t, f, junk) + + if _, err := f.initStore.CreateVolume(ctx, f.initVol.Name, f.initVol.Path); err != nil { + t.Fatalf("CreateVolume on initiator: %v", err) + } + rep, err := PullDurability(ctx, f.initStore, f.initVol, f.node, false) + if err != nil { + t.Fatalf("PullDurability: %v", err) + } + if rep.Fetched != 50 || rep.Applied != 0 || rep.Dropped != 50 { + t.Fatalf("report = fetched=%d applied=%d dropped=%d, want 50/0/50", rep.Fetched, rep.Applied, rep.Dropped) + } + if len(rep.Drops) > 16 { + t.Fatalf("len(Drops) = %d, want capped at 16", len(rep.Drops)) + } +} + // TestPullDurabilityRefusesRewind: a peer component below the locally // recorded value is refused and reported, leaving the local value in // place; the allow-rewind opt-in accepts it. From 244369b90f108b64269bb62cbd62f96626a66e7d Mon Sep 17 00:00:00 2001 From: Martin Bertschler Date: Thu, 11 Jun 2026 05:51:54 +0200 Subject: [PATCH 4/4] sync: harden the durability-pull filter for adversarial peers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address Copilot review on #123: - Scope-check before full validation. An out-of-scope destination is dropped before validateComponent/validateFreshness runs, so a junk entry for an unconfigured destination with a malformed origin can no longer abort the whole pull — restoring the intended fail-safe. - Reconcile the counts. Fetched now counts components and freshness together and a merged freshness coordinate counts as applied, so every fetched entry lands in exactly one of applied/rewind/dropped and Dropped can never exceed Fetched. - Bound the drop detail. Dropped stays exact, but the sampled Drops slice is capped (maxDurabilityDropSamples) and the sync warning is a single summary line, so a peer flooding out-of-scope destinations can't blow up the report or the output that renders it. The CLI printer prints the sample then a '… N more' tail. Tests: freshness-drop path, the sample cap under a 50-destination flood, and the count reconciliation. --- cmd/squirrel/peer_sync_pull_durability.go | 3 ++ cmd/squirrel/sync.go | 2 +- sync/durability.go | 63 +++++++++++++---------- sync/node.go | 36 ++++++++++--- 4 files changed, 70 insertions(+), 34 deletions(-) diff --git a/cmd/squirrel/peer_sync_pull_durability.go b/cmd/squirrel/peer_sync_pull_durability.go index 7fe7f9d..31679fc 100644 --- a/cmd/squirrel/peer_sync_pull_durability.go +++ b/cmd/squirrel/peer_sync_pull_durability.go @@ -70,4 +70,7 @@ func printDurabilityPull(w io.Writer, rep sync.DurabilityPullReport) { for _, dr := range rep.Drops { fmt.Fprintf(w, " dropped %s\n", dr) } + if more := rep.Dropped - len(rep.Drops); more > 0 { + fmt.Fprintf(w, " … and %d more dropped\n", more) + } } diff --git a/cmd/squirrel/sync.go b/cmd/squirrel/sync.go index fefce17..5f95595 100644 --- a/cmd/squirrel/sync.go +++ b/cmd/squirrel/sync.go @@ -192,7 +192,7 @@ func printSyncReport(w io.Writer, rep sync.Report, runErr error) { fmt.Fprintf(w, " mismatched %s: expected %s, actual %s\n", m.Path, m.ExpectedHex, m.ActualHex) } if rep.DurabilityPull.Fetched > 0 { - fmt.Fprintf(w, " durability: applied %d/%d peer components", + fmt.Fprintf(w, " durability: applied %d/%d peer entries", rep.DurabilityPull.Applied, rep.DurabilityPull.Fetched) if rep.DurabilityPull.Dropped > 0 { fmt.Fprintf(w, " (dropped %d for unconfigured destinations)", diff --git a/sync/durability.go b/sync/durability.go index 178845e..2e77e60 100644 --- a/sync/durability.go +++ b/sync/durability.go @@ -10,13 +10,20 @@ import ( "github.com/mbertschler/squirrel/syncproto" ) +// maxDurabilityDropSamples caps how many dropped entries the report +// retains as detail. The Dropped count stays exact; only the sampled +// Drops slice is bounded, so an adversarial peer flooding out-of-scope +// destinations cannot blow up the report or the output that renders it. +const maxDurabilityDropSamples = 16 + // DurabilityPullReport summarises one durability metadata pull from a -// peer: how many vector components were fetched, how many landed in -// the local destination_run_ids (advanced or re-confirmed), how many -// were refused as rewinds, and how many were dropped because the peer -// named a destination outside this volume's accepted target set. Every -// fetched component lands in exactly one of the applied / rewind / -// dropped buckets. +// peer: how many entries (vector components and freshness coordinates) +// were fetched, how many components landed in the local +// destination_run_ids (advanced or re-confirmed), how many were refused +// as rewinds, and how many entries were dropped because the peer named a +// destination outside this volume's accepted target set. Every fetched +// entry lands in exactly one of the applied / rewind / dropped buckets +// (a merged freshness coordinate counts as applied). type DurabilityPullReport struct { Volume string Peer string @@ -24,12 +31,23 @@ type DurabilityPullReport struct { Applied int Dropped int Rewinds []DurabilityRewind - Drops []DurabilityDrop + // Drops samples the dropped entries up to maxDurabilityDropSamples; + // Dropped is the exact total. + Drops []DurabilityDrop +} + +// recordDrop counts a dropped entry and samples it into Drops up to the +// cap, so the exact total survives while the detail stays bounded. +func (r *DurabilityPullReport) recordDrop(d DurabilityDrop) { + r.Dropped++ + if len(r.Drops) < maxDurabilityDropSamples { + r.Drops = append(r.Drops, d) + } } // DurabilityDrop is one pulled entry the merge discarded because its // destination falls outside the volume's accepted target set -// (offload_requires ∪ sync_to). Drops are counted and listed so a peer +// (offload_requires ∪ sync_to). Drops are counted and sampled so a peer // asserting evidence for destinations this node uses for neither offload // nor sync stays observable. type DurabilityDrop struct { @@ -111,7 +129,7 @@ func pullDurability(ctx context.Context, s *store.Store, client *nodeClient, vol if err != nil { return rep, err } - rep.Fetched = len(resp.Components) + rep.Fetched = len(resp.Components) + len(resp.Freshness) originIDs := make(map[string]int64, 4) resolveOrigin := func(name string) (int64, error) { if id, ok := originIDs[name]; ok { @@ -125,18 +143,13 @@ func pullDurability(ctx context.Context, s *store.Store, client *nodeClient, vol return node.ID, nil } for _, c := range resp.Components { - if err := validateComponent(c); err != nil { - return rep, fmt.Errorf("component %+v: %w", c, err) - } if _, ok := accepted[c.Destination]; !ok { - rep.Dropped++ - rep.Drops = append(rep.Drops, DurabilityDrop{ - Destination: c.Destination, - OriginNode: c.OriginNode, - Kind: "component", - }) + rep.recordDrop(DurabilityDrop{Destination: c.Destination, OriginNode: c.OriginNode, Kind: "component"}) continue } + if err := validateComponent(c); err != nil { + return rep, fmt.Errorf("component %+v: %w", c, err) + } nodeID, err := resolveOrigin(c.OriginNode) if err != nil { return rep, err @@ -158,18 +171,13 @@ func pullDurability(ctx context.Context, s *store.Store, client *nodeClient, vol rep.Applied++ } for _, f := range resp.Freshness { - if err := validateFreshness(f); err != nil { - return rep, fmt.Errorf("freshness %+v: %w", f, err) - } if _, ok := accepted[f.Destination]; !ok { - rep.Dropped++ - rep.Drops = append(rep.Drops, DurabilityDrop{ - Destination: f.Destination, - OriginNode: f.OriginNode, - Kind: "freshness", - }) + rep.recordDrop(DurabilityDrop{Destination: f.Destination, OriginNode: f.OriginNode, Kind: "freshness"}) continue } + if err := validateFreshness(f); err != nil { + return rep, fmt.Errorf("freshness %+v: %w", f, err) + } nodeID, err := resolveOrigin(f.OriginNode) if err != nil { return rep, err @@ -177,6 +185,7 @@ func pullDurability(ctx context.Context, s *store.Store, client *nodeClient, vol if err := s.MergeDestinationPushFreshness(ctx, volumeID, f.Destination, nodeID, f.OriginRun); err != nil { return rep, fmt.Errorf("apply freshness for destination %q origin %q: %w", f.Destination, f.OriginNode, err) } + rep.Applied++ } return rep, nil } diff --git a/sync/node.go b/sync/node.go index 7005b7e..ba92d79 100644 --- a/sync/node.go +++ b/sync/node.go @@ -677,10 +677,11 @@ func (d *nodeSyncDriver) phaseClose() error { } // pullPeerDurability fetches the peer's destination vectors and merges -// them into the local store. Failures and refused rewinds surface as -// report warnings rather than failing the run: the sync itself -// succeeded, and the pull can be retried any time via the standalone -// `peer-sync pull-durability` command. +// them into the local store. Failures, refused rewinds, and components +// dropped for unconfigured destinations surface as report warnings +// rather than failing the run: the sync itself succeeded, and the pull +// can be retried any time via the standalone `peer-sync pull-durability` +// command. func (d *nodeSyncDriver) pullPeerDurability() { rep, err := pullDurability(d.ctx, d.store, d.client, d.vol.Name, d.volID, d.node.Name, acceptedDestinations(d.vol), false) d.report.DurabilityPull = rep @@ -693,12 +694,35 @@ func (d *nodeSyncDriver) pullPeerDurability() { d.report.Warnings = append(d.report.Warnings, fmt.Sprintf("durability pull from %s refused rewind: %s", d.node.Name, rw)) } - for _, dr := range rep.Drops { + if rep.Dropped > 0 { d.report.Warnings = append(d.report.Warnings, - fmt.Sprintf("durability pull from %s dropped %s", d.node.Name, dr)) + fmt.Sprintf("durability pull from %s dropped %d entr%s for unconfigured destinations (e.g. %s)", + d.node.Name, rep.Dropped, plural(rep.Dropped, "y", "ies"), dropSample(rep.Drops))) } } +func plural(n int, one, many string) string { + if n == 1 { + return one + } + return many +} + +// dropSample renders the sampled destinations from a drop list as a +// compact, deduplicated, comma-separated string for one summary line. +func dropSample(drops []DurabilityDrop) string { + seen := make(map[string]struct{}, len(drops)) + var names []string + for _, d := range drops { + if _, ok := seen[d.Destination]; ok { + continue + } + seen[d.Destination] = struct{}{} + names = append(names, d.Destination) + } + return strings.Join(names, ", ") +} + func (d *nodeSyncDriver) abortWithError(phase string, err error) error { d.report.Status = store.RunStatusFailed if d.receiverRunID != 0 {