Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions offload/durability_soundness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,42 @@ func TestOffloadPresenceSizeUnverifiedFingerprintHeldOut(t *testing.T) {
mustExist(t, filepath.Join(root, "a.txt"))
}

// TestOffloadGateNamesPeerProvenance: a stale component a durability pull
// asserted names the asserting peer in the gate's refusal — the audit
// trail answers "which evidence (and from which peer) gated this offload"
// (the residual of #104). A locally-verified component reads "locally
// verified" instead.
func TestOffloadGateNamesPeerProvenance(t *testing.T) {
root := t.TempDir()
writeFile(t, filepath.Join(root, "a.txt"), "alpha")
s := setupStore(t)
ctx := context.Background()
idx := indexVolume(t, s, root)
v := testVolume(t, s)
self := selfNode(t, s)

peer, err := s.GetOrCreateOriginNode(ctx, "nas")
if err != nil {
t.Fatalf("GetOrCreateOriginNode(nas): %v", err)
}
if err := s.UpsertDestinationRunIDPulled(ctx, v.ID, "t1", self.ID, idx.RunID-1, store.VerifyMethodBlake3, peer.ID, false); err != nil {
t.Fatalf("UpsertDestinationRunIDPulled: %v", err)
}
recordPush(t, s, v.ID, "t1")

rep, err := Offload(ctx, s, root, Options{
Name: volName, Paths: []string{"."}, Require: []string{"t1"},
})
if err != nil {
t.Fatalf("Offload: %v", err)
}
res := oneResult(t, rep, "a.txt", OutcomeNotDurable)
if len(res.Reasons) != 1 || !strings.Contains(res.Reasons[0], "asserted by peer nas") {
t.Fatalf("reasons = %v, want the stale failure naming the asserting peer nas", res.Reasons)
}
mustExist(t, filepath.Join(root, "a.txt"))
}

// TestOffloadContentVerifiedMethodsGate: blake3, peer-blake3, and
// kopia-verify components each gate on their own (no fingerprint needed)
// once the vector and freshness conditions hold — the stricter gate does
Expand Down
30 changes: 24 additions & 6 deletions offload/gate.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,17 @@ import (
)

// component is one loaded durability-vector entry: the highest origin
// run covered for an origin node, plus the verification method that
// advanced it. The method lets the gate refuse a presence-only
// component that has no content verification behind it.
// run covered for an origin node, the verification method that advanced
// it, and the provenance of that advance. The method lets the gate
// refuse a presence-only component that has no content verification
// behind it; source is the asserting peer's node id when the component
// was last advanced by a durability pull, and invalid (NULL) for the
// locally-verified class — so the gate's decision names which peer's
// evidence backed it.
type component struct {
coveredRun int64
method string
source sql.NullInt64
}

// gate is the offline durability evidence for one invocation: the self
Expand Down Expand Up @@ -60,7 +65,7 @@ func loadGate(ctx context.Context, s *store.Store, volumeID int64, require []str
}
vector := make(map[int64]component, len(components))
for _, c := range components {
vector[c.OriginNodeID] = component{coveredRun: c.OriginRunID, method: c.VerifyMethod}
vector[c.OriginNodeID] = component{coveredRun: c.OriginRunID, method: c.VerifyMethod, source: c.SourceNodeID}
}
g.vectors[target] = vector

