From c83b7c91922c3a3e12598dec834415b5c73259c7 Mon Sep 17 00:00:00 2001 From: Martin Bertschler Date: Thu, 11 Jun 2026 05:44:51 +0200 Subject: [PATCH 1/3] index: pin row size/mtime to the hashed-handle stat The indexer captured size and mtime at directory-walk time, then hashed later from a fresh open with no re-stat. A file appended-to between the walk and the hash bound the new BLAKE3 digest to the stale walk size in an immutable contents row; every later observation of those true bytes then failed the size cross-check in lookupContentTx and aborted the whole ApplyIndexBatch repeatedly. hashFile now Stats the open handle after hashing and returns the size and mtime alongside the digest, so the row's metadata describes the same inode state as the bytes that produced the hash. Re-opening by path would reintroduce the race, so the live handle is used. Refs #107 --- index/index.go | 49 +++++++++++++++++++++++------- index/index_test.go | 72 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 110 insertions(+), 11 deletions(-) diff --git a/index/index.go b/index/index.go index b34552c..f21dee1 100644 --- a/index/index.go +++ b/index/index.go @@ -555,16 +555,16 @@ func (i *indexer) process(w workItem, buf []byte) resultItem { return resultItem{row: existing, kind: kindUnchanged} } - digest, err := hashFile(w.absPath, buf) + hashed, err := hashFile(w.absPath, buf) if err != nil { return resultItem{err: fmt.Errorf("hash %s: %w", w.absPath, err)} } - row := i.rowFor(w, digest) + row := i.rowFor(w, hashed) if !hasExisting { return resultItem{row: row, kind: kindAdded} } - if bytes.Equal(existing.Blake3, digest) && existing.Status == store.StatusPresent { + if bytes.Equal(existing.Blake3, hashed.digest) && existing.Status == store.StatusPresent { return resultItem{row: existing, kind: kindUnchanged} } return resultItem{row: row, kind: kindModified} @@ -579,13 +579,19 @@ func metadataMatches(existing store.FileRow, w workItem) bool { existing.MtimeNs == w.mtimeNs } -func (i *indexer) rowFor(w workItem, digest []byte) store.FileRow { +// rowFor builds the file row from the hashed-file result rather than the +// walk-time workItem: SizeBytes and MtimeNs come from a Stat of the open +// handle taken after hashing, so the digest and the metadata describe the +// same inode state. A file appended-to between the walk and the hash would +// otherwise bind the new digest to the stale walk size, minting a contents +// row whose size_bytes can never match the honest content again. +func (i *indexer) rowFor(w workItem, hashed hashedFile) store.FileRow { return store.FileRow{ VolumeID: i.volumeID, Path: w.relPath, - Blake3: digest, - SizeBytes: w.sizeBytes, - MtimeNs: w.mtimeNs, + Blake3: hashed.digest, + SizeBytes: hashed.sizeBytes, + MtimeNs: hashed.mtimeNs, Status: store.StatusPresent, FirstSeenRunID: i.runID, LastSeenRunID: i.runID, @@ -676,15 +682,36 @@ func resolveNamedVolume(ctx context.Context, s *store.Store, name, absRoot strin // the run; allocating per-file made GC pressure outweigh the syscall win. const hashReadBufferSize = 1 << 20 -func hashFile(path string, buf []byte) ([]byte, error) { +// hashedFile is the digest of a file's bytes paired with the size and +// mtime read from the same open handle immediately after hashing. +type hashedFile struct { + digest []byte + sizeBytes int64 + mtimeNs int64 +} + +// hashFile hashes the file at path and reads its size and mtime from the +// open handle after the hash completes. Stat-after-hash on the live handle +// (rather than re-opening by path, which would reintroduce a race) pins the +// metadata to the same bytes that produced the digest, even if the file was +// growing during the walk-to-hash window. +func hashFile(path string, buf []byte) (hashedFile, error) { f, err := os.Open(path) if err != nil { - return nil, err + return hashedFile{}, err } defer f.Close() h := blake3.New() if _, err := io.CopyBuffer(h, f, buf); err != nil { - return nil, err + return hashedFile{}, err + } + fi, err := f.Stat() + if err != nil { + return hashedFile{}, err } - return h.Sum(nil), nil + return hashedFile{ + digest: h.Sum(nil), + sizeBytes: fi.Size(), + mtimeNs: fi.ModTime().UnixNano(), + }, nil } diff --git a/index/index_test.go b/index/index_test.go index a3a5a0b..e409af5 100644 --- a/index/index_test.go +++ b/index/index_test.go @@ -12,10 +12,18 @@ import ( "testing" "time" + "github.com/zeebo/blake3" + "github.com/mbertschler/squirrel/store" "github.com/mbertschler/squirrel/volmark" ) +func blake3Of(t *testing.T, content string) []byte { + t.Helper() + sum := blake3.Sum256([]byte(content)) + return sum[:] +} + func setupStore(t *testing.T) *store.Store { t.Helper() dsn := filepath.Join(t.TempDir(), "test.db") @@ -1089,3 +1097,67 @@ func TestIndexFlipsOffloadedBackToPresent(t *testing.T) { after.FirstSeenRunID, before.FirstSeenRunID) } } + +// TestHashStatPinnedToHashedBytes simulates a file that grows between the +// walker's stat and the worker's hash: process must record the size read +// from the open handle after hashing (matching the hashed content), not +// the stale walk size. Binding the new digest to the old size would mint a +// contents row whose size_bytes can never match the honest content again. +func TestHashStatPinnedToHashedBytes(t *testing.T) { + root := t.TempDir() + abs := filepath.Join(root, "growing.txt") + content := "the full on-disk content after the append" + writeFile(t, abs, content) + + s := setupStore(t) + ctx := context.Background() + idx, err := newIndexer(ctx, s, root, Options{Name: "vol"}) + if err != nil { + t.Fatalf("newIndexer: %v", err) + } + + stale := workItem{ + absPath: abs, + relPath: "growing.txt", + sizeBytes: 3, // what the walker stat saw before the append + mtimeNs: 1, + } + res := idx.process(stale, make([]byte, hashReadBufferSize)) + if res.err != nil { + t.Fatalf("process: %v", res.err) + } + if res.row.SizeBytes != int64(len(content)) { + t.Fatalf("row size = %d, want %d (the hashed bytes, not the stale walk size %d)", + res.row.SizeBytes, len(content), stale.sizeBytes) + } + want := blake3Of(t, content) + if !bytes.Equal(res.row.Blake3, want) { + t.Fatalf("row digest = %x, want %x", res.row.Blake3, want) + } +} + +// TestReindexStableBytesNoSizeMismatch indexes a file, then re-indexes the +// same untouched bytes. The contents row minted on the first pass carries +// the size of the bytes that were hashed, so the second pass resolves the +// same (digest, size) pair and ApplyIndexBatch does not hit the immutable +// size cross-check that aborts the whole batch. +func TestReindexStableBytesNoSizeMismatch(t *testing.T) { + root := t.TempDir() + writeFile(t, filepath.Join(root, "stable.txt"), "stable content that never changes") + + s := setupStore(t) + ctx := context.Background() + if _, err := Index(ctx, s, root, Options{Name: "vol"}); err != nil { + t.Fatalf("first Index: %v", err) + } + rep, err := Index(ctx, s, root, Options{Name: "vol"}) + if err != nil { + t.Fatalf("second Index aborted (size cross-check?): %v", err) + } + if rep.Errors != 0 { + t.Fatalf("re-index reported errors: %+v", rep.ErrorList) + } + if rep.Unchanged != 1 { + t.Fatalf("re-index unchanged = %d, want 1", rep.Unchanged) + } +} From 3bc891b4edeff13fbd1f06c56b479c4b00f57afa Mon Sep 17 00:00:00 2001 From: Martin Bertschler Date: Thu, 11 Jun 2026 05:45:01 +0200 Subject: [PATCH 2/3] sync: verify content-addressed uploads by re-hashing the source MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit uploadOneObject stat-guarded the source on size+mtime before rclone read it (a stat->copy race) and confirmed only size after upload, so a size+mtime-preserving in-place edit could upload bytes that did not match the recorded hash; the recorded remote_objects row then suppressed any future re-upload of the honest content forever. The pre-transfer guard now re-hashes the source with BLAKE3 immediately before copyto and refuses (errContentDrift) when the digest no longer matches the indexed hash. A refusal is surfaced as a warning and fails the run without recording an object or writing the manifest segment, so the watermark holds and the next run re-offers the object — the honest bytes land once restored, and the drifted bytes never bind to the hash. Residual: rclone reads the file in a child process after the re-hash, so an edit in that fork/exec window could still upload drifted bytes; the window is one rclone invocation rather than the whole walk-to-push span, and the scan-back fingerprint pass re-reads the landed object before it counts as content-verified. Refs #107 --- sync/content_addressed.go | 75 +++++++++++++++++++++++++++----- sync/content_addressed_test.go | 79 ++++++++++++++++++++++++++++++++++ 2 files changed, 143 insertions(+), 11 deletions(-) diff --git a/sync/content_addressed.go b/sync/content_addressed.go index c4abdd6..b20576c 100644 --- a/sync/content_addressed.go +++ b/sync/content_addressed.go @@ -1,16 +1,21 @@ package sync import ( + "bytes" "context" "encoding/hex" "encoding/json" + "errors" "fmt" + "io" "os" "path" "path/filepath" "slices" "strconv" + "github.com/zeebo/blake3" + "github.com/mbertschler/squirrel/config" "github.com/mbertschler/squirrel/store" ) @@ -209,8 +214,16 @@ func (h *contentAddressedHandler) watermark(ctx context.Context, volID int64) (i // failures don't stop the loop — every object that lands now is recorded // and saves work on the retry — but any failure fails the run before // the segment is written. +// +// A source whose bytes drifted from the indexed hash is refused without a +// remote_objects row (errContentDrift): it is surfaced as a warning and +// fails the run, so the segment is not written and the watermark does not +// advance. The next run recomputes the same delta and re-offers the +// object, letting the honest bytes land once they are restored — without +// the drifted bytes ever being recorded under the hash. func (h *contentAddressedHandler) uploadObjects(ctx context.Context, rep *Report, runID int64, delta []store.PathDelta) error { var confirmed []store.PathDelta + var drifted int for _, d := range plannedUploads(delta) { recorded, err := h.store.HasRemoteObject(ctx, d.ContentID, h.dest.Name) if err != nil { @@ -221,6 +234,11 @@ func (h *contentAddressedHandler) uploadObjects(ctx context.Context, rep *Report continue } if err := h.uploadOneObject(ctx, runID, d); err != nil { + if errors.Is(err, errContentDrift) { + drifted++ + rep.Warnings = append(rep.Warnings, err.Error()) + continue + } rep.RcloneResult.Errors++ if int64(len(rep.RcloneResult.FailedFiles)) < maxFailedFiles { rep.RcloneResult.FailedFiles = append(rep.RcloneResult.FailedFiles, @@ -236,6 +254,9 @@ func (h *contentAddressedHandler) uploadObjects(ctx context.Context, rep *Report if rep.RcloneResult.Errors > 0 { return fmt.Errorf("%d object(s) failed to land on %q; the manifest segment for run %d was not written and the durability vector did not advance", rep.RcloneResult.Errors, h.dest.Name, runID) } + if drifted > 0 { + return fmt.Errorf("%d object(s) on %q were refused for drifting from their indexed hash; re-index the volume and sync again — the manifest segment for run %d was not written and the durability vector did not advance", drifted, h.dest.Name, runID) + } return nil } @@ -307,22 +328,39 @@ func plannedUploads(delta []store.PathDelta) []store.PathDelta { return out } -// uploadOneObject lands one content object and records the upload. The -// pre-transfer stat guards the content-addressed invariant — the bytes -// stored under a hash must be the bytes that produced it — by refusing -// a source file whose size or mtime drifted from the indexed row; the -// post-transfer stat confirms presence and size on the remote. The +// errContentDrift marks a source file whose bytes no longer match the +// content hash the index bound them to. The upload path raises it instead +// of recording an object; uploadObjects turns it into a warning and fails +// the run so the watermark holds and the object is re-offered next run. +var errContentDrift = errors.New("source content drifted from its indexed hash") + +// uploadOneObject lands one content object and records the upload. It +// guards the content-addressed invariant — the bytes stored under a hash +// must be the bytes that produced it — by re-hashing the source file +// immediately before the transfer and refusing (errContentDrift) when the +// digest no longer matches the indexed hash; a size+mtime-preserving +// in-place edit, which the metadata stat alone cannot catch, fails here. +// The post-transfer stat confirms presence and size on the remote, and the // upload record is written only after that confirmation, so a recorded -// hash is always a confirmed one; a crash in between re-uploads the -// same bytes idempotently on the next run. +// hash is always a confirmed one; a crash in between re-uploads the same +// bytes idempotently on the next run. +// +// Residual: rclone reads the file in a separate child process after the +// re-hash, so a writer that edits the file in the window between the hash +// and rclone's read could still upload drifted bytes. The window is the +// fork/exec of one rclone invocation rather than the whole walk-to-push +// span, and the scan-back fingerprint pass (#109) re-reads the landed +// object to upgrade the durability vector, catching any byte that slipped +// through before the object is treated as content-verified. func (h *contentAddressedHandler) uploadOneObject(ctx context.Context, runID int64, d store.PathDelta) error { src := filepath.Join(h.vol.Path, filepath.FromSlash(d.Path)) - fi, err := os.Stat(src) + digest, err := hashLocalFile(src) if err != nil { - return fmt.Errorf("stat %s: %w", src, err) + return fmt.Errorf("re-hash %s before upload: %w", src, err) } - if fi.Size() != d.SizeBytes || fi.ModTime().UnixNano() != d.MtimeNs { - return fmt.Errorf("%s changed on disk since it was indexed (size %d→%d, mtime %d→%d) — run `squirrel index %s` and sync again", d.Path, d.SizeBytes, fi.Size(), d.MtimeNs, fi.ModTime().UnixNano(), h.vol.Name) + if !bytes.Equal(digest, d.Blake3) { + return fmt.Errorf("%w: %s now hashes to %s, indexed as %s — run `squirrel index %s` and sync again", + errContentDrift, d.Path, hex.EncodeToString(digest), hex.EncodeToString(d.Blake3), h.vol.Name) } hash := hex.EncodeToString(d.Blake3) uri := h.objectURI(hash) @@ -346,6 +384,21 @@ func (h *contentAddressedHandler) uploadOneObject(ctx context.Context, runID int return nil } +// hashLocalFile streams the file at path through BLAKE3 and returns the +// raw 32-byte digest, the same hash the indexer binds content under. +func hashLocalFile(path string) ([]byte, error) { + f, err := os.Open(path) + if err != nil { + return nil, err + } + defer f.Close() + h := blake3.New() + if _, err := io.Copy(h, f); err != nil { + return nil, err + } + return h.Sum(nil), nil +} + // uploadSegment writes the run's manifest segment and confirms it // landed at the expected size. Every run uploads one — an unchanged // volume yields an empty segment — so each successful run leaves the diff --git a/sync/content_addressed_test.go b/sync/content_addressed_test.go index d9b50a8..1d7dfae 100644 --- a/sync/content_addressed_test.go +++ b/sync/content_addressed_test.go @@ -10,6 +10,7 @@ import ( "runtime" "strings" "testing" + "time" "github.com/zeebo/blake3" @@ -679,6 +680,84 @@ func TestContentAddressedCrossVolumeDedup(t *testing.T) { } } +// TestContentAddressedDriftRefusesObject: a source file whose on-disk +// bytes drift from the indexed hash (here a same-length, mtime-preserving +// in-place edit the metadata stat alone cannot catch) is never recorded in +// remote_objects. The run is refused and warned, the watermark holds, and +// once the honest bytes are restored the next run lands and records the +// object normally — the drifted bytes never bound to the hash. +func TestContentAddressedDriftRefusesObject(t *testing.T) { + f := setupContentAddressedFixture(t) + f.write(t, "a.txt", "alpha") + f.index(t) + + indexedMtime := f.mtimeNs(t, "a.txt") + src := filepath.Join(f.pair.Volume.Path, "a.txt") + if err := os.WriteFile(src, []byte("ALPHA"), 0o644); err != nil { + t.Fatalf("in-place edit: %v", err) + } + restoreMtime := time.Unix(0, indexedMtime) + if err := os.Chtimes(src, restoreMtime, restoreMtime); err != nil { + t.Fatalf("restore mtime: %v", err) + } + + rep, err := f.sync(t) + if err == nil || !strings.Contains(err.Error(), "drifting from their indexed hash") { + t.Fatalf("expected a drift refusal, got %v", err) + } + if rep.Status != store.RunStatusFailed { + t.Fatalf("Status = %q, want failed", rep.Status) + } + if rep.RcloneResult.Transferred != 0 || rep.RcloneResult.Errors != 0 { + t.Fatalf("counts = transferred=%d errors=%d, want 0/0 (drift is a refusal, not a transfer error)", + rep.RcloneResult.Transferred, rep.RcloneResult.Errors) + } + if len(rep.Warnings) == 0 || !strings.Contains(strings.Join(rep.Warnings, "\n"), "drifted") { + t.Fatalf("Warnings = %+v, want a drift advisory", rep.Warnings) + } + if _, statErr := os.Stat(f.remotePath(ObjectsDirName, blake3Hex("alpha"))); statErr == nil { + t.Fatalf("drifted source uploaded an object under the indexed hash") + } + + row, err := f.store.GetByPath(context.Background(), f.volumeID(t), "a.txt") + if err != nil { + t.Fatalf("GetByPath: %v", err) + } + if has, _ := f.store.HasRemoteObject(context.Background(), row.ContentID, "offsite"); has { + t.Fatalf("drifted object was recorded in remote_objects") + } + if _, statErr := os.Stat(f.remotePath("pics", ManifestDirName, fmt.Sprintf("run-%d", rep.RunID))); statErr == nil { + t.Fatalf("manifest segment written despite a refused object") + } + if vector, err := f.store.ListDestinationRunIDs(context.Background(), f.volumeID(t), "offsite"); err != nil || len(vector) != 0 { + t.Fatalf("vector = %+v (err=%v), want empty after a refused object", vector, err) + } + + if err := os.WriteFile(src, []byte("alpha"), 0o644); err != nil { + t.Fatalf("restore honest bytes: %v", err) + } + if err := os.Chtimes(src, restoreMtime, restoreMtime); err != nil { + t.Fatalf("restore mtime again: %v", err) + } + rep2, err := f.sync(t) + if err != nil { + t.Fatalf("retry sync: %v", err) + } + if rep2.RcloneResult.Transferred != 1 { + t.Fatalf("retry transferred = %d, want 1 (honest bytes upload after drift cleared)", rep2.RcloneResult.Transferred) + } + if has, _ := f.store.HasRemoteObject(context.Background(), row.ContentID, "offsite"); !has { + t.Fatalf("honest re-upload was not recorded") + } + got, err := os.ReadFile(f.remotePath(ObjectsDirName, blake3Hex("alpha"))) + if err != nil { + t.Fatalf("object missing after honest re-upload: %v", err) + } + if string(got) != "alpha" { + t.Fatalf("recorded object = %q, want the honest bytes %q", got, "alpha") + } +} + // TestContentAddressedRefusesVolumeNamedObjects: a volume named like // the destination-root objects/ directory would collide with it. func TestContentAddressedRefusesVolumeNamedObjects(t *testing.T) { From 17de6d6f17139464ed2190be6d4b42c3813a6bfa Mon Sep 17 00:00:00 2001 From: Martin Bertschler Date: Thu, 11 Jun 2026 05:53:01 +0200 Subject: [PATCH 3/3] index,sync: note the residual hash-EOF/stat window; trim negation phrasings --- index/index.go | 10 +++++++--- sync/content_addressed.go | 4 ++-- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/index/index.go b/index/index.go index f21dee1..d945cdc 100644 --- a/index/index.go +++ b/index/index.go @@ -582,9 +582,8 @@ func metadataMatches(existing store.FileRow, w workItem) bool { // rowFor builds the file row from the hashed-file result rather than the // walk-time workItem: SizeBytes and MtimeNs come from a Stat of the open // handle taken after hashing, so the digest and the metadata describe the -// same inode state. A file appended-to between the walk and the hash would -// otherwise bind the new digest to the stale walk size, minting a contents -// row whose size_bytes can never match the honest content again. +// same inode state — keeping the minted contents row internally +// consistent against the immutable-contents size cross-check. func (i *indexer) rowFor(w workItem, hashed hashedFile) store.FileRow { return store.FileRow{ VolumeID: i.volumeID, @@ -695,6 +694,11 @@ type hashedFile struct { // (rather than re-opening by path, which would reintroduce a race) pins the // metadata to the same bytes that produced the digest, even if the file was // growing during the walk-to-hash window. +// +// Residual: an append landing between the hash reaching EOF and the Stat +// can still report a size above the bytes hashed; the window is a single +// syscall gap (vs. the whole walk-to-hash span this closes), and a later +// re-index of the settled file supersedes to a consistent row. func hashFile(path string, buf []byte) (hashedFile, error) { f, err := os.Open(path) if err != nil { diff --git a/sync/content_addressed.go b/sync/content_addressed.go index b20576c..80911c3 100644 --- a/sync/content_addressed.go +++ b/sync/content_addressed.go @@ -338,8 +338,8 @@ var errContentDrift = errors.New("source content drifted from its indexed hash") // guards the content-addressed invariant — the bytes stored under a hash // must be the bytes that produced it — by re-hashing the source file // immediately before the transfer and refusing (errContentDrift) when the -// digest no longer matches the indexed hash; a size+mtime-preserving -// in-place edit, which the metadata stat alone cannot catch, fails here. +// digest no longer matches the indexed hash, catching a +// size+mtime-preserving in-place edit that a metadata stat would pass. // The post-transfer stat confirms presence and size on the remote, and the // upload record is written only after that confirmation, so a recorded // hash is always a confirmed one; a crash in between re-uploads the same