From eccf0c66a4c315a10d5f81b1c42eb0bb6e8d10ed Mon Sep 17 00:00:00 2001 From: Krutika Dhananjay Date: Wed, 11 Feb 2026 16:49:27 +0530 Subject: [PATCH] Introducing FilterReferencedSnapshots helper function Add FilterReferencedSnapshots() to allow catalog implementations to filter out referenced snapshot IDs from RemoveSnapshots updates before applying them during commit. This closes a race window between snapshot expiration and concurrent client commits that add refs to snapshots targeted for expiration. The filter accounts for RemoveSnapshotRef updates in the same batch, so normal expiration flows (remove ref + remove snapshot) are unaffected. --- table/updates.go | 48 +++++++++++++++++++ table/updates_test.go | 109 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 157 insertions(+) diff --git a/table/updates.go b/table/updates.go index ad302a29e..f4035ff4e 100644 --- a/table/updates.go +++ b/table/updates.go @@ -560,3 +560,51 @@ func NewRemoveSchemasUpdate(schemaIds []int) *removeSchemasUpdate { func (u *removeSchemasUpdate) Apply(builder *MetadataBuilder) error { return builder.RemoveSchemas(u.SchemaIDs) } + +// FilterReferencedSnapshots removes referenced snapshot IDs from any +// RemoveSnapshots updates. Snapshots referenced by a branch or tag in +// the given metadata are excluded, unless the same update batch also +// removes that ref. Updates with no remaining snapshot IDs are dropped +// entirely. +func FilterReferencedSnapshots(updates []Update, meta Metadata) []Update { + // Build ref state: ref name → snapshot ID + refsByName := make(map[string]int64) + for name, ref := range meta.Refs() { + refsByName[name] = ref.SnapshotID + } + + // Apply ref removals from the same update batch + for _, upd := range updates { + if rs, ok := upd.(*removeSnapshotRefUpdate); ok { + delete(refsByName, rs.RefName) + } + } + + // Build the effective referenced set after accounting for ref removals + referenced := make(map[int64]bool) + for _, snapID := range refsByName { + referenced[snapID] = true + } + + // Filter RemoveSnapshots updates + result := make([]Update, 0, len(updates)) + for _, upd := range updates { + rs, ok := upd.(*removeSnapshotsUpdate) + if !ok { + result = append(result, upd) + continue + } + + filtered := make([]int64, 0, len(rs.SnapshotIDs)) + for _, id := range rs.SnapshotIDs { + if !referenced[id] { + filtered = append(filtered, id) + } + } + if len(filtered) > 0 { + result = append(result, NewRemoveSnapshotsUpdate(filtered)) + } + } + + return result +} diff --git a/table/updates_test.go b/table/updates_test.go index 831abb4b6..1351aaa2e 100644 --- a/table/updates_test.go +++ b/table/updates_test.go @@ -244,3 +244,112 @@ func TestUnmarshalUpdates(t *testing.T) { }) } } + +// buildMetadataWithSnapshots creates table metadata with the given snapshots +// and refs for use in FilterReferencedSnapshots tests. +func buildMetadataWithSnapshots(t *testing.T, snapshots []Snapshot, refs map[string]SnapshotRef) Metadata { + t.Helper() + builder := builderWithoutChanges(2) + baseTs := builder.base.LastUpdatedMillis() + for i := range snapshots { + snapshots[i].TimestampMs = baseTs + int64(i) + 1 + require.NoError(t, builder.AddSnapshot(&snapshots[i])) + } + for name, ref := range refs { + require.NoError(t, builder.SetSnapshotRef(name, ref.SnapshotID, ref.SnapshotRefType)) + } + meta, err := builder.Build() + require.NoError(t, err) + return meta +} + +func TestFilterReferencedSnapshots(t *testing.T) { + schemaID := 0 + snap := func(id int64) Snapshot { + return Snapshot{ + SnapshotID: id, + ManifestList: "/snap.avro", + Summary: &Summary{Operation: OpAppend}, + SchemaID: &schemaID, + } + } + + t.Run("skips referenced snapshots", func(t *testing.T) { + // snap-1 referenced by feature-branch, snap-2 unreferenced, snap-3 referenced by main + meta := buildMetadataWithSnapshots(t, + []Snapshot{snap(1), snap(2), snap(3)}, + map[string]SnapshotRef{ + "feature-branch": {SnapshotID: 1, SnapshotRefType: BranchRef}, + MainBranch: {SnapshotID: 3, SnapshotRefType: BranchRef}, + }, + ) + + updates := []Update{ + NewRemoveSnapshotsUpdate([]int64{1, 2}), + } + + filtered := FilterReferencedSnapshots(updates, meta) + require.Len(t, filtered, 1) + rs := filtered[0].(*removeSnapshotsUpdate) + require.Equal(t, []int64{2}, rs.SnapshotIDs) + }) + + t.Run("allows removal when ref is also removed in same batch", func(t *testing.T) { + // tag-1 → snap-1, main → snap-2 + meta := buildMetadataWithSnapshots(t, + []Snapshot{snap(1), snap(2)}, + map[string]SnapshotRef{ + "tag-1": {SnapshotID: 1, SnapshotRefType: TagRef}, + MainBranch: {SnapshotID: 2, SnapshotRefType: BranchRef}, + }, + ) + + updates := []Update{ + NewRemoveSnapshotRefUpdate("tag-1"), + NewRemoveSnapshotsUpdate([]int64{1}), + } + + filtered := FilterReferencedSnapshots(updates, meta) + // RemoveSnapshotRef passes through, RemoveSnapshots keeps snap-1 + require.Len(t, filtered, 2) + require.Equal(t, UpdateRemoveSnapshotRef, filtered[0].Action()) + rs := filtered[1].(*removeSnapshotsUpdate) + require.Equal(t, []int64{1}, rs.SnapshotIDs) + }) + + t.Run("drops update entirely when all snapshots are referenced", func(t *testing.T) { + meta := buildMetadataWithSnapshots(t, + []Snapshot{snap(1), snap(2)}, + map[string]SnapshotRef{ + "branch-a": {SnapshotID: 1, SnapshotRefType: BranchRef}, + "branch-b": {SnapshotID: 2, SnapshotRefType: BranchRef}, + }, + ) + + updates := []Update{ + NewRemoveSnapshotsUpdate([]int64{1, 2}), + } + + filtered := FilterReferencedSnapshots(updates, meta) + require.Len(t, filtered, 0) + }) + + t.Run("passes through non-RemoveSnapshots updates unchanged", func(t *testing.T) { + meta := buildMetadataWithSnapshots(t, + []Snapshot{snap(1)}, + map[string]SnapshotRef{ + MainBranch: {SnapshotID: 1, SnapshotRefType: BranchRef}, + }, + ) + + updates := []Update{ + NewSetPropertiesUpdate(iceberg.Properties{"key": "value"}), + NewRemovePropertiesUpdate([]string{"old-key"}), + } + + filtered := FilterReferencedSnapshots(updates, meta) + require.Len(t, filtered, 2) + require.Equal(t, UpdateSetProperties, filtered[0].Action()) + require.Equal(t, UpdateRemoveProperties, filtered[1].Action()) + }) +}