Expand Down Expand Up @@ -115,7 +120,7 @@ func (g *gate) check(ctx context.Context, row store.FileRow) ([]string, error) {
continue
case comp.coveredRun < originRun:
failures = append(failures,
fmt.Sprintf("%s: stale: have %d need %d (origin %s)", target, comp.coveredRun, originRun, g.nodeName(ctx, originNode)))
fmt.Sprintf("%s: stale: have %d need %d (origin %s, %s)", target, comp.coveredRun, originRun, g.nodeName(ctx, originNode), g.provenance(ctx, comp)))
continue
}
if reason := g.freshnessFailure(ctx, target, row, originNode, originRun); reason != "" {
Expand All @@ -128,7 +133,7 @@ func (g *gate) check(ctx context.Context, row store.FileRow) ([]string, error) {
}
if !verified {
failures = append(failures,
fmt.Sprintf("%s: not content-verified (method %q); a verified fingerprint must back the object before offload", target, displayMethod(comp.method)))
fmt.Sprintf("%s: not content-verified (method %q, %s); a verified fingerprint must back the object before offload", target, displayMethod(comp.method), g.provenance(ctx, comp)))
}
}
return failures, nil
Expand Down Expand Up @@ -230,6 +235,19 @@ func (g *gate) origin(ctx context.Context, row store.FileRow) (int64, int64, err
return g.self.ID, intro, nil
}

// provenance renders a component's evidence class for a gate decision:
// "locally verified" for a component this node advanced itself, or
// "asserted by peer <name>" for one a durability pull tagged. The audit
// trail thus names which peer's evidence gated (or failed to gate) an
// offload, and a peer whose assertions later prove untrustworthy is
// identifiable per decision.
func (g *gate) provenance(ctx context.Context, comp component) string {
if !comp.source.Valid {
return "locally verified"
}
return fmt.Sprintf("asserted by peer %s", g.nodeName(ctx, comp.source.Int64))
}

