diff --git a/index/index.go b/index/index.go
index b34552c..d945cdc 100644
--- a/index/index.go
+++ b/index/index.go
@@ -555,16 +555,16 @@ func (i *indexer) process(w workItem, buf []byte) resultItem {
 		return resultItem{row: existing, kind: kindUnchanged}
 	}
 
-	digest, err := hashFile(w.absPath, buf)
+	hashed, err := hashFile(w.absPath, buf)
 	if err != nil {
 		return resultItem{err: fmt.Errorf("hash %s: %w", w.absPath, err)}
 	}
 
-	row := i.rowFor(w, digest)
+	row := i.rowFor(w, hashed)
 	if !hasExisting {
 		return resultItem{row: row, kind: kindAdded}
 	}
-	if bytes.Equal(existing.Blake3, digest) && existing.Status == store.StatusPresent {
+	if bytes.Equal(existing.Blake3, hashed.digest) && existing.Status == store.StatusPresent {
 		return resultItem{row: existing, kind: kindUnchanged}
 	}
 	return resultItem{row: row, kind: kindModified}
@@ -579,13 +579,18 @@ func metadataMatches(existing store.FileRow, w workItem) bool {
 		existing.MtimeNs == w.mtimeNs
 }
 
-func (i *indexer) rowFor(w workItem, digest []byte) store.FileRow {
+// rowFor builds the file row from the hashed-file result rather than the
+// walk-time workItem: SizeBytes is the byte count that produced the digest
+// and MtimeNs is read from the open handle after hashing, so digest and
+// size always describe the same bytes — keeping the minted contents row
+// internally consistent against the immutable-contents size cross-check.
+func (i *indexer) rowFor(w workItem, hashed hashedFile) store.FileRow {
 	return store.FileRow{
 		VolumeID:       i.volumeID,
 		Path:           w.relPath,
-		Blake3:         digest,
-		SizeBytes:      w.sizeBytes,
-		MtimeNs:        w.mtimeNs,
+		Blake3:         hashed.digest,
+		SizeBytes:      hashed.sizeBytes,
+		MtimeNs:        hashed.mtimeNs,
 		Status:         store.StatusPresent,
 		FirstSeenRunID: i.runID,
 		LastSeenRunID:  i.runID,
@@ -676,15 +681,41 @@ func resolveNamedVolume(ctx context.Context, s *store.Store, name, absRoot strin
 // the run; allocating per-file made GC pressure outweigh the syscall win.
 const hashReadBufferSize = 1 << 20
 
-func hashFile(path string, buf []byte) ([]byte, error) {
+// hashedFile is a file's digest paired with the number of bytes hashed and
+// the mtime read from the same open handle immediately after hashing.
+type hashedFile struct {
+	digest    []byte
+	sizeBytes int64
+	mtimeNs   int64
+}
+
+// hashFile hashes the file at path through buf and returns the digest with
+// the metadata that belongs to it. sizeBytes is the byte count the hasher
+// actually consumed, so digest and size always describe the same bytes even
+// when the file grows or shrinks during the walk-to-hash window. mtimeNs is
+// read from the live handle after the hash (not by re-opening the path,
+// which would reintroduce a race).
+//
+// An append landing after EOF leaves the on-disk size above sizeBytes, so a
+// later walk that compares sizes sees a mismatch instead of a stale match.
+func hashFile(path string, buf []byte) (hashedFile, error) {
 	f, err := os.Open(path)
 	if err != nil {
-		return nil, err
+		return hashedFile{}, err
 	}
 	defer f.Close()
 	h := blake3.New()
-	if _, err := io.CopyBuffer(h, f, buf); err != nil {
-		return nil, err
+	hashedBytes, err := io.CopyBuffer(h, f, buf)
+	if err != nil {
+		return hashedFile{}, err
+	}
+	fi, err := f.Stat()
+	if err != nil {
+		return hashedFile{}, err
 	}
-	return h.Sum(nil), nil
+	return hashedFile{
+		digest:    h.Sum(nil),
+		sizeBytes: hashedBytes,
+		mtimeNs:   fi.ModTime().UnixNano(),
+	}, nil
 }
diff --git a/index/index_test.go b/index/index_test.go
index a3a5a0b..e409af5 100644
--- a/index/index_test.go
+++ b/index/index_test.go
@@ -12,10 +12,18 @@ import (
 	"testing"
 	"time"
 
+	"github.com/zeebo/blake3"
+
 	"github.com/mbertschler/squirrel/store"
 	"github.com/mbertschler/squirrel/volmark"
 )
 
+func blake3Of(t *testing.T, content string) []byte {
+	t.Helper()
+	sum := blake3.Sum256([]byte(content))
+	return sum[:]
+}
+
 func setupStore(t *testing.T) *store.Store {
 	t.Helper()
 	dsn := filepath.Join(t.TempDir(), "test.db")
@@ -1089,3 +1097,67
@@ func TestIndexFlipsOffloadedBackToPresent(t *testing.T) {
 			after.FirstSeenRunID, before.FirstSeenRunID)
 	}
 }
+
+// TestHashStatPinnedToHashedBytes feeds process a workItem whose size and
+// mtime are stale, the way a walker stat taken before an append would
+// leave them. The returned row must carry the size of the bytes that were
+// hashed and their digest; pairing the fresh digest with the stale walk
+// size would mint a contents row whose size_bytes never matches again.
+func TestHashStatPinnedToHashedBytes(t *testing.T) {
+	const name = "growing.txt"
+	const body = "the full on-disk content after the append"
+	dir := t.TempDir()
+	target := filepath.Join(dir, name)
+	writeFile(t, target, body)
+
+	indexer, err := newIndexer(context.Background(), setupStore(t), dir, Options{Name: "vol"})
+	if err != nil {
+		t.Fatalf("newIndexer: %v", err)
+	}
+
+	// Size and mtime as the walker stat saw them before the append.
+	walked := workItem{
+		absPath:   target,
+		relPath:   name,
+		sizeBytes: 3,
+		mtimeNs:   1,
+	}
+	scratch := make([]byte, hashReadBufferSize)
+	got := indexer.process(walked, scratch)
+	if got.err != nil {
+		t.Fatalf("process: %v", got.err)
+	}
+	if size, want := got.row.SizeBytes, int64(len(body)); size != want {
+		t.Fatalf("row size = %d, want %d (the hashed bytes, not the stale walk size %d)",
+			size, len(body), walked.sizeBytes)
+	}
+	if digest := blake3Of(t, body); !bytes.Equal(got.row.Blake3, digest) {
+		t.Fatalf("row digest = %x, want %x", got.row.Blake3, digest)
+	}
+}
+
+// TestReindexStableBytesNoSizeMismatch indexes a file, then re-indexes the
+// same untouched bytes. The contents row minted on the first pass carries
+// the size of the bytes that were hashed, so the second pass resolves the
+// same (digest, size) pair and ApplyIndexBatch does not hit the immutable
+// size cross-check that aborts the whole batch.
+func TestReindexStableBytesNoSizeMismatch(t *testing.T) {
+	dir := t.TempDir()
+	writeFile(t, filepath.Join(dir, "stable.txt"), "stable content that never changes")
+
+	db, opts := setupStore(t), Options{Name: "vol"}
+	bg := context.Background()
+	_, err := Index(bg, db, dir, opts)
+	if err != nil {
+		t.Fatalf("first Index: %v", err)
+	}
+	second, err := Index(bg, db, dir, opts)
+	switch {
+	case err != nil:
+		t.Fatalf("second Index aborted (size cross-check?): %v", err)
+	case second.Errors != 0:
+		t.Fatalf("re-index reported errors: %+v", second.ErrorList)
+	case second.Unchanged != 1:
+		t.Fatalf("re-index unchanged = %d, want 1", second.Unchanged)
+	}
+}
diff --git a/sync/content_addressed.go b/sync/content_addressed.go
index c4abdd6..80911c3 100644
--- a/sync/content_addressed.go
+++ b/sync/content_addressed.go
@@ -1,16 +1,21 @@
 package sync
 
 import (
+	"bytes"
 	"context"
 	"encoding/hex"
 	"encoding/json"
+	"errors"
 	"fmt"
+	"io"
 	"os"
 	"path"
 	"path/filepath"
 	"slices"
 	"strconv"
 
+	"github.com/zeebo/blake3"
+
 	"github.com/mbertschler/squirrel/config"
 	"github.com/mbertschler/squirrel/store"
 )
@@ -209,8 +214,16 @@ func (h *contentAddressedHandler) watermark(ctx context.Context, volID int64) (i
 // failures don't stop the loop — every object that lands now is recorded
 // and saves work on the retry — but any failure fails the run before
 // the segment is written.
+//
+// A source whose bytes drifted from the indexed hash is refused without a
+// remote_objects row (errContentDrift): it is surfaced as a warning and
+// fails the run, so the segment is not written and the watermark does not
+// advance. The next run recomputes the same delta and re-offers the
+// object, letting the honest bytes land once they are restored — without
+// the drifted bytes ever being recorded under the hash.
func (h *contentAddressedHandler) uploadObjects(ctx context.Context, rep *Report, runID int64, delta []store.PathDelta) error { var confirmed []store.PathDelta + var drifted int for _, d := range plannedUploads(delta) { recorded, err := h.store.HasRemoteObject(ctx, d.ContentID, h.dest.Name) if err != nil { @@ -221,6 +234,11 @@ func (h *contentAddressedHandler) uploadObjects(ctx context.Context, rep *Report continue } if err := h.uploadOneObject(ctx, runID, d); err != nil { + if errors.Is(err, errContentDrift) { + drifted++ + rep.Warnings = append(rep.Warnings, err.Error()) + continue + } rep.RcloneResult.Errors++ if int64(len(rep.RcloneResult.FailedFiles)) < maxFailedFiles { rep.RcloneResult.FailedFiles = append(rep.RcloneResult.FailedFiles, @@ -236,6 +254,9 @@ func (h *contentAddressedHandler) uploadObjects(ctx context.Context, rep *Report if rep.RcloneResult.Errors > 0 { return fmt.Errorf("%d object(s) failed to land on %q; the manifest segment for run %d was not written and the durability vector did not advance", rep.RcloneResult.Errors, h.dest.Name, runID) } + if drifted > 0 { + return fmt.Errorf("%d object(s) on %q were refused for drifting from their indexed hash; re-index the volume and sync again — the manifest segment for run %d was not written and the durability vector did not advance", drifted, h.dest.Name, runID) + } return nil } @@ -307,22 +328,39 @@ func plannedUploads(delta []store.PathDelta) []store.PathDelta { return out } -// uploadOneObject lands one content object and records the upload. The -// pre-transfer stat guards the content-addressed invariant — the bytes -// stored under a hash must be the bytes that produced it — by refusing -// a source file whose size or mtime drifted from the indexed row; the -// post-transfer stat confirms presence and size on the remote. The +// errContentDrift marks a source file whose bytes no longer match the +// content hash the index bound them to. 
The upload path raises it instead +// of recording an object; uploadObjects turns it into a warning and fails +// the run so the watermark holds and the object is re-offered next run. +var errContentDrift = errors.New("source content drifted from its indexed hash") + +// uploadOneObject lands one content object and records the upload. It +// guards the content-addressed invariant — the bytes stored under a hash +// must be the bytes that produced it — by re-hashing the source file +// immediately before the transfer and refusing (errContentDrift) when the +// digest no longer matches the indexed hash, catching a +// size+mtime-preserving in-place edit that a metadata stat would pass. +// The post-transfer stat confirms presence and size on the remote, and the // upload record is written only after that confirmation, so a recorded -// hash is always a confirmed one; a crash in between re-uploads the -// same bytes idempotently on the next run. +// hash is always a confirmed one; a crash in between re-uploads the same +// bytes idempotently on the next run. +// +// Residual: rclone reads the file in a separate child process after the +// re-hash, so a writer that edits the file in the window between the hash +// and rclone's read could still upload drifted bytes. The window is the +// fork/exec of one rclone invocation rather than the whole walk-to-push +// span, and the scan-back fingerprint pass (#109) re-reads the landed +// object to upgrade the durability vector, catching any byte that slipped +// through before the object is treated as content-verified. 
func (h *contentAddressedHandler) uploadOneObject(ctx context.Context, runID int64, d store.PathDelta) error {
 	src := filepath.Join(h.vol.Path, filepath.FromSlash(d.Path))
-	fi, err := os.Stat(src)
+	digest, err := hashLocalFile(src)
 	if err != nil {
-		return fmt.Errorf("stat %s: %w", src, err)
+		return fmt.Errorf("re-hash %s before upload: %w", src, err)
 	}
-	if fi.Size() != d.SizeBytes || fi.ModTime().UnixNano() != d.MtimeNs {
-		return fmt.Errorf("%s changed on disk since it was indexed (size %d→%d, mtime %d→%d) — run `squirrel index %s` and sync again", d.Path, d.SizeBytes, fi.Size(), d.MtimeNs, fi.ModTime().UnixNano(), h.vol.Name)
+	if !bytes.Equal(digest, d.Blake3) {
+		return fmt.Errorf("%w: %s now hashes to %s, indexed as %s — run `squirrel index %s` and sync again",
+			errContentDrift, d.Path, hex.EncodeToString(digest), hex.EncodeToString(d.Blake3), h.vol.Name)
 	}
 	hash := hex.EncodeToString(d.Blake3)
 	uri := h.objectURI(hash)
@@ -346,6 +384,22 @@ func (h *contentAddressedHandler) uploadOneObject(ctx context.Context, runID int
 	return nil
 }
 
+// hashLocalFile opens the file at path, feeds it through a BLAKE3 hasher
+// with io.Copy, and returns the hasher's Sum; open and read errors are
+// returned unwrapped for the caller to annotate.
+func hashLocalFile(path string) ([]byte, error) {
+	src, err := os.Open(path)
+	if err != nil {
+		return nil, err
+	}
+	defer src.Close()
+	hasher := blake3.New()
+	_, err = io.Copy(hasher, src)
+	if err != nil {
+		return nil, err
+	}
+	return hasher.Sum(nil), nil
+}
+
 // uploadSegment writes the run's manifest segment and confirms it
 // landed at the expected size.
Every run uploads one — an unchanged
 // volume yields an empty segment — so each successful run leaves the
diff --git a/sync/content_addressed_test.go b/sync/content_addressed_test.go
index d9b50a8..1d7dfae 100644
--- a/sync/content_addressed_test.go
+++ b/sync/content_addressed_test.go
@@ -10,6 +10,7 @@ import (
 	"runtime"
 	"strings"
 	"testing"
+	"time"
 
 	"github.com/zeebo/blake3"
 
@@ -679,6 +680,92 @@ func TestContentAddressedCrossVolumeDedup(t *testing.T) {
 	}
 }
 
+// TestContentAddressedDriftRefusesObject: a source file whose on-disk
+// bytes drift from the indexed hash (here a same-length, mtime-preserving
+// in-place edit the metadata stat alone cannot catch) is never recorded in
+// remote_objects. The run is refused and warned, the watermark holds, and
+// once the honest bytes are restored the next run lands and records the
+// object normally — the drifted bytes never bound to the hash.
+func TestContentAddressedDriftRefusesObject(t *testing.T) {
+	f := setupContentAddressedFixture(t)
+	f.write(t, "a.txt", "alpha")
+	f.index(t)
+
+	indexedMtime := f.mtimeNs(t, "a.txt")
+	src := filepath.Join(f.pair.Volume.Path, "a.txt")
+	if err := os.WriteFile(src, []byte("ALPHA"), 0o644); err != nil {
+		t.Fatalf("in-place edit: %v", err)
+	}
+	restoreMtime := time.Unix(0, indexedMtime)
+	if err := os.Chtimes(src, restoreMtime, restoreMtime); err != nil {
+		t.Fatalf("restore mtime: %v", err)
+	}
+
+	rep, err := f.sync(t)
+	if err == nil || !strings.Contains(err.Error(), "drifting from their indexed hash") {
+		t.Fatalf("expected a drift refusal, got %v", err)
+	}
+	if rep.Status != store.RunStatusFailed {
+		t.Fatalf("Status = %q, want failed", rep.Status)
+	}
+	if rep.RcloneResult.Transferred != 0 || rep.RcloneResult.Errors != 0 {
+		t.Fatalf("counts = transferred=%d errors=%d, want 0/0 (drift is a refusal, not a transfer error)",
+			rep.RcloneResult.Transferred, rep.RcloneResult.Errors)
+	}
+	if len(rep.Warnings) == 0 || !strings.Contains(strings.Join(rep.Warnings, "\n"), "drifted") {
+		t.Fatalf("Warnings = %+v, want a drift advisory", rep.Warnings)
+	}
+	if _, statErr := os.Stat(f.remotePath(ObjectsDirName, blake3Hex("alpha"))); statErr == nil {
+		t.Fatalf("drifted source uploaded an object under the indexed hash")
+	}
+
+	row, err := f.store.GetByPath(context.Background(), f.volumeID(t), "a.txt")
+	if err != nil {
+		t.Fatalf("GetByPath: %v", err)
+	}
+	// A lookup failure must fail the test: with the error discarded, has
+	// would be false and the "never recorded" assertion would pass vacuously.
+	has, err := f.store.HasRemoteObject(context.Background(), row.ContentID, "offsite")
+	if err != nil {
+		t.Fatalf("HasRemoteObject after drift refusal: %v", err)
+	}
+	if has {
+		t.Fatalf("drifted object was recorded in remote_objects")
+	}
+	if _, statErr := os.Stat(f.remotePath("pics", ManifestDirName, fmt.Sprintf("run-%d", rep.RunID))); statErr == nil {
+		t.Fatalf("manifest segment written despite a refused object")
+	}
+	if vector, err := f.store.ListDestinationRunIDs(context.Background(), f.volumeID(t), "offsite"); err != nil || len(vector) != 0 {
+		t.Fatalf("vector = %+v (err=%v), want empty after a refused object", vector, err)
+	}
+
+	if err := os.WriteFile(src, []byte("alpha"), 0o644); err != nil {
+		t.Fatalf("restore honest bytes: %v", err)
+	}
+	if err := os.Chtimes(src, restoreMtime, restoreMtime); err != nil {
+		t.Fatalf("restore mtime again: %v", err)
+	}
+	rep2, err := f.sync(t)
+	if err != nil {
+		t.Fatalf("retry sync: %v", err)
+	}
+	if rep2.RcloneResult.Transferred != 1 {
+		t.Fatalf("retry transferred = %d, want 1 (honest bytes upload after drift cleared)", rep2.RcloneResult.Transferred)
+	}
+	has, err = f.store.HasRemoteObject(context.Background(), row.ContentID, "offsite")
+	if err != nil {
+		t.Fatalf("HasRemoteObject after honest re-upload: %v", err)
+	}
+	if !has {
+		t.Fatalf("honest re-upload was not recorded")
+	}
+	got, err := os.ReadFile(f.remotePath(ObjectsDirName, blake3Hex("alpha")))
+	if err != nil {
+		t.Fatalf("object missing after honest re-upload: %v", err)
+	}
+	if string(got) != "alpha" {
+		t.Fatalf("recorded object = %q, want the honest bytes %q", got, "alpha")
+	}
+}
+
 // TestContentAddressedRefusesVolumeNamedObjects: a volume named like
 // the destination-root objects/ directory would collide with it.
func TestContentAddressedRefusesVolumeNamedObjects(t *testing.T) {