Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 42 additions & 11 deletions index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,16 +555,16 @@ func (i *indexer) process(w workItem, buf []byte) resultItem {
return resultItem{row: existing, kind: kindUnchanged}
}

digest, err := hashFile(w.absPath, buf)
hashed, err := hashFile(w.absPath, buf)
if err != nil {
return resultItem{err: fmt.Errorf("hash %s: %w", w.absPath, err)}
}

row := i.rowFor(w, digest)
row := i.rowFor(w, hashed)
if !hasExisting {
return resultItem{row: row, kind: kindAdded}
}
if bytes.Equal(existing.Blake3, digest) && existing.Status == store.StatusPresent {
if bytes.Equal(existing.Blake3, hashed.digest) && existing.Status == store.StatusPresent {
return resultItem{row: existing, kind: kindUnchanged}
}
return resultItem{row: row, kind: kindModified}
Expand All @@ -579,13 +579,18 @@ func metadataMatches(existing store.FileRow, w workItem) bool {
existing.MtimeNs == w.mtimeNs
}

func (i *indexer) rowFor(w workItem, digest []byte) store.FileRow {
// rowFor builds the file row from the hashed-file result rather than the
// walk-time workItem: SizeBytes and MtimeNs come from a Stat of the open
// handle taken after hashing, so the digest and the metadata describe the
// same inode state — keeping the minted contents row internally
// consistent against the immutable-contents size cross-check.
func (i *indexer) rowFor(w workItem, hashed hashedFile) store.FileRow {
return store.FileRow{
VolumeID: i.volumeID,
Path: w.relPath,
Blake3: digest,
SizeBytes: w.sizeBytes,
MtimeNs: w.mtimeNs,
Blake3: hashed.digest,
SizeBytes: hashed.sizeBytes,
MtimeNs: hashed.mtimeNs,
Status: store.StatusPresent,
FirstSeenRunID: i.runID,
LastSeenRunID: i.runID,
Expand Down Expand Up @@ -676,15 +681,41 @@ func resolveNamedVolume(ctx context.Context, s *store.Store, name, absRoot strin
// the run; allocating per-file made GC pressure outweigh the syscall win.
const hashReadBufferSize = 1 << 20

func hashFile(path string, buf []byte) ([]byte, error) {
// hashedFile bundles what hashFile learns about one file: the digest of
// its bytes plus the size and modification time recorded alongside that
// digest, so the three values can travel together into a store row.
type hashedFile struct {
	// digest is the raw hash of the file's content.
	digest []byte
	// sizeBytes is the content size in bytes; mtimeNs is the modification
	// time in nanoseconds since the Unix epoch.
	sizeBytes, mtimeNs int64
}

// hashFile hashes the file at path and reads its size and mtime from the
// open handle after the hash completes. Stat-after-hash on the live handle
// (rather than re-opening by path, which would reintroduce a race) pins the
// metadata to the same bytes that produced the digest, even if the file was
// growing during the walk-to-hash window.
//
// Residual: an append landing between the hash reaching EOF and the Stat
// can still report a size above the bytes hashed; the window is a single
// syscall gap (vs. the whole walk-to-hash span this closes), and a later
// re-index of the settled file supersedes to a consistent row.
func hashFile(path string, buf []byte) (hashedFile, error) {
f, err := os.Open(path)
if err != nil {
return nil, err
return hashedFile{}, err
}
defer f.Close()
h := blake3.New()
if _, err := io.CopyBuffer(h, f, buf); err != nil {
return nil, err
return hashedFile{}, err
}
fi, err := f.Stat()
Comment on lines 708 to +712
if err != nil {
return hashedFile{}, err
}
return h.Sum(nil), nil
return hashedFile{
digest: h.Sum(nil),
sizeBytes: fi.Size(),
mtimeNs: fi.ModTime().UnixNano(),
}, nil
}
72 changes: 72 additions & 0 deletions index/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,18 @@ import (
"testing"
"time"

"github.com/zeebo/blake3"

"github.com/mbertschler/squirrel/store"
"github.com/mbertschler/squirrel/volmark"
)

// blake3Of returns the 256-bit BLAKE3 digest of content as a byte slice,
// for comparing against digests stored in index rows.
func blake3Of(t *testing.T, content string) []byte {
	t.Helper()
	digest := blake3.Sum256([]byte(content))
	out := digest[:]
	return out
}

func setupStore(t *testing.T) *store.Store {
t.Helper()
dsn := filepath.Join(t.TempDir(), "test.db")
Expand Down Expand Up @@ -1089,3 +1097,67 @@ func TestIndexFlipsOffloadedBackToPresent(t *testing.T) {
after.FirstSeenRunID, before.FirstSeenRunID)
}
}

// TestHashStatPinnedToHashedBytes models a file that grew after the walker
// stat'ed it but before the worker hashed it. The work item carries the
// stale walk-time size and mtime; process must return a row whose size is
// that of the content that was actually hashed and whose digest is the
// BLAKE3 of that content, never the stale walk-time size.
func TestHashStatPinnedToHashedBytes(t *testing.T) {
	dir := t.TempDir()
	target := filepath.Join(dir, "growing.txt")
	content := "the full on-disk content after the append"
	writeFile(t, target, content)

	db := setupStore(t)
	idx, err := newIndexer(context.Background(), db, dir, Options{Name: "vol"})
	if err != nil {
		t.Fatalf("newIndexer: %v", err)
	}

	// Deliberately wrong metadata: what the walker would have seen before
	// the append landed.
	var stale workItem
	stale.absPath = target
	stale.relPath = "growing.txt"
	stale.sizeBytes = 3
	stale.mtimeNs = 1

	scratch := make([]byte, hashReadBufferSize)
	res := idx.process(stale, scratch)
	if res.err != nil {
		t.Fatalf("process: %v", res.err)
	}

	if gotSize, wantSize := res.row.SizeBytes, int64(len(content)); gotSize != wantSize {
		t.Fatalf("row size = %d, want %d (the hashed bytes, not the stale walk size %d)",
			gotSize, wantSize, stale.sizeBytes)
	}

	if want := blake3Of(t, content); !bytes.Equal(res.row.Blake3, want) {
		t.Fatalf("row digest = %x, want %x", res.row.Blake3, want)
	}
}

// TestReindexStableBytesNoSizeMismatch runs Index twice over a volume
// holding one file that is never touched in between. The second run must
// succeed, report no errors, and classify the file as unchanged — i.e. the
// row written by the first run is consistent with what the second run
// observes for the same bytes.
func TestReindexStableBytesNoSizeMismatch(t *testing.T) {
	dir := t.TempDir()
	writeFile(t, filepath.Join(dir, "stable.txt"), "stable content that never changes")

	db := setupStore(t)
	ctx := context.Background()

	_, firstErr := Index(ctx, db, dir, Options{Name: "vol"})
	if firstErr != nil {
		t.Fatalf("first Index: %v", firstErr)
	}

	report, secondErr := Index(ctx, db, dir, Options{Name: "vol"})
	switch {
	case secondErr != nil:
		t.Fatalf("second Index aborted (size cross-check?): %v", secondErr)
	case report.Errors != 0:
		t.Fatalf("re-index reported errors: %+v", report.ErrorList)
	case report.Unchanged != 1:
		t.Fatalf("re-index unchanged = %d, want 1", report.Unchanged)
	}
}
75 changes: 64 additions & 11 deletions sync/content_addressed.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
package sync

import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"path"
"path/filepath"
"slices"
"strconv"

"github.com/zeebo/blake3"

"github.com/mbertschler/squirrel/config"
"github.com/mbertschler/squirrel/store"
)
Expand Down Expand Up @@ -209,8 +214,16 @@ func (h *contentAddressedHandler) watermark(ctx context.Context, volID int64) (i
// failures don't stop the loop — every object that lands now is recorded
// and saves work on the retry — but any failure fails the run before
// the segment is written.
//
// A source whose bytes drifted from the indexed hash is refused without a
// remote_objects row (errContentDrift): it is surfaced as a warning and
// fails the run, so the segment is not written and the watermark does not
// advance. The next run recomputes the same delta and re-offers the
// object, letting the honest bytes land once they are restored — without
// the drifted bytes ever being recorded under the hash.
func (h *contentAddressedHandler) uploadObjects(ctx context.Context, rep *Report, runID int64, delta []store.PathDelta) error {
var confirmed []store.PathDelta
var drifted int
for _, d := range plannedUploads(delta) {
recorded, err := h.store.HasRemoteObject(ctx, d.ContentID, h.dest.Name)
if err != nil {
Expand All @@ -221,6 +234,11 @@ func (h *contentAddressedHandler) uploadObjects(ctx context.Context, rep *Report
continue
}
if err := h.uploadOneObject(ctx, runID, d); err != nil {
if errors.Is(err, errContentDrift) {
drifted++
rep.Warnings = append(rep.Warnings, err.Error())
continue
}
rep.RcloneResult.Errors++
if int64(len(rep.RcloneResult.FailedFiles)) < maxFailedFiles {
rep.RcloneResult.FailedFiles = append(rep.RcloneResult.FailedFiles,
Expand All @@ -236,6 +254,9 @@ func (h *contentAddressedHandler) uploadObjects(ctx context.Context, rep *Report
if rep.RcloneResult.Errors > 0 {
return fmt.Errorf("%d object(s) failed to land on %q; the manifest segment for run %d was not written and the durability vector did not advance", rep.RcloneResult.Errors, h.dest.Name, runID)
}
if drifted > 0 {
return fmt.Errorf("%d object(s) on %q were refused for drifting from their indexed hash; re-index the volume and sync again — the manifest segment for run %d was not written and the durability vector did not advance", drifted, h.dest.Name, runID)
}
return nil
}

Expand Down Expand Up @@ -307,22 +328,39 @@ func plannedUploads(delta []store.PathDelta) []store.PathDelta {
return out
}

// uploadOneObject lands one content object and records the upload. The
// pre-transfer stat guards the content-addressed invariant — the bytes
// stored under a hash must be the bytes that produced it — by refusing
// a source file whose size or mtime drifted from the indexed row; the
// post-transfer stat confirms presence and size on the remote. The
// errContentDrift marks a source file whose bytes no longer match the
// content hash the index bound them to. It is a sentinel: the upload path
// wraps it with %w instead of recording an object, and callers detect it
// with errors.Is. uploadObjects turns it into a report warning and fails
// the run, so the watermark holds and the object is re-offered next run.
var errContentDrift = errors.New("source content drifted from its indexed hash")

// uploadOneObject lands one content object and records the upload. It
// guards the content-addressed invariant — the bytes stored under a hash
// must be the bytes that produced it — by re-hashing the source file
// immediately before the transfer and refusing (errContentDrift) when the
// digest no longer matches the indexed hash, catching a
// size+mtime-preserving in-place edit that a metadata stat would pass.
// The post-transfer stat confirms presence and size on the remote, and the
// upload record is written only after that confirmation, so a recorded
// hash is always a confirmed one; a crash in between re-uploads the
// same bytes idempotently on the next run.
// hash is always a confirmed one; a crash in between re-uploads the same
// bytes idempotently on the next run.
//
// Residual: rclone reads the file in a separate child process after the
// re-hash, so a writer that edits the file in the window between the hash
// and rclone's read could still upload drifted bytes. The window is the
// fork/exec of one rclone invocation rather than the whole walk-to-push
// span, and the scan-back fingerprint pass (#109) re-reads the landed
// object to upgrade the durability vector, catching any byte that slipped
// through before the object is treated as content-verified.
func (h *contentAddressedHandler) uploadOneObject(ctx context.Context, runID int64, d store.PathDelta) error {
src := filepath.Join(h.vol.Path, filepath.FromSlash(d.Path))
fi, err := os.Stat(src)
digest, err := hashLocalFile(src)
if err != nil {
return fmt.Errorf("stat %s: %w", src, err)
return fmt.Errorf("re-hash %s before upload: %w", src, err)
}
if fi.Size() != d.SizeBytes || fi.ModTime().UnixNano() != d.MtimeNs {
return fmt.Errorf("%s changed on disk since it was indexed (size %d→%d, mtime %d→%d) — run `squirrel index %s` and sync again", d.Path, d.SizeBytes, fi.Size(), d.MtimeNs, fi.ModTime().UnixNano(), h.vol.Name)
if !bytes.Equal(digest, d.Blake3) {
return fmt.Errorf("%w: %s now hashes to %s, indexed as %s — run `squirrel index %s` and sync again",
errContentDrift, d.Path, hex.EncodeToString(digest), hex.EncodeToString(d.Blake3), h.vol.Name)
}
hash := hex.EncodeToString(d.Blake3)
uri := h.objectURI(hash)
Expand All @@ -346,6 +384,21 @@ func (h *contentAddressedHandler) uploadOneObject(ctx context.Context, runID int
return nil
}

// hashLocalFile opens the file at path, streams its entire content through
// a BLAKE3 hasher, and returns the resulting raw digest — the same hash
// the indexer binds content under. Errors from opening or reading the
// file are returned unwrapped, with a nil digest.
func hashLocalFile(path string) ([]byte, error) {
	src, openErr := os.Open(path)
	if openErr != nil {
		return nil, openErr
	}
	defer src.Close()

	hasher := blake3.New()
	_, copyErr := io.Copy(hasher, src)
	if copyErr != nil {
		return nil, copyErr
	}

	sum := hasher.Sum(nil)
	return sum, nil
}

// uploadSegment writes the run's manifest segment and confirms it
// landed at the expected size. Every run uploads one — an unchanged
// volume yields an empty segment — so each successful run leaves the
Expand Down
Loading
Loading