// nodeName resolves an origin node id to its name for the failure
// messages, cached per invocation. A lookup failure degrades to the
// numeric id — the gate decision is already made, naming is cosmetic.
Expand Down
4 changes: 2 additions & 2 deletions offload/offload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,8 +322,8 @@ func TestOffloadPeerOriginContent(t *testing.T) {
}
oneResult(t, rep, "covered.txt", OutcomeOffloaded)
res := oneResult(t, rep, "ahead.txt", OutcomeNotDurable)
if !strings.Contains(res.Reasons[0], "t1: stale: have 7 need 9 (origin peer1)") {
t.Fatalf("reasons = %v, want stale failure naming origin peer1", res.Reasons)
if !strings.Contains(res.Reasons[0], "t1: stale: have 7 need 9 (origin peer1, locally verified)") {
t.Fatalf("reasons = %v, want stale failure naming origin peer1 and local provenance", res.Reasons)
}
mustBeGone(t, filepath.Join(root, "covered.txt"))
mustExist(t, filepath.Join(root, "ahead.txt"))
Expand Down
99 changes: 78 additions & 21 deletions store/destination_run_ids.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,19 +75,29 @@ func KnownVerifyMethod(method string) bool {
// runs FK. Content with origin (N, r) is durable on a destination iff
// the vector's component for N is ≥ r. VerifyMethod names the comparison
// that last advanced the component (empty for a pre-v19 row).
//
// SourceNodeID is the provenance class: NULL when the component was
// advanced by a transfer this node observed itself (the locally-verified
// class, via AdvanceDestinationVectorTo), and the asserting peer's node
// id when a durability pull last advanced it. The offload gate weighs a
// peer-asserted component as a distinct, revocable class.
type DestinationRunID struct {
VolumeID int64
Destination string
OriginNodeID int64
OriginRunID int64
UpdatedAtNs int64
VerifyMethod string
SourceNodeID sql.NullInt64
}

// DestinationRunIDHistory is one row of the insert-only
// destination_run_ids_history log: a single vector-component advance.
// AtNs is the insertion timestamp; rows written in the same tick still
// order by id. VerifyMethod records the method behind this advance.
// SourceNodeID records the advance's provenance (NULL locally-verified,
// else the asserting peer) so the audit log traces revocation, not just
// the live vector.
type DestinationRunIDHistory struct {
ID int64
VolumeID int64
Expand All @@ -96,6 +106,7 @@ type DestinationRunIDHistory struct {
OriginRunID int64
AtNs int64
VerifyMethod string
SourceNodeID sql.NullInt64
}

// DestinationRewindError carries the rejected and current vector
Expand Down Expand Up @@ -123,7 +134,7 @@ func (e *DestinationRewindError) Unwrap() error { return ErrWatermarkRewind }
// run id advances from it.
func (s *Store) GetDestinationRunID(ctx context.Context, volumeID int64, destination string, originNodeID int64) (DestinationRunID, error) {
row := s.db.QueryRowContext(ctx,
`SELECT volume_id, destination, origin_node_id, origin_run_id, updated_at_ns, verify_method
`SELECT volume_id, destination, origin_node_id, origin_run_id, updated_at_ns, verify_method, source_node_id
FROM destination_run_ids
WHERE volume_id = ? AND destination = ? AND origin_node_id = ?`,
volumeID, destination, originNodeID)
Expand All @@ -135,7 +146,7 @@ func (s *Store) GetDestinationRunID(ctx context.Context, volumeID int64, destina
// means the destination has no recorded durability yet.
func (s *Store) ListDestinationRunIDs(ctx context.Context, volumeID int64, destination string) ([]DestinationRunID, error) {
return queryRows(ctx, s.db,
`SELECT volume_id, destination, origin_node_id, origin_run_id, updated_at_ns, verify_method
`SELECT volume_id, destination, origin_node_id, origin_run_id, updated_at_ns, verify_method, source_node_id
FROM destination_run_ids
WHERE volume_id = ? AND destination = ?
ORDER BY origin_node_id`,
Expand All @@ -149,7 +160,7 @@ func (s *Store) ListDestinationRunIDs(ctx context.Context, volumeID int64, desti
// can see.
func (s *Store) ListVolumeDestinationRunIDs(ctx context.Context, volumeID int64) ([]DestinationRunID, error) {
return queryRows(ctx, s.db,
`SELECT volume_id, destination, origin_node_id, origin_run_id, updated_at_ns, verify_method
`SELECT volume_id, destination, origin_node_id, origin_run_id, updated_at_ns, verify_method, source_node_id
FROM destination_run_ids
WHERE volume_id = ?
ORDER BY destination, origin_node_id`,
Expand All @@ -159,7 +170,7 @@ func (s *Store) ListVolumeDestinationRunIDs(ctx context.Context, volumeID int64)
func scanDestinationRunID(s rowScanner) (DestinationRunID, error) {
var d DestinationRunID
var method sql.NullString
err := s.Scan(&d.VolumeID, &d.Destination, &d.OriginNodeID, &d.OriginRunID, &d.UpdatedAtNs, &method)
err := s.Scan(&d.VolumeID, &d.Destination, &d.OriginNodeID, &d.OriginRunID, &d.UpdatedAtNs, &method, &d.SourceNodeID)
d.VerifyMethod = method.String
return d, err
}
Expand All @@ -177,7 +188,7 @@ func scanDestinationRunID(s rowScanner) (DestinationRunID, error) {
// use rather than writing components directly.
func (s *Store) AdvanceDestinationVectorTo(ctx context.Context, volumeID int64, destination, verifyMethod string, components []OriginComponent) error {
for _, c := range components {
err := s.upsertDestinationRunID(ctx, volumeID, destination, c.OriginNodeID, c.OriginRunID, verifyMethod, false)
err := s.upsertDestinationRunID(ctx, volumeID, destination, c.OriginNodeID, c.OriginRunID, verifyMethod, sql.NullInt64{}, false)
if errors.Is(err, ErrWatermarkRewind) {
continue
}
Expand Down Expand Up @@ -285,19 +296,37 @@ func scanOriginComponent(s rowScanner) (OriginComponent, error) {
// is unchanged, so a methodless re-confirmation (e.g. a pull from a
// pre-v19 peer) never degrades a content-verified component to unknown.
func (s *Store) UpsertDestinationRunID(ctx context.Context, volumeID int64, destination string, originNodeID, originRunID int64, allowRewind bool) error {
return s.upsertDestinationRunID(ctx, volumeID, destination, originNodeID, originRunID, "", allowRewind)
return s.upsertDestinationRunID(ctx, volumeID, destination, originNodeID, originRunID, "", sql.NullInt64{}, allowRewind)
}

// UpsertDestinationRunIDVerified is UpsertDestinationRunID with an
// explicit verification method recorded on the component — the entry
// point the durability pull uses to carry a peer's reported method
// verbatim, so the puller's offload gate weighs a pulled component
// exactly as the responder did.
// UpsertDestinationRunIDVerified advances a component this node verified
// itself, recording verifyMethod and leaving source_node_id NULL (the
// locally-verified provenance class). The durability pull does not use
// it — peer-asserted advances go through UpsertDestinationRunIDPulled so
// the two classes stay distinguishable and a peer's assertions revocable.
func (s *Store) UpsertDestinationRunIDVerified(ctx context.Context, volumeID int64, destination string, originNodeID, originRunID int64, verifyMethod string, allowRewind bool) error {
return s.upsertDestinationRunID(ctx, volumeID, destination, originNodeID, originRunID, verifyMethod, allowRewind)
return s.upsertDestinationRunID(ctx, volumeID, destination, originNodeID, originRunID, verifyMethod, sql.NullInt64{}, allowRewind)
}

// UpsertDestinationRunIDPulled records a peer-pulled advance: it carries
// the origin's reported verify method verbatim (so the puller's offload
// gate weighs the component exactly as the responder did) and tags the
// component with sourceNodeID, the asserting peer. This is the only
// entry point that stamps non-local provenance; every locally-verified
// path leaves source_node_id NULL, so revocation can drop a peer's
// assertions without touching evidence this node observed itself.
func (s *Store) UpsertDestinationRunIDPulled(ctx context.Context, volumeID int64, destination string, originNodeID, originRunID int64, verifyMethod string, sourceNodeID int64, allowRewind bool) error {
return s.upsertDestinationRunID(ctx, volumeID, destination, originNodeID, originRunID, verifyMethod, sql.NullInt64{Int64: sourceNodeID, Valid: true}, allowRewind)
}

func (s *Store) upsertDestinationRunID(ctx context.Context, volumeID int64, destination string, originNodeID, originRunID int64, verifyMethod string, allowRewind bool) error {
// upsertDestinationRunID is the shared advance. sourceNodeID carries
// provenance: an invalid (NULL) value is the locally-verified class; a
// valid value is the asserting peer. source_node_id follows the run: a
// strict advance adopts the incoming provenance, an incoming local (NULL)
// re-confirmation upgrades a peer-tagged component back to local, and a
// peer re-confirmation at the recorded run never downgrades a
// locally-verified (NULL) component to peer-asserted.
func (s *Store) upsertDestinationRunID(ctx context.Context, volumeID int64, destination string, originNodeID, originRunID int64, verifyMethod string, sourceNodeID sql.NullInt64, allowRewind bool) error {
if destination == "" {
return fmt.Errorf("UpsertDestinationRunID: destination must be non-empty")
}
Expand All @@ -310,18 +339,23 @@ func (s *Store) upsertDestinationRunID(ctx context.Context, volumeID int64, dest
atNs := NowNs()
method := nullableString(verifyMethod)
res, err := tx.ExecContext(ctx, `
INSERT INTO destination_run_ids (volume_id, destination, origin_node_id, origin_run_id, updated_at_ns, verify_method)
VALUES (?, ?, ?, ?, ?, ?)
INSERT INTO destination_run_ids (volume_id, destination, origin_node_id, origin_run_id, updated_at_ns, verify_method, source_node_id)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(volume_id, destination, origin_node_id) DO UPDATE SET
origin_run_id = excluded.origin_run_id,
updated_at_ns = excluded.updated_at_ns,
verify_method = CASE
WHEN excluded.verify_method IS NOT NULL THEN excluded.verify_method
WHEN excluded.origin_run_id > destination_run_ids.origin_run_id THEN NULL
ELSE destination_run_ids.verify_method
END,
source_node_id = CASE
WHEN excluded.origin_run_id > destination_run_ids.origin_run_id THEN excluded.source_node_id
WHEN excluded.source_node_id IS NULL THEN NULL
ELSE destination_run_ids.source_node_id
END
WHERE excluded.origin_run_id >= destination_run_ids.origin_run_id OR ?
`, volumeID, destination, originNodeID, originRunID, atNs, method, allowRewind)
`, volumeID, destination, originNodeID, originRunID, atNs, method, sourceNodeID, allowRewind)
if err != nil {
return fmt.Errorf("upsert destination_run_ids: %w", err)
}
Expand All @@ -337,9 +371,9 @@ func (s *Store) upsertDestinationRunID(ctx context.Context, volumeID int64, dest
}
if _, err := tx.ExecContext(ctx, `
INSERT INTO destination_run_ids_history
(volume_id, destination, origin_node_id, origin_run_id, at_ns, verify_method)
VALUES (?, ?, ?, ?, ?, ?)
`, volumeID, destination, originNodeID, originRunID, atNs, method); err != nil {
(volume_id, destination, origin_node_id, origin_run_id, at_ns, verify_method, source_node_id)
VALUES (?, ?, ?, ?, ?, ?, ?)
`, volumeID, destination, originNodeID, originRunID, atNs, method, sourceNodeID); err != nil {
return fmt.Errorf("append destination_run_ids_history: %w", err)
}
if err := tx.Commit(); err != nil {
Expand Down Expand Up @@ -388,7 +422,7 @@ func guardDestinationMonotonicTx(ctx context.Context, tx *sql.Tx, volumeID int64
// recorded advances.
func (s *Store) ListDestinationRunIDHistory(ctx context.Context, volumeID int64, destination string) ([]DestinationRunIDHistory, error) {
return queryRows(ctx, s.db, `
SELECT id, volume_id, destination, origin_node_id, origin_run_id, at_ns, verify_method
SELECT id, volume_id, destination, origin_node_id, origin_run_id, at_ns, verify_method, source_node_id
FROM destination_run_ids_history
WHERE volume_id = ? AND destination = ?
ORDER BY id
Expand All @@ -398,7 +432,30 @@ func (s *Store) ListDestinationRunIDHistory(ctx context.Context, volumeID int64,
func scanDestinationRunIDHistory(s rowScanner) (DestinationRunIDHistory, error) {
var h DestinationRunIDHistory
var method sql.NullString
err := s.Scan(&h.ID, &h.VolumeID, &h.Destination, &h.OriginNodeID, &h.OriginRunID, &h.AtNs, &method)
err := s.Scan(&h.ID, &h.VolumeID, &h.Destination, &h.OriginNodeID, &h.OriginRunID, &h.AtNs, &method, &h.SourceNodeID)
h.VerifyMethod = method.String
return h, err
}

// RevokeDestinationRunIDsFromSource removes the live durability-vector
// components a single peer asserted, returning how many it deleted. It is
// the operator's recovery path for a compromised or mistaken peer: pulled
// evidence (source_node_id = sourceNodeID) is dropped while
// locally-verified components (source_node_id NULL) and other peers'
// assertions stay, so the live verified vector is never rewound. The
// append-only destination_run_ids_history is untouched — the assertions
// remain in the audit trail, so revocation is a forward act, not a
// rewrite of history. A revoked component reverts to "no row" (no floor);
// a later legitimate pull or verified push re-advances it.
func (s *Store) RevokeDestinationRunIDsFromSource(ctx context.Context, sourceNodeID int64) (int64, error) {
res, err := s.db.ExecContext(ctx,
`DELETE FROM destination_run_ids WHERE source_node_id = ?`, sourceNodeID)
if err != nil {
return 0, fmt.Errorf("revoke destination_run_ids from source %d: %w", sourceNodeID, err)
}
n, err := res.RowsAffected()
if err != nil {
return 0, fmt.Errorf("revoke destination_run_ids rows: %w", err)
}
return n, nil
}
Loading
Loading