Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions table/updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,3 +560,51 @@
// Apply passes u.SchemaIDs to builder.RemoveSchemas and returns whatever
// error that call reports.
func (u *removeSchemasUpdate) Apply(builder *MetadataBuilder) error {
return builder.RemoveSchemas(u.SchemaIDs)
}

// FilterReferencedSnapshots removes referenced snapshot IDs from any
// RemoveSnapshots updates. Snapshots referenced by a branch or tag in
// the given metadata are excluded, unless the same update batch also
// removes that ref. Updates with no remaining snapshot IDs are dropped
// entirely.
func FilterReferencedSnapshots(updates []Update, meta Metadata) []Update {
// Build ref state: ref name → snapshot ID
refsByName := make(map[string]int64)
for name, ref := range meta.Refs() {
refsByName[name] = ref.SnapshotID
}

// Apply ref removals from the same update batch
for _, upd := range updates {
if rs, ok := upd.(*removeSnapshotRefUpdate); ok {
delete(refsByName, rs.RefName)
}
}

// Build the effective referenced set after accounting for ref removals
referenced := make(map[int64]bool)
for _, snapID := range refsByName {
referenced[snapID] = true
}

// Filter RemoveSnapshots updates
result := make([]Update, 0, len(updates))
for _, upd := range updates {
rs, ok := upd.(*removeSnapshotsUpdate)
if !ok {
result = append(result, upd)
continue

Check failure on line 595 in table/updates.go

View workflow job for this annotation

GitHub Actions / macos-latest go1.24.9

continue with no blank line before (nlreturn)

Check failure on line 595 in table/updates.go

View workflow job for this annotation

GitHub Actions / macos-latest go1.23.6

continue with no blank line before (nlreturn)

Check failure on line 595 in table/updates.go

View workflow job for this annotation

GitHub Actions / ubuntu-latest go1.24.9

continue with no blank line before (nlreturn)

Check failure on line 595 in table/updates.go

View workflow job for this annotation

GitHub Actions / windows-latest go1.24.9

continue with no blank line before (nlreturn)

Check failure on line 595 in table/updates.go

View workflow job for this annotation

GitHub Actions / windows-latest go1.23.6

continue with no blank line before (nlreturn)

Check failure on line 595 in table/updates.go

View workflow job for this annotation

GitHub Actions / ubuntu-latest go1.23.6

continue with no blank line before (nlreturn)
}

filtered := make([]int64, 0, len(rs.SnapshotIDs))
for _, id := range rs.SnapshotIDs {
if !referenced[id] {
filtered = append(filtered, id)
}
}
if len(filtered) > 0 {
result = append(result, NewRemoveSnapshotsUpdate(filtered))
}
}

return result
}
109 changes: 109 additions & 0 deletions table/updates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,3 +244,112 @@
})
}
}

// buildMetadataWithSnapshots creates table metadata with the given snapshots
// and refs for use in FilterReferencedSnapshots tests.
func buildMetadataWithSnapshots(t *testing.T, snapshots []Snapshot, refs map[string]SnapshotRef) Metadata {
t.Helper()
builder := builderWithoutChanges(2)
baseTs := builder.base.LastUpdatedMillis()
for i := range snapshots {
snapshots[i].TimestampMs = baseTs + int64(i) + 1
require.NoError(t, builder.AddSnapshot(&snapshots[i]))
}
for name, ref := range refs {
require.NoError(t, builder.SetSnapshotRef(name, ref.SnapshotID, ref.SnapshotRefType))
}
meta, err := builder.Build()
require.NoError(t, err)
return meta

Check failure on line 263 in table/updates_test.go

View workflow job for this annotation

GitHub Actions / macos-latest go1.24.9

return with no blank line before (nlreturn)

Check failure on line 263 in table/updates_test.go

View workflow job for this annotation

GitHub Actions / macos-latest go1.23.6

return with no blank line before (nlreturn)

Check failure on line 263 in table/updates_test.go

View workflow job for this annotation

GitHub Actions / ubuntu-latest go1.24.9

return with no blank line before (nlreturn)

Check failure on line 263 in table/updates_test.go

View workflow job for this annotation

GitHub Actions / windows-latest go1.24.9

return with no blank line before (nlreturn)

Check failure on line 263 in table/updates_test.go

View workflow job for this annotation

GitHub Actions / windows-latest go1.23.6

return with no blank line before (nlreturn)

Check failure on line 263 in table/updates_test.go

View workflow job for this annotation

GitHub Actions / ubuntu-latest go1.23.6

return with no blank line before (nlreturn)
}

