diff --git a/cmd/squirrel/sync.go b/cmd/squirrel/sync.go index 5f95595..83a1297 100644 --- a/cmd/squirrel/sync.go +++ b/cmd/squirrel/sync.go @@ -41,7 +41,7 @@ func newSyncCmd() *cobra.Command { cmd.Flags().StringVar(&to, "to", "", "limit to this destination name (default: every destination declared on the volume)") cmd.Flags().BoolVar(&shallow, "shallow", false, "skip BLAKE3 verification; trust rclone's default size+mtime comparison") cmd.Flags().BoolVar(&dryRun, "dry-run", false, "preview rclone actions without transferring; no runs row is written") - cmd.Flags().BoolVar(&initDst, "init", false, "bootstrap a .squirrel-volume marker at the destination on first sync (refused subsequently if the marker mismatches)") + cmd.Flags().BoolVar(&initDst, "init", false, "authorise first-use destination bootstrap: write a .squirrel-volume marker, or create a kopia repository when connect finds none (refused without --init so a typo or outage can't mint a fresh empty target)") return cmd } diff --git a/store/destination_run_ids_test.go b/store/destination_run_ids_test.go index 656bc49..79ad41c 100644 --- a/store/destination_run_ids_test.go +++ b/store/destination_run_ids_test.go @@ -22,6 +22,15 @@ func TestMigrateV18ToV19AddsVerifyMethod(t *testing.T) { `CREATE TABLE schema_version (version INTEGER NOT NULL PRIMARY KEY)`, `CREATE TABLE volumes (id INTEGER PRIMARY KEY, name TEXT NOT NULL UNIQUE, path TEXT NOT NULL)`, `CREATE TABLE nodes (id INTEGER PRIMARY KEY, name TEXT NOT NULL UNIQUE, endpoint TEXT, public_key_fingerprint TEXT)`, + // contents exists from v14 on; a real v18 DB carries it, and the + // v20→v21 triggers attach to it, so the minimal fixture must too. + `CREATE TABLE contents ( + id INTEGER PRIMARY KEY, + blake3 BLOB NOT NULL UNIQUE CHECK (length(blake3) = 32), + size_bytes INTEGER NOT NULL, + origin_node_id INTEGER REFERENCES nodes(id), + origin_run_id INTEGER + )`, `CREATE TABLE destination_run_ids ( volume_id INTEGER NOT NULL REFERENCES volumes(id), destination TEXT NOT NULL, diff --git a/store/hookruns.go b/store/hookruns.go index 0289605..76ea0fa 100644 --- a/store/hookruns.go +++ b/store/hookruns.go @@ -3,6 +3,7 @@ package store import ( "context" "database/sql" + "errors" "fmt" ) @@ -112,32 +113,58 @@ func (s *Store) BeginHookRun(ctx context.Context, spec HookRunSpec) (int64, erro return id, nil } +// isTerminalHookStatus reports whether status is one of the two terminal +// hook states. A row in either must not be re-finalised by FinishHookRun. +func isTerminalHookStatus(status string) bool { + return status == HookStatusSuccess || status == HookStatusFailed +} + // FinishHookRun records the terminal state of a hook run. exitCode is // stored as-is (pass an invalid sql.NullInt64 when the process produced // no code, e.g. spawn failure or timeout); errMsg is stored as NULL when // empty. Returns an error if id matches no row so a hook is never left // stuck in 'running'. +// +// Like FinishRun, the transition is guarded: a hook run already in a +// terminal status is never re-finalised — the first terminal write wins +// and FinishHookRun returns ErrAlreadyFinished (matchable via errors.Is) +// without touching the row, so a double-finish bug or a buggy retry can't +// silently rewrite the recorded status, exit code, and end timestamp. The +// read and the update share one transaction so the check and the write +// can't race. func (s *Store) FinishHookRun(ctx context.Context, id int64, status string, exitCode sql.NullInt64, errMsg string) error { if status != HookStatusSuccess && status != HookStatusFailed { return fmt.Errorf("FinishHookRun: status must be %q or %q, got %q", HookStatusSuccess, HookStatusFailed, status) } + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("begin finish hook run %d: %w", id, err) + } + defer func() { _ = tx.Rollback() }() + + var current string + switch err := tx.QueryRowContext(ctx, `SELECT status FROM hook_runs WHERE id = ?`, id).Scan(¤t); { + case errors.Is(err, sql.ErrNoRows): + return fmt.Errorf("finish hook run %d: no such hook run", id) + case err != nil: + return fmt.Errorf("finish hook run %d read status: %w", id, err) + } + if isTerminalHookStatus(current) { + return fmt.Errorf("finish hook run %d (status %s): %w", id, current, ErrAlreadyFinished) + } + var errVal sql.NullString if errMsg != "" { errVal = sql.NullString{String: errMsg, Valid: true} } - res, err := s.db.ExecContext(ctx, ` + if _, err := tx.ExecContext(ctx, ` UPDATE hook_runs SET ended_at_ns = ?, status = ?, exit_code = ?, error = ? WHERE id = ? - `, NowNs(), status, exitCode, errVal, id) - if err != nil { + `, NowNs(), status, exitCode, errVal, id); err != nil { return fmt.Errorf("finish hook run %d: %w", id, err) } - n, err := res.RowsAffected() - if err != nil { - return fmt.Errorf("finish hook run %d rows affected: %w", id, err) - } - if n == 0 { - return fmt.Errorf("finish hook run %d: no such hook run", id) + if err := tx.Commit(); err != nil { + return fmt.Errorf("commit finish hook run %d: %w", id, err) } return nil } diff --git a/store/hookruns_test.go b/store/hookruns_test.go index db3ea9e..3545c1a 100644 --- a/store/hookruns_test.go +++ b/store/hookruns_test.go @@ -131,6 +131,45 @@ func TestFinishHookRunUnknownID(t *testing.T) { } } +// TestFinishHookRunRefusesTerminalRow: the first terminal write wins. A +// second finish is refused with ErrAlreadyFinished and leaves the +// recorded status, exit code, and end timestamp untouched (#114) — the +// same first-write-wins guard FinishRun has. +func TestFinishHookRunRefusesTerminalRow(t *testing.T) { + s := openTestStore(t) + ctx := context.Background() + vol, _ := s.CreateVolume(ctx, "v", "/tmp/v") + id, err := s.BeginHookRun(ctx, HookRunSpec{VolumeID: vol.ID, Trigger: HookTriggerInterval}) + if err != nil { + t.Fatalf("BeginHookRun: %v", err) + } + + firstExit := sql.NullInt64{Int64: 0, Valid: true} + if err := s.FinishHookRun(ctx, id, HookStatusSuccess, firstExit, ""); err != nil { + t.Fatalf("first FinishHookRun: %v", err) + } + before, err := s.hookRunByID(ctx, id) + if err != nil { + t.Fatalf("read back after first finish: %v", err) + } + + err = s.FinishHookRun(ctx, id, HookStatusFailed, sql.NullInt64{Int64: 7, Valid: true}, "second finish") + if !errors.Is(err, ErrAlreadyFinished) { + t.Fatalf("second FinishHookRun err = %v, want ErrAlreadyFinished", err) + } + + after, err := s.hookRunByID(ctx, id) + if err != nil { + t.Fatalf("read back after refused finish: %v", err) + } + if after.Status != HookStatusSuccess { + t.Fatalf("status = %q after refused second finish, want success", after.Status) + } + if after.ExitCode != before.ExitCode || after.EndedAtNs != before.EndedAtNs || after.Error != before.Error { + t.Fatalf("terminal row mutated by refused finish: before=%+v after=%+v", before, after) + } +} + func TestListHookRuns(t *testing.T) { s := openTestStore(t) ctx := context.Background() diff --git a/store/migrations.go b/store/migrations.go index 610938d..592039f 100644 --- a/store/migrations.go +++ b/store/migrations.go @@ -10,7 +10,7 @@ import ( ) // SchemaVersion is the schema version this binary writes and reads. -const SchemaVersion = 20 +const SchemaVersion = 21 // freshSchemaBaseline is the version applied to a brand-new database. The // chain in `migrations` continues from here. v1 is no longer reachable from @@ -56,6 +56,7 @@ func buildMigrations(mctx migrationCtx) []migration { {version: 18, up: migrateV17ToV18}, {version: 19, up: migrateV18ToV19}, {version: 20, up: migrateV19ToV20}, + {version: 21, up: migrateV20ToV21}, } } @@ -1822,3 +1823,49 @@ func migrateV19ToV20(ctx context.Context, db *sql.DB) error { } return tx.Commit() } + +// --- v20 → v21 --- + +// migrateV20ToV21 restores schema-level immutability for the contents +// table. contents is the append-only content entity: one row per BLAKE3, +// carrying its size and origin. The id↔blake3 binding is already immutable +// by construction (blake3 is UNIQUE), but the v13→v14 reshape dropped the +// files_blake3_immutable trigger without installing an equivalent guard on +// the new table, so a future bug could UPDATE a row's size_bytes/origin_* +// in place or DELETE a row whose hash other rows still reference. +// +// Two triggers re-assert the guarantee the append-only contract implies: +// any UPDATE or DELETE on a contents row aborts. The sanctioned way to +// record different content at a path is to supersede the files row and +// insert a new one (see Upsert), which leaves the contents row untouched. +func migrateV20ToV21(ctx context.Context, db *sql.DB) error { + tx, err := db.BeginTx(ctx, nil) + if err != nil { + return err + } + defer tx.Rollback() + + for _, q := range append(contentsImmutableTriggers(), `INSERT INTO schema_version (version) VALUES (21)`) { + if _, err := tx.ExecContext(ctx, q); err != nil { + return fmt.Errorf("v20→v21: %w", err) + } + } + return tx.Commit() +} + +// contentsImmutableTriggers returns the DDL for the two triggers that make +// the contents table append-only at the schema level: a row's size and +// origin are fixed once written, and a row is never removed. Shared with +// any future fresh-baseline so the guarantee survives a schema rebase. +func contentsImmutableTriggers() []string { + return []string{ + `CREATE TRIGGER contents_no_update BEFORE UPDATE ON contents + BEGIN + SELECT RAISE(ABORT, 'contents is append-only; supersede the files row and insert new content instead of updating'); + END`, + `CREATE TRIGGER contents_no_delete BEFORE DELETE ON contents + BEGIN + SELECT RAISE(ABORT, 'contents is append-only; a content row is never deleted'); + END`, + } +} diff --git a/store/runs.go b/store/runs.go index 54e9056..ed48fa0 100644 --- a/store/runs.go +++ b/store/runs.go @@ -525,6 +525,10 @@ type SyncRunSpec struct { // sync side. Syncs to *different* destinations stay free to overlap — // they touch disjoint vectors. // +// An in-flight offload also blocks: offload unlinks on-disk bytes the sync +// is enumerating, and offload itself blocks on every kind, so the sync and +// index gates name it too to keep the exclusion symmetric. +// // Returns (newID, nil, nil) when the row was inserted; (0, &blocker, // nil) when refused — the caller is expected to render a diagnostic // using the blocker's id and started_at_ns. Stale rows from crashed @@ -547,7 +551,7 @@ func (s *Store) BeginSyncRunIfClear(ctx context.Context, spec SyncRunSpec) (int6 WHERE status = 'running' AND volume_id = ? AND ( (kind = 'sync' AND destination = ?) - OR kind IN ('index', 'audit') + OR kind IN ('index', 'audit', 'offload') ) ORDER BY id LIMIT 1 `, spec.VolumeID, spec.Destination) @@ -579,14 +583,16 @@ func (s *Store) BeginSyncRunIfClear(ctx context.Context, spec SyncRunSpec) (int6 } // BeginIndexRunIfClear atomically inserts a 'running' kind='index' or -// kind='audit' row for volumeID iff no other index- or audit-kind run -// is currently in flight against the same volume. Symmetric to +// kind='audit' row for volumeID iff no index-, audit-, or offload-kind +// run is currently in flight against the same volume. Symmetric to // BeginSyncRunIfClear (BEGIN IMMEDIATE + check + insert in one tx) so // two concurrent callers cannot both observe "no running run" and both // insert. Cross-kind: an in-flight 'index' blocks a new 'audit' and // vice versa because both walk the volume and call MarkMissing with // their own run-id — letting them overlap is exactly the bug this -// guards against. +// guards against. An in-flight offload blocks too: it unlinks bytes the +// walk would otherwise observe and flip, and offload defers to every +// kind, so the block is symmetric. // // A running sync does not block an index here, while a running index // does block a new sync (BeginSyncRunIfClear). The asymmetry is @@ -614,7 +620,7 @@ func (s *Store) BeginIndexRunIfClear(ctx context.Context, kind string, volumeID row := tx.QueryRowContext(ctx, ` SELECT `+runColumns+` FROM runs - WHERE kind IN ('index', 'audit') AND status = 'running' + WHERE kind IN ('index', 'audit', 'offload') AND status = 'running' AND volume_id = ? ORDER BY id LIMIT 1 `, volumeID) diff --git a/store/schema.sql b/store/schema.sql index 3632ed4..af2afc2 100644 --- a/store/schema.sql +++ b/store/schema.sql @@ -1,6 +1,6 @@ -- Generated by `go test ./store -update-schema` — DO NOT EDIT. -- --- Flattened snapshot of the squirrel index schema at version 20, for humans +-- Flattened snapshot of the squirrel index schema at version 21, for humans -- and agents who want the current shape without replaying the migration -- chain in migrations.go. It is NOT used to create or migrate databases — -- a fresh DB is built by applyV5 plus the migration registry. The golden @@ -17,6 +17,16 @@ CREATE TABLE contents ( CREATE INDEX idx_contents_origin_node ON contents(origin_node_id) WHERE origin_node_id IS NOT NULL; +CREATE TRIGGER contents_no_delete BEFORE DELETE ON contents + BEGIN + SELECT RAISE(ABORT, 'contents is append-only; a content row is never deleted'); + END; + +CREATE TRIGGER contents_no_update BEFORE UPDATE ON contents + BEGIN + SELECT RAISE(ABORT, 'contents is append-only; supersede the files row and insert new content instead of updating'); + END; + CREATE TABLE destination_push_freshness ( volume_id INTEGER NOT NULL REFERENCES volumes(id), destination TEXT NOT NULL, diff --git a/store/store_test.go b/store/store_test.go index 324b385..c765b37 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -3472,6 +3472,76 @@ func TestBeginIndexRunIfClearAllowsConcurrentSync(t *testing.T) { } } +// TestBeginSyncRunIfClearBlockedByOffload makes the run gate symmetric: +// offload already blocks on every kind, so a sync must refuse to start +// while an offload is in flight (#114). A concurrent unlink would +// otherwise race the sync's enumeration. +func TestBeginSyncRunIfClearBlockedByOffload(t *testing.T) { + s := openTestStore(t) + ctx := context.Background() + vID := makeVolume(t, s, "/v") + + offID, blocker, err := s.BeginOffloadRunIfClear(ctx, vID) + if err != nil || blocker != nil || offID == 0 { + t.Fatalf("begin offload: id=%d blocker=%+v err=%v", offID, blocker, err) + } + + syncID, syncBlocker, err := s.BeginSyncRunIfClear(ctx, SyncRunSpec{VolumeID: vID, Destination: "backup"}) + if err != nil { + t.Fatalf("begin sync during offload: %v", err) + } + if syncBlocker == nil || syncID != 0 { + t.Fatalf("sync admitted (id=%d) while offload running, want blocked", syncID) + } + if syncBlocker.Kind != RunKindOffload { + t.Fatalf("blocker kind = %q, want %q", syncBlocker.Kind, RunKindOffload) + } + + if err := s.FinishRun(ctx, offID, RunStatusSuccess, "", 0); err != nil { + t.Fatalf("finish offload: %v", err) + } + syncID, syncBlocker, err = s.BeginSyncRunIfClear(ctx, SyncRunSpec{VolumeID: vID, Destination: "backup"}) + if err != nil || syncBlocker != nil || syncID == 0 { + t.Fatalf("sync refused after offload finished: id=%d blocker=%+v err=%v", syncID, syncBlocker, err) + } +} + +// TestBeginIndexRunIfClearBlockedByOffload: an in-flight offload blocks a +// new index or audit so the walk can't observe-and-flip a row mid-unlink +// (#114). +func TestBeginIndexRunIfClearBlockedByOffload(t *testing.T) { + s := openTestStore(t) + ctx := context.Background() + + for _, indexKind := range []string{RunKindIndex, RunKindAudit} { + vID := makeVolume(t, s, "/v-"+indexKind) + + offID, blocker, err := s.BeginOffloadRunIfClear(ctx, vID) + if err != nil || blocker != nil || offID == 0 { + t.Fatalf("%s: begin offload: id=%d blocker=%+v err=%v", indexKind, offID, blocker, err) + } + + idxID, idxBlocker, err := s.BeginIndexRunIfClear(ctx, indexKind, vID, false) + if err != nil { + t.Fatalf("%s: begin index during offload: %v", indexKind, err) + } + if idxBlocker == nil || idxID != 0 { + t.Fatalf("%s: index admitted (id=%d) while offload running, want blocked", indexKind, idxID) + } + if idxBlocker.Kind != RunKindOffload { + t.Fatalf("%s: blocker kind = %q, want %q", indexKind, idxBlocker.Kind, RunKindOffload) + } + + if err := s.FinishRun(ctx, offID, RunStatusSuccess, "", 0); err != nil { + t.Fatalf("%s: finish offload: %v", indexKind, err) + } + idxID, idxBlocker, err = s.BeginIndexRunIfClear(ctx, indexKind, vID, false) + if err != nil || idxBlocker != nil || idxID == 0 { + t.Fatalf("%s: index refused after offload finished: id=%d blocker=%+v err=%v", indexKind, idxID, idxBlocker, err) + } + } +} + // TestBackupVacuumIntoProducesValidSnapshot exercises Backup against // a populated store, then opens the snapshot as a regular DB and // verifies it carries the same volume row. Cheapest reliable check @@ -3942,3 +4012,206 @@ func assertDestinationStoreAfterMigration(t *testing.T, s *Store) { t.Fatalf("history = %+v, want one advance to 5", history) } } + +// v18Fixture is a populated v18 database covering the offload-substrate +// tables (contents, remote_objects, destination_run_ids) so the +// v18→v19→v20→v21 chain can be exercised against real rows. The runs +// kind CHECK already carries 'offload' (v15) and status_changed_run_id +// exists on files (v18); verify_method, destination_push_freshness, and +// the contents triggers are what the chain still adds. +func v18Fixture() []string { + return []string{ + `CREATE TABLE schema_version (version INTEGER NOT NULL PRIMARY KEY)`, + `CREATE TABLE volumes (id INTEGER PRIMARY KEY, name TEXT NOT NULL UNIQUE, path TEXT NOT NULL)`, + `CREATE TABLE nodes (id INTEGER PRIMARY KEY, name TEXT NOT NULL UNIQUE, endpoint TEXT, public_key_fingerprint TEXT)`, + `CREATE TABLE runs ( + id INTEGER PRIMARY KEY, + kind TEXT NOT NULL CHECK (kind IN ('index','sync','restore','audit','offload')), + volume_id INTEGER REFERENCES volumes(id), + destination TEXT, + started_at_ns INTEGER NOT NULL, + ended_at_ns INTEGER, + status TEXT NOT NULL CHECK (status IN ('running','success','failed','partial')), + error TEXT, + file_count INTEGER NOT NULL DEFAULT 0, + peer_node_id INTEGER REFERENCES nodes(id), + correlated_run_id INTEGER, + shallow INTEGER CHECK (shallow IS NULL OR shallow IN (0, 1)), + CHECK ( + (kind IN ('index','audit','offload') AND destination IS NULL) OR + (kind IN ('sync','restore') AND destination IS NOT NULL AND destination != '') + ) + )`, + `CREATE TABLE folders ( + id INTEGER PRIMARY KEY, + volume_id INTEGER NOT NULL REFERENCES volumes(id), + parent_id INTEGER REFERENCES folders(id), + path TEXT NOT NULL, + shallow_blake3 BLOB, + deep_blake3 BLOB, + last_changed_run_id INTEGER REFERENCES runs(id), + file_count INTEGER NOT NULL DEFAULT 0, + cumulative_size INTEGER NOT NULL DEFAULT 0, + UNIQUE (volume_id, path) + )`, + `CREATE TABLE contents ( + id INTEGER PRIMARY KEY, + blake3 BLOB NOT NULL UNIQUE CHECK (length(blake3) = 32), + size_bytes INTEGER NOT NULL, + origin_node_id INTEGER REFERENCES nodes(id), + origin_run_id INTEGER + )`, + `CREATE TABLE files ( + folder_id INTEGER NOT NULL REFERENCES folders(id), + name TEXT NOT NULL, + content_id INTEGER NOT NULL REFERENCES contents(id), + mtime_ns INTEGER NOT NULL, + status TEXT NOT NULL CHECK (status IN ('present','missing','superseded','offloaded')), + first_seen_run_id INTEGER NOT NULL REFERENCES runs(id), + last_seen_run_id INTEGER NOT NULL REFERENCES runs(id), + indexed_at_ns INTEGER NOT NULL, + status_changed_run_id INTEGER REFERENCES runs(id), + PRIMARY KEY (folder_id, name, content_id) + )`, + `CREATE UNIQUE INDEX uniq_files_live_per_path ON files(folder_id, name) WHERE status != 'superseded'`, + `CREATE TABLE destination_run_ids ( + volume_id INTEGER NOT NULL REFERENCES volumes(id), + destination TEXT NOT NULL, + origin_node_id INTEGER NOT NULL REFERENCES nodes(id), + origin_run_id INTEGER NOT NULL, + updated_at_ns INTEGER NOT NULL, + PRIMARY KEY (volume_id, destination, origin_node_id) + )`, + `CREATE TABLE destination_run_ids_history ( + id INTEGER PRIMARY KEY, + volume_id INTEGER NOT NULL, + destination TEXT NOT NULL, + origin_node_id INTEGER NOT NULL, + origin_run_id INTEGER NOT NULL, + at_ns INTEGER NOT NULL + )`, + `CREATE TABLE remote_objects ( + content_id INTEGER NOT NULL REFERENCES contents(id), + destination TEXT NOT NULL, + uploaded_run_id INTEGER NOT NULL REFERENCES runs(id), + checksum_algo TEXT, + checksum TEXT, + verified_at_ns INTEGER, + PRIMARY KEY (content_id, destination), + CHECK ((checksum_algo IS NULL) = (checksum IS NULL)) + )`, + `INSERT INTO schema_version (version) VALUES (18)`, + `INSERT INTO volumes (id, name, path) VALUES (1, 'photos', '/photos')`, + `INSERT INTO nodes (id, name) VALUES (1, 'self'), (2, 'peer')`, + `INSERT INTO runs (id, kind, volume_id, destination, started_at_ns, status) + VALUES (1, 'index', 1, NULL, 100, 'success'), + (2, 'sync', 1, 'bucket', 200, 'success')`, + `INSERT INTO folders (id, volume_id, parent_id, path) VALUES (1, 1, NULL, '')`, + `INSERT INTO contents (id, blake3, size_bytes, origin_node_id, origin_run_id) VALUES + (1, X'` + strings.Repeat("11", 32) + `', 10, NULL, NULL), + (2, X'` + strings.Repeat("22", 32) + `', 20, 2, 9)`, + `INSERT INTO files (folder_id, name, content_id, mtime_ns, status, first_seen_run_id, last_seen_run_id, indexed_at_ns, status_changed_run_id) VALUES + (1, 'a.txt', 1, 1, 'present', 1, 1, 1, 1), + (1, 'b.txt', 2, 2, 'present', 1, 1, 2, 1)`, + `INSERT INTO destination_run_ids (volume_id, destination, origin_node_id, origin_run_id, updated_at_ns) + VALUES (1, 'bucket', 1, 7, 100)`, + `INSERT INTO destination_run_ids_history (volume_id, destination, origin_node_id, origin_run_id, at_ns) + VALUES (1, 'bucket', 1, 7, 100)`, + `INSERT INTO remote_objects (content_id, destination, uploaded_run_id, checksum_algo, checksum, verified_at_ns) + VALUES (1, 'bucket', 2, 'blake3', 'deadbeef', 150)`, + } +} + +// TestMigrateV18ChainToV21 drives a populated v18 database through the +// v19–v21 chain and confirms the offload-substrate rows survive intact +// (destination_run_ids with its NULL-backfilled verify_method, +// remote_objects with its fingerprint) and that the v21 contents triggers +// actually abort an UPDATE and a DELETE on a contents row. +func TestMigrateV18ChainToV21(t *testing.T) { + dsn := filepath.Join(t.TempDir(), "test.db") + rawDB, err := sql.Open("sqlite", dsn) + if err != nil { + t.Fatalf("raw sql.Open: %v", err) + } + for _, q := range v18Fixture() { + if _, err := rawDB.Exec(q); err != nil { + rawDB.Close() + t.Fatalf("v18 DDL %q: %v", q, err) + } + } + rawDB.Close() + + s, err := OpenWithOptions(dsn, OpenOptions{NodeName: "self"}) + if err != nil { + t.Fatalf("Open (migrates v18→v%d): %v", SchemaVersion, err) + } + defer s.Close() + ctx := context.Background() + + if v, _ := s.CurrentSchemaVersion(ctx); v != SchemaVersion { + t.Fatalf("schema_version = %d, want %d", v, SchemaVersion) + } + + assertV18SubstrateSurvived(t, s) + assertContentsTriggersAbort(t, s) +} + +// assertV18SubstrateSurvived checks the offload-substrate rows carried +// through the v19–v21 chain: the durability vector keeps its coordinate +// with a NULL-backfilled verify_method, and the remote_objects fingerprint +// is intact. +func assertV18SubstrateSurvived(t *testing.T, s *Store) { + t.Helper() + ctx := context.Background() + + got, err := s.GetDestinationRunID(ctx, 1, "bucket", 1) + if err != nil { + t.Fatalf("GetDestinationRunID: %v", err) + } + if got.OriginRunID != 7 { + t.Fatalf("origin run = %d, want 7 (carried over)", got.OriginRunID) + } + if got.VerifyMethod != "" { + t.Fatalf("verify method = %q, want empty (NULL backfill)", got.VerifyMethod) + } + + var algo, checksum string + if err := s.db.QueryRowContext(ctx, + `SELECT checksum_algo, checksum FROM remote_objects WHERE content_id = 1 AND destination = 'bucket'`). + Scan(&algo, &checksum); err != nil { + t.Fatalf("remote_objects row: %v", err) + } + if algo != "blake3" || checksum != "deadbeef" { + t.Fatalf("remote_objects fingerprint = (%q,%q), want (blake3,deadbeef)", algo, checksum) + } +} + +// assertContentsTriggersAbort checks the v21 schema-level immutability: +// an in-place UPDATE and a DELETE on a contents row both abort, while the +// row stays exactly as written. +func assertContentsTriggersAbort(t *testing.T, s *Store) { + t.Helper() + ctx := context.Background() + + if _, err := s.db.ExecContext(ctx, `UPDATE contents SET size_bytes = 999 WHERE id = 1`); err == nil { + t.Fatalf("UPDATE on contents succeeded, want trigger ABORT") + } + if _, err := s.db.ExecContext(ctx, `DELETE FROM contents WHERE id = 1`); err == nil { + t.Fatalf("DELETE on contents succeeded, want trigger ABORT") + } + + var size int64 + var count int + if err := s.db.QueryRowContext(ctx, `SELECT size_bytes FROM contents WHERE id = 1`).Scan(&size); err != nil { + t.Fatalf("contents row after refused mutations: %v", err) + } + if size != 10 { + t.Fatalf("size_bytes = %d after refused UPDATE, want 10", size) + } + if err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM contents`).Scan(&count); err != nil { + t.Fatalf("count contents: %v", err) + } + if count != 2 { + t.Fatalf("contents rows = %d after refused DELETE, want 2", count) + } +} diff --git a/sync/handler.go b/sync/handler.go index 7c8c23b..4c8b2eb 100644 --- a/sync/handler.go +++ b/sync/handler.go @@ -188,13 +188,18 @@ func finishHandlerRun(ctx context.Context, s *store.Store, rep *Report, runErr e // bucket transfer: BLAKE3 end-to-end when the integrity flags were in // force, rclone's size+mtime comparison otherwise. Only a fully // successful BLAKE3 run counts as verified. +// +// A run that asked for BLAKE3 but hit rclone's "no hashes in common" +// fallback is downgraded to size+mtime here even though the flags were +// set and rclone exited 0: rclone silently compared by size, so the copy +// was not content-verified and must not advance the durability vector. func rcloneVerification(dest *config.Destination, opts Options, rep *Report) VerifyResult { v := VerifyResult{ Method: VerifyMethodBlake3, Files: rep.RcloneResult.Transferred + rep.RcloneResult.Checked, Bytes: rep.RcloneResult.Bytes, } - if EffectiveShallow(dest, opts.Shallow) { + if EffectiveShallow(dest, opts.Shallow) || rep.RcloneResult.HashFallback { v.Method = VerifyMethodSizeMtime } v.verified = v.Method == VerifyMethodBlake3 && rep.Status == store.RunStatusSuccess diff --git a/sync/handler_test.go b/sync/handler_test.go index 7280b04..ba288a9 100644 --- a/sync/handler_test.go +++ b/sync/handler_test.go @@ -54,17 +54,22 @@ func TestRcloneVerification(t *testing.T) { dest *config.Destination opts Options status string + hashFallback bool wantVerified bool wantMethod string }{ - {"checksum success", plain, Options{}, store.RunStatusSuccess, true, VerifyMethodBlake3}, - {"checksum partial", plain, Options{}, store.RunStatusPartial, false, VerifyMethodBlake3}, - {"shallow success", plain, Options{Shallow: true}, store.RunStatusSuccess, false, VerifyMethodSizeMtime}, - {"crypt forces shallow", crypt, Options{}, store.RunStatusSuccess, false, VerifyMethodSizeMtime}, + {"checksum success", plain, Options{}, store.RunStatusSuccess, false, true, VerifyMethodBlake3}, + {"checksum partial", plain, Options{}, store.RunStatusPartial, false, false, VerifyMethodBlake3}, + {"shallow success", plain, Options{Shallow: true}, store.RunStatusSuccess, false, false, VerifyMethodSizeMtime}, + {"crypt forces shallow", crypt, Options{}, store.RunStatusSuccess, false, false, VerifyMethodSizeMtime}, + // rclone exited 0 with the integrity flags set, but reported the + // no-common-hash fallback: the copy was compared by size, so the + // result must be size+mtime and unverified. + {"hash fallback downgrades", plain, Options{}, store.RunStatusSuccess, true, false, VerifyMethodSizeMtime}, } for _, c := range cases { rep := &Report{Status: c.status} - rep.RcloneResult = RunResult{Transferred: 2, Checked: 3, Bytes: 42} + rep.RcloneResult = RunResult{Transferred: 2, Checked: 3, Bytes: 42, HashFallback: c.hashFallback} v := rcloneVerification(c.dest, c.opts, rep) if v.Verified() != c.wantVerified || v.Method != c.wantMethod { t.Errorf("%s: verified=%t method=%q, want %t %q", c.name, v.Verified(), v.Method, c.wantVerified, c.wantMethod) diff --git a/sync/kopia.go b/sync/kopia.go index 73e54c0..26fe2e4 100644 --- a/sync/kopia.go +++ b/sync/kopia.go @@ -80,18 +80,28 @@ func (k *Kopia) run(ctx context.Context, cfgFile, password string, args ...strin } // ensureRepository connects the destination-scoped config file to the -// filesystem repository at repoPath, creating the repository when -// connect reports nothing usable there (first use). Connect runs on -// every push so a repository path changed in squirrel's config is -// re-pointed rather than silently snapshotting into the old one. -// --no-persist-credentials keeps the password scoped to each -// invocation's environment; kopia's default would write it to a -// sidecar file next to the config on keyring-less hosts. -func (k *Kopia) ensureRepository(ctx context.Context, cfgFile, password, repoPath string) error { +// filesystem repository at repoPath. Connect runs on every push so a +// repository path changed in squirrel's config is re-pointed rather than +// silently snapshotting into the old one. --no-persist-credentials keeps +// the password scoped to each invocation's environment; kopia's default +// would write it to a sidecar file next to the config on keyring-less +// hosts. +// +// A connect failure creates the repository only when init is set. Without +// it, a failed connect is an error: creating on every connect failure +// would mint a fresh, empty repository on a transient outage or a +// mistyped path, and the destination's durability vector — monotonic and +// without a CLI to rewind — would keep claiming coverage the new +// repository cannot honour. init mirrors the --init gate the local +// destination marker uses for first-use bootstrap. +func (k *Kopia) ensureRepository(ctx context.Context, cfgFile, password, repoPath string, init bool) error { _, connectErr := k.run(ctx, cfgFile, password, "repository", "connect", "filesystem", "--path", repoPath, "--no-persist-credentials") if connectErr == nil { return nil } + if !init { + return fmt.Errorf("kopia repository at %s: connect failed (%w) — re-run with --init to create a new repository (refusing to auto-create in case the path is wrong or the destination is temporarily unreachable)", repoPath, connectErr) + } if _, createErr := k.run(ctx, cfgFile, password, "repository", "create", "filesystem", "--path", repoPath, "--no-persist-credentials"); createErr != nil { return fmt.Errorf("kopia repository at %s: connect failed (%w); create failed: %w", repoPath, connectErr, createErr) } @@ -197,7 +207,7 @@ func (h *kopiaHandler) Push(ctx context.Context, opts Options) (Report, error) { return rep, err } - err = h.snapshotAndVerify(ctx, &rep) + err = h.snapshotAndVerify(ctx, &rep, opts.Init) finishHandlerRun(ctx, h.store, &rep, err) // Local index snapshot only: the repository is kopia's own format, // so the rclone ride-along stays out of it (dest=nil, mirroring the @@ -237,12 +247,13 @@ func kopiaVerifyFilesPercent(dest *config.Destination) (float64, error) { // rep.Verification. Status starts failed and is promoted: success for a // clean verified snapshot, partial when the snapshot landed with // per-file errors kopia tolerated. Verified is reserved for the clean -// path — a snapshot with skipped files is durable but incomplete. -func (h *kopiaHandler) snapshotAndVerify(ctx context.Context, rep *Report) error { +// path — a snapshot with skipped files is durable but incomplete. init +// authorises first-use repository creation on a connect failure. +func (h *kopiaHandler) snapshotAndVerify(ctx context.Context, rep *Report, init bool) error { rep.Status = store.RunStatusFailed cfgFile := h.kopia.configFile(h.dest.Name) password := h.dest.Params["password"] - if err := h.kopia.ensureRepository(ctx, cfgFile, password, h.dest.Root); err != nil { + if err := h.kopia.ensureRepository(ctx, cfgFile, password, h.dest.Root, init); err != nil { return err } verifyFilesPercent, err := kopiaVerifyFilesPercent(h.dest) diff --git a/sync/kopia_test.go b/sync/kopia_test.go index 81495e3..e8e838d 100644 --- a/sync/kopia_test.go +++ b/sync/kopia_test.go @@ -221,7 +221,7 @@ func TestKopiaConnectFallsBackToCreate(t *testing.T) { t.Setenv("KOPIA_FAKE_CONNECT_EXIT", "1") f := setupKopiaFixture(t) - rep, err := RunPair(context.Background(), f.store, f.tools, f.pair, Options{}) + rep, err := RunPair(context.Background(), f.store, f.tools, f.pair, Options{Init: true}) if err != nil { t.Fatalf("RunPair: %v (rep=%+v)", err, rep) } @@ -231,13 +231,41 @@ func TestKopiaConnectFallsBackToCreate(t *testing.T) { } } +// TestKopiaConnectFailWithoutInitRefuses: a connect failure without --init +// is an error, not a silent re-create (#114). Auto-creating on every +// connect failure would mint a fresh empty repository on a transient +// outage or a mistyped path while the monotonic durability vector keeps +// claiming coverage the new repository cannot honour. +func TestKopiaConnectFailWithoutInitRefuses(t *testing.T) { + logPath := installFakeKopia(t) + t.Setenv("KOPIA_FAKE_CONNECT_EXIT", "1") + f := setupKopiaFixture(t) + + rep, err := RunPair(context.Background(), f.store, f.tools, f.pair, Options{}) + if err == nil { + t.Fatalf("expected connect-fail error without --init, got rep=%+v", rep) + } + if !strings.Contains(err.Error(), "--init") { + t.Fatalf("error should point at --init, got %v", err) + } + if rep.Status != store.RunStatusFailed { + t.Fatalf("Status = %q, want failed", rep.Status) + } + argv, _ := readCallLog(t, logPath) + for _, line := range argv { + if strings.HasPrefix(line, "repository create") { + t.Fatalf("repository was created despite no --init: argv = %q", argv) + } + } +} + func TestKopiaCreateFailureRecordsFailedRun(t *testing.T) { installFakeKopia(t) t.Setenv("KOPIA_FAKE_CONNECT_EXIT", "1") t.Setenv("KOPIA_FAKE_CREATE_EXIT", "1") f := setupKopiaFixture(t) - rep, err := RunPair(context.Background(), f.store, f.tools, f.pair, Options{}) + rep, err := RunPair(context.Background(), f.store, f.tools, f.pair, Options{Init: true}) if err == nil { t.Fatalf("expected error, got rep=%+v", rep) } @@ -356,7 +384,9 @@ func TestKopiaIntegrationRealBinary(t *testing.T) { } f := setupKopiaFixture(t) - rep, err := RunPair(context.Background(), f.store, f.tools, f.pair, Options{}) + // First push bootstraps a fresh repository, so it needs --init; the + // gate now refuses a silent re-create on connect failure. + rep, err := RunPair(context.Background(), f.store, f.tools, f.pair, Options{Init: true}) if err != nil { t.Fatalf("RunPair: %v (rep=%+v)", err, rep) } diff --git a/sync/rclone.go b/sync/rclone.go index c19e1cf..e1ad97b 100644 --- a/sync/rclone.go +++ b/sync/rclone.go @@ -232,6 +232,12 @@ type RunResult struct { // FatalError is true when the run failed in a way that did not produce // per-file errors — e.g. source root missing, auth failure. FatalError bool + // HashFallback is true when rclone reported that --checksum could not + // use the requested hash because source and destination share none, + // and silently fell back to a size-based comparison. A run that asked + // for BLAKE3 verification but hit this path was not content-verified, + // however rclone exited, so the caller must not record it as verified. + HashFallback bool } // FailedFile is one per-object error from the JSON log. Object may be @@ -474,6 +480,16 @@ var retrySummaryRE = regexp.MustCompile(`^Attempt \d+/\d+ failed`) func isRetrySummary(msg string) bool { return retrySummaryRE.MatchString(msg) } +// hashFallbackRE matches rclone's notice that --checksum has no common +// hash to compare with and is degrading to a size-based check, e.g. +// "--checksum is in use but the source and destination have no hashes in +// common; falling back to --size-only". The trailing verb has varied +// across rclone versions ("falling back"/"failing back") so the match +// keys on the stable phrase "no hashes in common", at any log level. +var hashFallbackRE = regexp.MustCompile(`no hashes in common`) + +func isHashFallback(msg string) bool { return hashFallbackRE.MatchString(msg) } + // parseJSONLog reads JSON-per-line events from r and updates result in // place. Non-JSON lines (e.g. an early startup notice on an older rclone) // are skipped — we cannot make decisions on them and surfacing them as @@ -491,6 +507,12 @@ func parseJSONLog(r io.Reader, result *RunResult, onProgress func(runevents.Prog if err := json.Unmarshal(line, &ev); err != nil { continue } + if isHashFallback(ev.Msg) { + // Emitted at NOTICE level (which the level filter below drops), + // so it is detected here before that filter: a run that asked + // for BLAKE3 but lost the hash must not be recorded as verified. + result.HashFallback = true + } if ev.Stats != nil { result.Transferred = ev.Stats.TotalTransfers result.Checked = ev.Stats.TotalChecks diff --git a/sync/rclone_test.go b/sync/rclone_test.go index 647c08a..49a3c23 100644 --- a/sync/rclone_test.go +++ b/sync/rclone_test.go @@ -72,6 +72,37 @@ func TestParseJSONLogCapturesObjectlessErrors(t *testing.T) { } } +// TestParseJSONLogDetectsHashFallback: rclone's no-common-hash notice is +// emitted at NOTICE level, which the error filter drops; parseJSONLog +// still flags it so a flags-set, exit-0 run that silently degraded to a +// size comparison is not later recorded as content-verified. +func TestParseJSONLogDetectsHashFallback(t *testing.T) { + stream := strings.Join([]string{ + `{"level":"notice","msg":"--checksum is in use but the source and destination have no hashes in common; falling back to --size-only","source":"x"}`, + `{"stats":{"errors":0,"fatalError":false,"totalTransfers":2,"totalChecks":0,"bytes":10}}`, + }, "\n") + var r RunResult + parseJSONLog(strings.NewReader(stream), &r, nil) + + if !r.HashFallback { + t.Fatalf("HashFallback = false, want true (no-common-hash notice should be detected)") + } + if len(r.FailedFiles) != 0 { + t.Fatalf("FailedFiles = %+v, want none (the notice is not a per-file error)", r.FailedFiles) + } +} + +// TestParseJSONLogNoFalseHashFallback: an ordinary run never trips the +// fallback flag. +func TestParseJSONLogNoFalseHashFallback(t *testing.T) { + stream := `{"stats":{"errors":0,"fatalError":false,"totalTransfers":2,"totalChecks":1,"bytes":10}}` + var r RunResult + parseJSONLog(strings.NewReader(stream), &r, nil) + if r.HashFallback { + t.Fatalf("HashFallback = true on a clean run, want false") + } +} + func TestIsRetrySummary(t *testing.T) { cases := []struct { in string diff --git a/sync/sync.go b/sync/sync.go index 1e5ad57..e00d057 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -382,6 +382,13 @@ func runRcloneOperation( // least one success-or-partial index run exists for it. Sync of an // unindexed volume is refused: without an index, we have no record of // what should be at the destination after the run. +// +// The DB row's recorded path must equal the config-declared path. A +// handler enumerates the config path's tree while the durability advance +// covers the rows the DB volume holds; if the two paths disagree (a stale +// volumes.path) the push would claim durability for one tree while +// transferring another. Offload and restore already make this +// cross-check; the push handlers share it through this gate. func requireIndexedVolume(ctx context.Context, s *store.Store, vol *config.Volume) (int64, error) { v, err := s.GetVolumeByName(ctx, vol.Name) if err != nil { @@ -390,6 +397,9 @@ func requireIndexedVolume(ctx context.Context, s *store.Store, vol *config.Volum } return 0, fmt.Errorf("lookup volume %q: %w", vol.Name, err) } + if v.Path != vol.Path { + return 0, fmt.Errorf("volume %q is at %q in the DB but config says %q — resolve the conflict before syncing", vol.Name, v.Path, vol.Path) + } if _, err := s.LatestSuccessfulIndexRun(ctx, v.ID); err != nil { if store.IsNotFound(err) { return 0, fmt.Errorf("volume %q has no successful index run — run `squirrel index %s` first", vol.Name, vol.Name) diff --git a/sync/sync_test.go b/sync/sync_test.go index ce7ee90..a035564 100644 --- a/sync/sync_test.go +++ b/sync/sync_test.go @@ -108,6 +108,24 @@ func TestSyncRequiresIndexedVolume(t *testing.T) { } } +// TestSyncRefusesOnVolumePathMismatch mirrors the restore and offload +// cross-checks (#114): a DB volumes.path that no longer matches the +// config-declared path makes the push handler refuse, so it cannot push +// one tree while the durability advance covers another. +func TestSyncRefusesOnVolumePathMismatch(t *testing.T) { + f := setupFixture(t) + // Seed the volume row with a path that differs from f.vol.Path so the + // shared requireIndexedVolume gate fails before rclone is invoked. + staleDir := t.TempDir() + if _, err := f.store.CreateVolume(context.Background(), f.vol.Name, staleDir); err != nil { + t.Fatalf("seed stale volume row: %v", err) + } + _, err := Sync(context.Background(), f.store, f.rcl, f.vol, f.dest, Options{}) + if err == nil || !strings.Contains(err.Error(), "resolve the conflict") { + t.Fatalf("expected path-mismatch error, got %v", err) + } +} + func TestSyncHappyPath(t *testing.T) { f := setupFixture(t) if err := os.WriteFile(filepath.Join(f.vol.Path, "a.txt"), []byte("alpha"), 0o644); err != nil {