diff --git a/table/updates.go b/table/updates.go index ad302a29e..f4035ff4e 100644 --- a/table/updates.go +++ b/table/updates.go @@ -560,3 +560,51 @@ func NewRemoveSchemasUpdate(schemaIds []int) *removeSchemasUpdate { func (u *removeSchemasUpdate) Apply(builder *MetadataBuilder) error { return builder.RemoveSchemas(u.SchemaIDs) } + +// FilterReferencedSnapshots removes referenced snapshot IDs from any +// RemoveSnapshots updates. Snapshots referenced by a branch or tag in +// the given metadata are excluded, unless the same update batch also +// removes that ref. Updates with no remaining snapshot IDs are dropped +// entirely. +func FilterReferencedSnapshots(updates []Update, meta Metadata) []Update { + // Build ref state: ref name → snapshot ID + refsByName := make(map[string]int64) + for name, ref := range meta.Refs() { + refsByName[name] = ref.SnapshotID + } + + // Apply ref removals from the same update batch + for _, upd := range updates { + if rs, ok := upd.(*removeSnapshotRefUpdate); ok { + delete(refsByName, rs.RefName) + } + } + + // Build the effective referenced set after accounting for ref removals + referenced := make(map[int64]bool) + for _, snapID := range refsByName { + referenced[snapID] = true + } + + // Filter RemoveSnapshots updates + result := make([]Update, 0, len(updates)) + for _, upd := range updates { + rs, ok := upd.(*removeSnapshotsUpdate) + if !ok { + result = append(result, upd) + continue + } + + filtered := make([]int64, 0, len(rs.SnapshotIDs)) + for _, id := range rs.SnapshotIDs { + if !referenced[id] { + filtered = append(filtered, id) + } + } + if len(filtered) > 0 { + result = append(result, NewRemoveSnapshotsUpdate(filtered)) + } + } + + return result +} diff --git a/table/updates_test.go b/table/updates_test.go index 831abb4b6..1351aaa2e 100644 --- a/table/updates_test.go +++ b/table/updates_test.go @@ -244,3 +244,112 @@ func TestUnmarshalUpdates(t *testing.T) { }) } } + +// buildMetadataWithSnapshots creates table metadata with the given snapshots +// and refs for use in FilterReferencedSnapshots tests. +func buildMetadataWithSnapshots(t *testing.T, snapshots []Snapshot, refs map[string]SnapshotRef) Metadata { + t.Helper() + builder := builderWithoutChanges(2) + baseTs := builder.base.LastUpdatedMillis() + for i := range snapshots { + snapshots[i].TimestampMs = baseTs + int64(i) + 1 + require.NoError(t, builder.AddSnapshot(&snapshots[i])) + } + for name, ref := range refs { + require.NoError(t, builder.SetSnapshotRef(name, ref.SnapshotID, ref.SnapshotRefType)) + } + meta, err := builder.Build() + require.NoError(t, err) + return meta +} + +func TestFilterReferencedSnapshots(t *testing.T) { + schemaID := 0 + snap := func(id int64) Snapshot { + return Snapshot{ + SnapshotID: id, + ManifestList: "/snap.avro", + Summary: &Summary{Operation: OpAppend}, + SchemaID: &schemaID, + } + } + + t.Run("skips referenced snapshots", func(t *testing.T) { + // snap-1 referenced by feature-branch, snap-2 unreferenced, snap-3 referenced by main + meta := buildMetadataWithSnapshots(t, + []Snapshot{snap(1), snap(2), snap(3)}, + map[string]SnapshotRef{ + "feature-branch": {SnapshotID: 1, SnapshotRefType: BranchRef}, + MainBranch: {SnapshotID: 3, SnapshotRefType: BranchRef}, + }, + ) + + updates := []Update{ + NewRemoveSnapshotsUpdate([]int64{1, 2}), + } + + filtered := FilterReferencedSnapshots(updates, meta) + require.Len(t, filtered, 1) + rs := filtered[0].(*removeSnapshotsUpdate) + require.Equal(t, []int64{2}, rs.SnapshotIDs) + }) + + t.Run("allows removal when ref is also removed in same batch", func(t *testing.T) { + // tag-1 → snap-1, main → snap-2 + meta := buildMetadataWithSnapshots(t, + []Snapshot{snap(1), snap(2)}, + map[string]SnapshotRef{ + "tag-1": {SnapshotID: 1, SnapshotRefType: TagRef}, + MainBranch: {SnapshotID: 2, SnapshotRefType: BranchRef}, + }, + ) + + updates := []Update{ + NewRemoveSnapshotRefUpdate("tag-1"), + NewRemoveSnapshotsUpdate([]int64{1}), + } + + filtered := FilterReferencedSnapshots(updates, meta) + // RemoveSnapshotRef passes through, RemoveSnapshots keeps snap-1 + require.Len(t, filtered, 2) + require.Equal(t, UpdateRemoveSnapshotRef, filtered[0].Action()) + rs := filtered[1].(*removeSnapshotsUpdate) + require.Equal(t, []int64{1}, rs.SnapshotIDs) + }) + + t.Run("drops update entirely when all snapshots are referenced", func(t *testing.T) { + meta := buildMetadataWithSnapshots(t, + []Snapshot{snap(1), snap(2)}, + map[string]SnapshotRef{ + "branch-a": {SnapshotID: 1, SnapshotRefType: BranchRef}, + "branch-b": {SnapshotID: 2, SnapshotRefType: BranchRef}, + }, + ) + + updates := []Update{ + NewRemoveSnapshotsUpdate([]int64{1, 2}), + } + + filtered := FilterReferencedSnapshots(updates, meta) + require.Len(t, filtered, 0) + }) + + t.Run("passes through non-RemoveSnapshots updates unchanged", func(t *testing.T) { + meta := buildMetadataWithSnapshots(t, + []Snapshot{snap(1)}, + map[string]SnapshotRef{ + MainBranch: {SnapshotID: 1, SnapshotRefType: BranchRef}, + }, + ) + + updates := []Update{ + NewSetPropertiesUpdate(iceberg.Properties{"key": "value"}), + NewRemovePropertiesUpdate([]string{"old-key"}), + } + + filtered := FilterReferencedSnapshots(updates, meta) + require.Len(t, filtered, 2) + require.Equal(t, UpdateSetProperties, filtered[0].Action()) + require.Equal(t, UpdateRemoveProperties, filtered[1].Action()) + }) +}