// TestFilterReferencedSnapshots exercises FilterReferencedSnapshots against
// metadata built by buildMetadataWithSnapshots. It covers four cases:
// referenced snapshots are stripped, a ref removed in the same batch releases
// its snapshot, a fully referenced update is dropped, and unrelated updates
// pass through.
func TestFilterReferencedSnapshots(t *testing.T) {
	schemaID := 0
	// snap builds a minimal append snapshot with the given ID; the timestamp
	// is filled in later by buildMetadataWithSnapshots.
	snap := func(id int64) Snapshot {
		return Snapshot{
			SnapshotID:   id,
			ManifestList: "/snap.avro",
			Summary:      &Summary{Operation: OpAppend},
			SchemaID:     &schemaID,
		}
	}

	t.Run("skips referenced snapshots", func(t *testing.T) {
		// snap-1 referenced by feature-branch, snap-2 unreferenced, snap-3 referenced by main
		meta := buildMetadataWithSnapshots(t,
			[]Snapshot{snap(1), snap(2), snap(3)},
			map[string]SnapshotRef{
				"feature-branch": {SnapshotID: 1, SnapshotRefType: BranchRef},
				MainBranch:       {SnapshotID: 3, SnapshotRefType: BranchRef},
			},
		)

		updates := []Update{
			NewRemoveSnapshotsUpdate([]int64{1, 2}),
		}

		filtered := FilterReferencedSnapshots(updates, meta)
		require.Len(t, filtered, 1)

		// Checked assertion: a wrong type fails this subtest instead of
		// panicking the whole test binary.
		rs, ok := filtered[0].(*removeSnapshotsUpdate)
		require.True(t, ok, "expected *removeSnapshotsUpdate, got %T", filtered[0])
		require.Equal(t, []int64{2}, rs.SnapshotIDs)
	})

	t.Run("allows removal when ref is also removed in same batch", func(t *testing.T) {
		// tag-1 → snap-1, main → snap-2
		meta := buildMetadataWithSnapshots(t,
			[]Snapshot{snap(1), snap(2)},
			map[string]SnapshotRef{
				"tag-1":    {SnapshotID: 1, SnapshotRefType: TagRef},
				MainBranch: {SnapshotID: 2, SnapshotRefType: BranchRef},
			},
		)

		updates := []Update{
			NewRemoveSnapshotRefUpdate("tag-1"),
			NewRemoveSnapshotsUpdate([]int64{1}),
		}

		filtered := FilterReferencedSnapshots(updates, meta)
		// RemoveSnapshotRef passes through, RemoveSnapshots keeps snap-1
		require.Len(t, filtered, 2)
		require.Equal(t, UpdateRemoveSnapshotRef, filtered[0].Action())

		rs, ok := filtered[1].(*removeSnapshotsUpdate)
		require.True(t, ok, "expected *removeSnapshotsUpdate, got %T", filtered[1])
		require.Equal(t, []int64{1}, rs.SnapshotIDs)
	})

	t.Run("drops update entirely when all snapshots are referenced", func(t *testing.T) {
		meta := buildMetadataWithSnapshots(t,
			[]Snapshot{snap(1), snap(2)},
			map[string]SnapshotRef{
				"branch-a": {SnapshotID: 1, SnapshotRefType: BranchRef},
				"branch-b": {SnapshotID: 2, SnapshotRefType: BranchRef},
			},
		)

		updates := []Update{
			NewRemoveSnapshotsUpdate([]int64{1, 2}),
		}

		filtered := FilterReferencedSnapshots(updates, meta)
		require.Empty(t, filtered)
	})

	t.Run("passes through non-RemoveSnapshots updates unchanged", func(t *testing.T) {
		meta := buildMetadataWithSnapshots(t,
			[]Snapshot{snap(1)},
			map[string]SnapshotRef{
				MainBranch: {SnapshotID: 1, SnapshotRefType: BranchRef},
			},
		)

		updates := []Update{
			NewSetPropertiesUpdate(iceberg.Properties{"key": "value"}),
			NewRemovePropertiesUpdate([]string{"old-key"}),
		}

		filtered := FilterReferencedSnapshots(updates, meta)
		require.Len(t, filtered, 2)
		require.Equal(t, UpdateSetProperties, filtered[0].Action())
		require.Equal(t, UpdateRemoveProperties, filtered[1].Action())
	})
}
Loading