Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions cmd/squirrel/peer_sync_pull_durability.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,15 @@ func runPeerSyncPullDurability(cmd *cobra.Command, volumeName, peerName string,
}

func printDurabilityPull(w io.Writer, rep sync.DurabilityPullReport) {
fmt.Fprintf(w, "%s ← %s fetched=%d applied=%d\n",
rep.Volume, rep.Peer, rep.Fetched, rep.Applied)
fmt.Fprintf(w, "%s ← %s fetched=%d applied=%d dropped=%d\n",
rep.Volume, rep.Peer, rep.Fetched, rep.Applied, rep.Dropped)
for _, rw := range rep.Rewinds {
fmt.Fprintf(w, " refused rewind: %s\n", rw)
}
for _, dr := range rep.Drops {
fmt.Fprintf(w, " dropped %s\n", dr)
}
Comment on lines +70 to +72

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 244369b. printDurabilityPull prints at most the capped sample then a '… N more dropped' tail, so stdout stays bounded under an adversarial/buggy peer while the exact dropped count is still shown on the summary line.

if more := rep.Dropped - len(rep.Drops); more > 0 {
fmt.Fprintf(w, " … and %d more dropped\n", more)
}
}
1 change: 1 addition & 0 deletions cmd/squirrel/peer_sync_pull_durability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func newPullDurabilityFixture(t *testing.T) pullDurabilityFixture {

[volumes.pics]
path = %q
offload_requires = ["offsite-a"]

[nodes.nas]
endpoint = %q
Expand Down
7 changes: 6 additions & 1 deletion cmd/squirrel/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,13 @@ func printSyncReport(w io.Writer, rep sync.Report, runErr error) {
fmt.Fprintf(w, " mismatched %s: expected %s, actual %s\n", m.Path, m.ExpectedHex, m.ActualHex)
}
if rep.DurabilityPull.Fetched > 0 {
fmt.Fprintf(w, " durability: applied %d/%d peer components\n",
fmt.Fprintf(w, " durability: applied %d/%d peer entries",
rep.DurabilityPull.Applied, rep.DurabilityPull.Fetched)
if rep.DurabilityPull.Dropped > 0 {
fmt.Fprintf(w, " (dropped %d for unconfigured destinations)",
rep.DurabilityPull.Dropped)
}
fmt.Fprintln(w)
}
}
for _, c := range rep.NodeConflicts {
Expand Down
84 changes: 77 additions & 7 deletions sync/durability.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,55 @@ import (
"github.com/mbertschler/squirrel/syncproto"
)

// maxDurabilityDropSamples caps how many dropped entries a
// DurabilityPullReport retains as detail in its Drops slice. The cap is
// enforced by recordDrop: the Dropped counter keeps incrementing past it
// and stays exact, only the sampled Drops slice is bounded, so an
// adversarial peer flooding out-of-scope destinations cannot blow up the
// report or the output that renders it.
const maxDurabilityDropSamples = 16

// DurabilityPullReport summarises one durability metadata pull from a
// peer: how many entries (vector components and freshness coordinates)
// were fetched, how many components landed in the local
// destination_run_ids (advanced or re-confirmed), how many were refused
// as rewinds, and how many entries were dropped because the peer named a
// destination outside this volume's accepted target set. Every fetched
// entry lands in exactly one of the applied / rewind / dropped buckets
// (a merged freshness coordinate counts as applied).
type DurabilityPullReport struct {
// Volume is the name of the volume the pull ran for.
Volume string
// Peer is the name of the peer the entries were fetched from.
Peer string
// Fetched counts every entry the peer returned: vector components
// plus freshness coordinates.
Fetched int
// Applied counts the components and freshness coordinates that were
// merged into the local store.
Applied int
// Dropped is the exact number of entries discarded for naming a
// destination outside the accepted set.
Dropped int
// Rewinds lists the components the pull refused as rewinds.
Rewinds []DurabilityRewind
// Drops samples the dropped entries up to maxDurabilityDropSamples;
// Dropped is the exact total.
Drops []DurabilityDrop
}

// recordDrop registers one discarded entry on the report. The Dropped
// total is always incremented; the entry itself is only kept in Drops
// while fewer than maxDurabilityDropSamples samples have been retained,
// which keeps the detail bounded without losing the exact count.
func (r *DurabilityPullReport) recordDrop(d DurabilityDrop) {
	r.Dropped++
	if len(r.Drops) >= maxDurabilityDropSamples {
		// Sample budget exhausted: count only.
		return
	}
	r.Drops = append(r.Drops, d)
}

// DurabilityDrop is one pulled entry the merge discarded because its
// destination falls outside the volume's accepted target set
// (offload_requires ∪ sync_to). Drops are counted and sampled so a peer
// asserting evidence for destinations this node uses for neither offload
// nor sync stays observable.
type DurabilityDrop struct {
// Destination is the destination name the peer reported, which was
// not in the accepted set.
Destination string
// OriginNode is the origin node name carried by the dropped entry.
OriginNode string
// Kind tells which list of the peer's response the entry came from.
Kind string // "component" or "freshness"
}

// String renders the drop as a single human-readable phrase naming the
// entry kind, the unconfigured destination, and the origin node.
func (d DurabilityDrop) String() string {
	const layout = "%s for unconfigured destination %s origin %s"
	return fmt.Sprintf(layout, d.Kind, d.Destination, d.OriginNode)
}

// DurabilityRewind is one component the pull refused because the peer
Expand All @@ -47,6 +85,12 @@ func (r DurabilityRewind) String() string {
// watermark store, refused rewinds are reported on the result (not
// applied), and allowRewind is the explicit recovery override.
//
// The merge is scoped to destinations this volume actually references
// (offload_requires ∪ sync_to); evidence for any other destination is
// dropped, so a buggy or compromised peer cannot pollute the local
// vector with rows for destinations this node neither requires for
// offload nor syncs to.
//
// The standalone `peer-sync pull-durability` command and the automatic
// post-close pull share this implementation.
func PullDurability(ctx context.Context, s *store.Store, vol *config.Volume, node *config.Node, allowRewind bool) (DurabilityPullReport, error) {
Expand All @@ -57,18 +101,35 @@ func PullDurability(ctx context.Context, s *store.Store, vol *config.Volume, nod
}
return DurabilityPullReport{}, fmt.Errorf("lookup volume %q: %w", vol.Name, err)
}
return pullDurability(ctx, s, newNodeClient(node), vol.Name, v.ID, node.Name, allowRewind)
return pullDurability(ctx, s, newNodeClient(node), vol.Name, v.ID, node.Name, acceptedDestinations(vol), allowRewind)
}

// acceptedDestinations builds the set of destination names the volume
// references, i.e. every name listed in its offload_requires or sync_to
// configuration. The durability pull drops any pulled entry whose
// destination is not a member of this set, since it has no bearing on
// the volume's local decisions.
func acceptedDestinations(vol *config.Volume) map[string]struct{} {
	set := make(map[string]struct{}, len(vol.OffloadRequires)+len(vol.SyncTo))
	// Both lists contribute equally; a name present in both collapses
	// to a single key.
	for _, group := range [][]string{vol.OffloadRequires, vol.SyncTo} {
		for _, dest := range group {
			set[dest] = struct{}{}
		}
	}
	return set
}

// pullDurability is the transport-injected body of PullDurability,
// shared with the node-sync driver (which already holds a client).
func pullDurability(ctx context.Context, s *store.Store, client *nodeClient, volumeName string, volumeID int64, peerName string, allowRewind bool) (DurabilityPullReport, error) {
// accepted scopes which destinations the merge will store (see
// acceptedDestinations).
func pullDurability(ctx context.Context, s *store.Store, client *nodeClient, volumeName string, volumeID int64, peerName string, accepted map[string]struct{}, allowRewind bool) (DurabilityPullReport, error) {
rep := DurabilityPullReport{Volume: volumeName, Peer: peerName}
resp, err := client.durability(ctx, syncproto.DurabilityRequest{Volume: volumeName})
if err != nil {
return rep, err
}
rep.Fetched = len(resp.Components)
rep.Fetched = len(resp.Components) + len(resp.Freshness)
originIDs := make(map[string]int64, 4)
resolveOrigin := func(name string) (int64, error) {
if id, ok := originIDs[name]; ok {
Expand All @@ -82,6 +143,10 @@ func pullDurability(ctx context.Context, s *store.Store, client *nodeClient, vol
return node.ID, nil
}
for _, c := range resp.Components {
if _, ok := accepted[c.Destination]; !ok {
rep.recordDrop(DurabilityDrop{Destination: c.Destination, OriginNode: c.OriginNode, Kind: "component"})
continue
}
if err := validateComponent(c); err != nil {
return rep, fmt.Errorf("component %+v: %w", c, err)
}
Expand All @@ -106,6 +171,10 @@ func pullDurability(ctx context.Context, s *store.Store, client *nodeClient, vol
rep.Applied++
}
for _, f := range resp.Freshness {
if _, ok := accepted[f.Destination]; !ok {
rep.recordDrop(DurabilityDrop{Destination: f.Destination, OriginNode: f.OriginNode, Kind: "freshness"})
continue
}
if err := validateFreshness(f); err != nil {
return rep, fmt.Errorf("freshness %+v: %w", f, err)
}
Expand All @@ -116,6 +185,7 @@ func pullDurability(ctx context.Context, s *store.Store, client *nodeClient, vol
if err := s.MergeDestinationPushFreshness(ctx, volumeID, f.Destination, nodeID, f.OriginRun); err != nil {
return rep, fmt.Errorf("apply freshness for destination %q origin %q: %w", f.Destination, f.OriginNode, err)
}
rep.Applied++
}
return rep, nil
}
Expand Down
116 changes: 116 additions & 0 deletions sync/durability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sync

import (
"context"
"fmt"
"strings"
"testing"
)
Expand Down Expand Up @@ -58,6 +59,8 @@ func seedReceiverFreshness(t *testing.T, f *nodeFixture, coords map[string]int64
func TestPullDurabilityMergesFreshness(t *testing.T) {
f := setupNodeFixtureNoRclone(t)
ctx := context.Background()
f.initVol.OffloadRequires = []string{"offsite-a"}
f.initVol.SyncTo = []string{"offsite-b"}
originName := seedReceiverFreshness(t, f, map[string]int64{
"offsite-a": 12,
"offsite-b": 5,
Expand Down Expand Up @@ -98,6 +101,8 @@ func TestPullDurabilityMergesFreshness(t *testing.T) {
func TestPullDurabilityCopiesComponents(t *testing.T) {
f := setupNodeFixtureNoRclone(t)
ctx := context.Background()
f.initVol.OffloadRequires = []string{"offsite-a"}
f.initVol.SyncTo = []string{"offsite-b"}
originName := seedReceiverDurability(t, f, map[string]int64{
"offsite-a": 12,
"offsite-b": 5,
Expand Down Expand Up @@ -130,12 +135,123 @@ func TestPullDurabilityCopiesComponents(t *testing.T) {
}
}

// TestPullDurabilityDropsUnconfiguredDestinations seeds the peer with
// three components: one for a destination the volume lists in
// offload_requires, one for a destination it lists in sync_to, and one
// for a destination it does not reference at all. The pull must merge
// the first two, and count and report the third as dropped without
// storing it and without aborting the merge of the legitimate ones.
func TestPullDurabilityDropsUnconfiguredDestinations(t *testing.T) {
	fx := setupNodeFixtureNoRclone(t)
	ctx := context.Background()
	fx.initVol.OffloadRequires = []string{"offload-target"}
	fx.initVol.SyncTo = []string{"sync-target"}
	peerVector := map[string]int64{
		"offload-target": 12,
		"sync-target":    5,
		"junk":           99,
	}
	originName := seedReceiverDurability(t, fx, peerVector)

	vol, err := fx.initStore.CreateVolume(ctx, fx.initVol.Name, fx.initVol.Path)
	if err != nil {
		t.Fatalf("CreateVolume on initiator: %v", err)
	}
	rep, err := PullDurability(ctx, fx.initStore, fx.initVol, fx.node, false)
	if err != nil {
		t.Fatalf("PullDurability: %v", err)
	}

	// Summary counters: three fetched, two merged, one dropped.
	countsOK := rep.Fetched == 3 && rep.Applied == 2 && rep.Dropped == 1 && len(rep.Rewinds) == 0
	if !countsOK {
		t.Fatalf("report = %+v, want fetched=3 applied=2 dropped=1 no rewinds", rep)
	}
	// Drop detail: exactly the junk component. Cases are evaluated in
	// order, so the index is only touched once the length is known.
	switch {
	case len(rep.Drops) != 1,
		rep.Drops[0].Destination != "junk",
		rep.Drops[0].Kind != "component":
		t.Fatalf("drops = %+v, want one component drop for junk", rep.Drops)
	}

	origin, err := fx.initStore.GetNodeByName(ctx, originName)
	if err != nil {
		t.Fatalf("origin node %q was not created locally: %v", originName, err)
	}
	stored := map[string]int64{"offload-target": 12, "sync-target": 5}
	for dest, want := range stored {
		row, err := fx.initStore.GetDestinationRunID(ctx, vol.ID, dest, origin.ID)
		if err != nil {
			t.Fatalf("GetDestinationRunID %s: %v", dest, err)
		}
		if row.OriginRunID != want {
			t.Fatalf("%s component = %d, want %d", dest, row.OriginRunID, want)
		}
	}
	// The out-of-scope component must not have been written.
	_, err = fx.initStore.GetDestinationRunID(ctx, vol.ID, "junk", origin.ID)
	if err == nil {
		t.Fatal("junk component was stored, want it dropped")
	}
}

// TestPullDurabilityDropsUnconfiguredFreshness checks the freshness
// side of the scoping rule: a freshness coordinate naming a destination
// outside the volume's accepted set is dropped and recorded with Kind
// "freshness", while the coordinate for the accepted destination is
// still fetched and applied.
func TestPullDurabilityDropsUnconfiguredFreshness(t *testing.T) {
	fx := setupNodeFixtureNoRclone(t)
	ctx := context.Background()
	fx.initVol.OffloadRequires = []string{"offsite-a"}
	fx.initVol.SyncTo = nil
	seedReceiverFreshness(t, fx, map[string]int64{
		"offsite-a": 12,
		"junk":      99,
	})

	_, err := fx.initStore.CreateVolume(ctx, fx.initVol.Name, fx.initVol.Path)
	if err != nil {
		t.Fatalf("CreateVolume on initiator: %v", err)
	}
	rep, err := PullDurability(ctx, fx.initStore, fx.initVol, fx.node, false)
	if err != nil {
		t.Fatalf("PullDurability: %v", err)
	}

	if !(rep.Dropped == 1 && len(rep.Drops) == 1) {
		t.Fatalf("report = %+v, want exactly one drop", rep)
	}
	drop := rep.Drops[0]
	if !(drop.Destination == "junk" && drop.Kind == "freshness") {
		t.Fatalf("drop = %+v, want a freshness drop for junk", drop)
	}
	if !(rep.Fetched >= 1 && rep.Applied >= 1) {
		t.Fatalf("report = %+v, want the accepted offsite-a freshness still applied and counted", rep)
	}
}

// TestPullDurabilityCapsDropSamples: a peer flooding many out-of-scope
// destinations keeps the exact Dropped count but bounds the sampled
// Drops slice, so neither the report nor the output it feeds can grow
// unbounded under an adversarial peer.
//
// The flood (50 entries) is larger than maxDurabilityDropSamples, so the
// sample slice must end up holding exactly the cap: asserting equality
// against the constant catches both an overgrown slice and a sampler
// that silently retains nothing, and keeps the test in step with the
// constant if the cap is ever retuned.
func TestPullDurabilityCapsDropSamples(t *testing.T) {
	f := setupNodeFixtureNoRclone(t)
	ctx := context.Background()
	junk := make(map[string]int64, 50)
	for i := range 50 {
		junk[fmt.Sprintf("junk-%02d", i)] = 1
	}
	seedReceiverDurability(t, f, junk)

	if _, err := f.initStore.CreateVolume(ctx, f.initVol.Name, f.initVol.Path); err != nil {
		t.Fatalf("CreateVolume on initiator: %v", err)
	}
	rep, err := PullDurability(ctx, f.initStore, f.initVol, f.node, false)
	if err != nil {
		t.Fatalf("PullDurability: %v", err)
	}
	if rep.Fetched != 50 || rep.Applied != 0 || rep.Dropped != 50 {
		t.Fatalf("report = fetched=%d applied=%d dropped=%d, want 50/0/50", rep.Fetched, rep.Applied, rep.Dropped)
	}
	if got := len(rep.Drops); got != maxDurabilityDropSamples {
		t.Fatalf("len(Drops) = %d, want exactly the cap of %d", got, maxDurabilityDropSamples)
	}
}

// TestPullDurabilityRefusesRewind: a peer component below the locally
// recorded value is refused and reported, leaving the local value in
// place; the allow-rewind opt-in accepts it.
func TestPullDurabilityRefusesRewind(t *testing.T) {
f := setupNodeFixtureNoRclone(t)
ctx := context.Background()
f.initVol.OffloadRequires = []string{"offsite-a"}
f.initVol.SyncTo = []string{"offsite-b"}
originName := seedReceiverDurability(t, f, map[string]int64{
"offsite-a": 12,
"offsite-b": 5,
Expand Down
38 changes: 33 additions & 5 deletions sync/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,12 +677,13 @@ func (d *nodeSyncDriver) phaseClose() error {
}

// pullPeerDurability fetches the peer's destination vectors and merges
// them into the local store. Failures and refused rewinds surface as
// report warnings rather than failing the run: the sync itself
// succeeded, and the pull can be retried any time via the standalone
// `peer-sync pull-durability` command.
// them into the local store. Failures, refused rewinds, and components
// dropped for unconfigured destinations surface as report warnings
// rather than failing the run: the sync itself succeeded, and the pull
// can be retried any time via the standalone `peer-sync pull-durability`
// command.
func (d *nodeSyncDriver) pullPeerDurability() {
rep, err := pullDurability(d.ctx, d.store, d.client, d.vol.Name, d.volID, d.node.Name, false)
rep, err := pullDurability(d.ctx, d.store, d.client, d.vol.Name, d.volID, d.node.Name, acceptedDestinations(d.vol), false)
d.report.DurabilityPull = rep
if err != nil {
d.report.Warnings = append(d.report.Warnings,
Expand All @@ -693,6 +694,33 @@ func (d *nodeSyncDriver) pullPeerDurability() {
d.report.Warnings = append(d.report.Warnings,
fmt.Sprintf("durability pull from %s refused rewind: %s", d.node.Name, rw))
}
if rep.Dropped > 0 {
d.report.Warnings = append(d.report.Warnings,
fmt.Sprintf("durability pull from %s dropped %d entr%s for unconfigured destinations (e.g. %s)",
d.node.Name, rep.Dropped, plural(rep.Dropped, "y", "ies"), dropSample(rep.Drops)))
}
}

// plural picks the word form that agrees with n: one when n is exactly
// 1, many for every other count (including zero and negatives).
func plural(n int, one, many string) string {
	if n != 1 {
		return many
	}
	return one
}

// dropSample condenses a drop list into one comma-separated string of
// destination names for a summary line. Each destination appears once,
// in the order it was first seen; an empty list yields an empty string.
func dropSample(drops []DurabilityDrop) string {
	unique := make([]string, 0, len(drops))
	known := make(map[string]bool, len(drops))
	for i := range drops {
		dest := drops[i].Destination
		if !known[dest] {
			known[dest] = true
			unique = append(unique, dest)
		}
	}
	return strings.Join(unique, ", ")
}

func (d *nodeSyncDriver) abortWithError(phase string, err error) error {
Expand Down
4 changes: 3 additions & 1 deletion sync/node_origin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,9 @@ func TestNodeSyncAdvancesVectorAndPullsDurabilityAtClose(t *testing.T) {
f := setupNodeFixture(t)
ctx := context.Background()

// The peer knows about a destination only it can see.
// The peer knows about a destination only it can see; the volume
// requires it for offload, so the pull accepts the peer's evidence.
f.initVol.OffloadRequires = []string{"offsite-x"}
recvSelfName := seedReceiverDurability(t, f, map[string]int64{"offsite-x": 7})

// Initiator: one forwarded-origin file seeded before indexing (a
Expand Down
Loading