API, CORE, Flink, Spark: Deprecate Snapshot Change Methods #15241
RussellSpitzer wants to merge 5 commits into apache:main
Conversation
We currently offer several methods for getting the files changed in a snapshot, but they rely on the assumption that you can read the partition_spec from the manifest metadata. In advance of the move to Parquet manifests, we'll no longer be able to rely on this part of the manifest read code. In this PR we deprecate those existing methods and create a new utility class which can do the same thing as the old Snapshot methods. The new utility class does not assume that the manifest read code can actually read the partition_spec info and instead takes it as an argument.

In production code there are only a small number of actual uses:
1. CherryPickOperation
2. MicroBatches

Within our other modules we also had a few usages:
- Flink: TableChange
- Spark: MicroBatchStream

Unfortunately, there are also a huge number of test usages of these methods; the majority of this commit is cleaning those up.
@@ -112,7 +112,11 @@ public interface Snapshot extends Serializable {
 *
 * @param io a {@link FileIO} instance used for reading files from storage
 * @return all data files added to the table in this snapshot.
The main deprecations are here.
// Pick modifications from the snapshot
for (DataFile addedFile : cherrypickSnapshot.addedDataFiles(io)) {
SnapshotFileChanges changes =
    SnapshotFileChanges.builder(cherrypickSnapshot, io, specsById).build();
We technically don't have to use the cached utility here, but we use it in the other usage within this file, so I thought it was a bit clearer this way.
This looks reasonable to me. Cached or not is an internal implementation detail of SnapshotFileChanges.
failMissingDeletePaths();

// copy adds from the picked snapshot
// copy adds and deletes from the picked snapshot

Suggested change:
// copy adds and deletes from the picked snapshot
// copy adds from the picked snapshot
@@ -0,0 +1,276 @@
/*
This is the actual utility we are switching to.
With the V4 metadata change, change detection can become more complicated with manifest DVs. I like this direction of moving the change detection out of the Snapshot class, which can then focus on core data structures.
This could be a good foundation for change detection in the V4 adaptive tree.
In V4, if we are going to colocate the DV (deleted old and added new) and data file, it might make sense to expose a combined result. Otherwise, the associations get split first and then need to be joined again.
* @param specsById a map of partition spec IDs to partition specs
* @return a new Builder
*/
public static Builder builder(
I think we may want to extend this in the future to take multiple snapshots, so it may make sense to break the API into separate "specs, io" and "snapshot" parts, but not now.
}

private void cacheDataFileChanges() {
  List<ManifestFile> changedManifests =
I changed the logic from the BaseSnapshot implementation to optionally use an ExecutorService. I want to save actually using that for a follow-up PR, but I think it was probably a mistake that this was single-threaded before.
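The shape of that change can be sketched in plain JDK code. This is a hypothetical, self-contained illustration (all names invented, not Iceberg's actual implementation) of reading a list of items either sequentially, matching the old single-threaded behavior, or through an optional ExecutorService:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

public class ParallelRead {
  // Reads every input with the given reader; sequentially when workerPool is
  // null (the old single-threaded behavior), otherwise one task per input.
  static <T, R> List<R> readAll(List<T> inputs, Function<T, R> reader, ExecutorService workerPool) {
    List<R> results = new ArrayList<>();
    if (workerPool == null) {
      for (T input : inputs) {
        results.add(reader.apply(input));
      }
    } else {
      List<Future<R>> futures = new ArrayList<>();
      for (T input : inputs) {
        futures.add(workerPool.submit(() -> reader.apply(input)));
      }
      for (Future<R> future : futures) {
        try {
          // collect in submission order, so results match the sequential path
          results.add(future.get());
        } catch (InterruptedException | ExecutionException e) {
          throw new RuntimeException(e);
        }
      }
    }
    return results;
  }

  public static void main(String[] args) {
    List<Integer> inputs = List.of(1, 2, 3, 4);
    ExecutorService pool = Executors.newFixedThreadPool(2);
    List<Integer> sequential = readAll(inputs, m -> m * 10, null);
    List<Integer> parallel = readAll(inputs, m -> m * 10, pool);
    pool.shutdown();
    System.out.println(sequential.equals(parallel)); // prints "true"
  }
}
```

Because the parallel path collects futures in submission order, callers see identical results whether or not a pool is supplied, which is what makes the executor safely optional.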
}

private void cacheDeleteFileChanges() {
  List<ManifestFile> changedManifests =
Similar to above, this differs from the BaseSnapshot implementation by using an optional executor service.
}

public static class Builder {
  private final Snapshot snapshot;
I am attempting to hide some of the constructor details from the Changes class with this builder, but that may be premature.
return metadata.snapshot(ref.snapshotId());
}
A set of static methods which allow getting just one element of the Changes class without actually constructing it. We could drop these as well, but I think they make the test code refactor a bit smaller.
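The pattern being described can be sketched generically: instance methods cache their result for callers that query several change types for the same snapshot, while static one-shot helpers compute a single answer without constructing the caching object. All names below are invented, not the SnapshotUtil code:

```java
import java.util.List;
import java.util.stream.Collectors;

public class FileChanges {
  private final List<String> paths;
  private List<String> cached; // computed on first call, reused afterwards

  public FileChanges(List<String> paths) {
    this.paths = paths;
  }

  // Cached instance accessor: good when several queries hit the same snapshot.
  public List<String> addedFiles() {
    if (cached == null) {
      cached = compute(paths);
    }
    return cached;
  }

  // Static one-shot helper: a single answer, no caching object constructed.
  public static List<String> addedFiles(List<String> paths) {
    return compute(paths);
  }

  private static List<String> compute(List<String> paths) {
    // stand-in for the real work of reading manifests
    return paths.stream().map(String::toUpperCase).collect(Collectors.toList());
  }
}
```

The static form keeps single-use call sites (like most tests) to one line, while the instance form avoids re-reading manifests on repeated queries.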
@@ -31,6 +31,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
Another production code change here.
assertThat(table.schema().asStruct()).isEqualTo(expected.asStruct());
assertThat(SnapshotUtil.schemaFor(table, tag).asStruct()).isEqualTo(initialSchema.asStruct());
}
Actual tests for the utility.
TableChange(Snapshot snapshot, FileIO io) {
  this(snapshot.addedDataFiles(io), snapshot.addedDeleteFiles(io));
TableChange(Snapshot snapshot, Table table) {
This is a bit of a messier change. The old version called this(...), but we can't do that with the new caching object since we can't initialize and access it twice.
So the logic for doing the actual counts had to be moved into this method, so I could take advantage of the cache and not change performance.
This code is for the old Flink source, which has been deprecated since 1.7.0. It is fine if we copy the code a little bit.
Claude Opus 4.5 suggestion (after some massaging)
TableChange(Snapshot snapshot, Table table) {
this(SnapshotFileChanges.builder(snapshot, table.io(), table.specs()).build());
}
private TableChange(SnapshotFileChanges changes) {
this(changes.addedDataFiles(), changes.addedDeleteFiles());
}
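The reason the suggestion works is that Java constructor chaining lets the public constructor build the intermediate object exactly once and hand it through a private this(...) overload, instead of initializing and reading the cache twice. A minimal stand-alone sketch of that chaining (invented types, not the Flink code):

```java
public class TableChange {
  // stand-in for the cached SnapshotFileChanges object in the suggestion
  static class Changes {
    final int dataFiles;
    final int deleteFiles;

    Changes(int dataFiles, int deleteFiles) {
      this.dataFiles = dataFiles;
      this.deleteFiles = deleteFiles;
    }
  }

  private final int addedDataFiles;
  private final int addedDeleteFiles;

  public TableChange(int dataFiles, int deleteFiles) {
    // build the expensive object once, then delegate through the private
    // constructor; a this(...) call cannot construct and reuse it twice
    this(new Changes(dataFiles, deleteFiles));
  }

  private TableChange(Changes changes) {
    this.addedDataFiles = changes.dataFiles;
    this.addedDeleteFiles = changes.deleteFiles;
  }

  public int addedDataFiles() {
    return addedDataFiles;
  }

  public int addedDeleteFiles() {
    return addedDeleteFiles;
  }
}
```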
@stevenzwu: This is live code for Flink table maintenance
Ah, good call. That's a lot cleaner; I like it.
@pvary you are right. When I saw MonitorSource, I thought it was for the old deprecated FlinkSource.
// If snapshotSummary doesn't have SnapshotSummary.ADDED_FILES_PROP,
// iterate through addedFiles iterator to find addedFilesCount.
return addedFilesCount == -1
    ? Iterables.size(snapshot.addedDataFiles(table.io()))
This is the only Spark usage; a pretty straightforward fix.
If it's easier for reviewers, I can also split this into "deprecate and utility" and then do all the production and test updates in a follow-up PR. I just bundled this all up so that we would be able to avoid having a build with active deprecation warnings.
* @param snapshot the snapshot to detect changes for
* @return all data files added to the table in this snapshot
*/
public static Iterable<DataFile> addedDataFiles(Table table, Snapshot snapshot) {
I am wondering if it is necessary to have this overload with the Table arg. Maybe just have the version that takes (Snapshot, FileIO, Map<Integer, PartitionSpec>). I wouldn't mind if the unit test changes add an extra arg.
+1. It is very easy to call the others if a table is available.
Fair enough, I can remove it.
* query multiple file change types for the same snapshot. By default, manifests are read
* sequentially. Use {@link Builder#executeWith(ExecutorService)} to enable parallel reading.
*/
public class SnapshotFileChanges {
Can we keep this class package-private to start with? SnapshotUtil is in a different package, which prevents this.
Not sure if ChangelogUtil is a better place. Or maybe add a new ChangeDetectionUtil?
Snapshot snapshotWithAddedFile = table.currentSnapshot();

// Test the utility method with table
Iterable<DataFile> filesFromTableUtil =
Nit: the variable name is a bit misleading; it is more like filesFromTableParam, because there is actually a class called TableUtil. I also made another comment that we may not need the overload that takes a Table arg.
Iterables.filter(
    Iterables.filter(
        snapshot.allManifests(io),
        manifest -> manifest.content() == ManifestContent.DATA),
    manifest -> Objects.equals(manifest.snapshotId(), snapshot.snapshotId())));
Nit: why two filters? Could we combine them?
Just being lazy and mimicking the BaseSnapshot code. Let me tighten that up.
}
return addedDataFiles;
We need a prompt for the AIs to add newlines 🤖
We could do an "agents.md"; I know a bunch of other projects have done this.
Although if we really want this to not be an issue, we need to encode it in our style rules.
}

/** Returns all data files removed from the table in this snapshot. */
public Iterable<DataFile> removedDataFiles() {
Do we want Iterable or Iterator?
Iterator could be a bit more flexible if we end up in a situation where the data doesn't fit into memory, but we might end up duplicating data.
Minimally, we should return immutable results.
Currently I'm just mimicking the old signature, but you're right, it may be nice to change to Iterator, although our current implementation is just Iterable. I'm wondering, for "iterator" invocations, if we should change the implementation of the helper methods I added to SnapshotUtil, since this class is basically for caching.
assertThat(secondCallResult).hasSize(1);

// Both calls should return the same reference (cached)
assertThat(firstCallResult).isSameAs(secondCallResult);
Wondering if it is clearer to move all the tests here (including the SnapshotUtil static method tests) to a new TestSnapshotFileChanges class.
Context
This is part of the write metadata with columnar formats change. When we start writing Parquet manifests, calls to ManifestReader() without passing through the partitionSpecByID will error out. The reason for the error is that we have no way of reading the metadata in the new file APIs, and we have decided we aren't going to add it in the future.
Snapshot.addedFiles and its friends are some of the main users of this ManifestReader(path, io) path (that aren't test code), so we need to remove those methods and switch our usage to a version which passes through partitionSpecByID. Otherwise, switching to Parquet manifests will cause issues throughout the codebase.
See #13769
This PR
We currently offer several methods for getting the files changed in a snapshot, but they rely on the assumption that you can read the partition_spec from the manifest metadata. In advance of the move to Parquet manifests, we'll no longer be able to rely on this part of the manifest read code.
In this PR we deprecate those existing methods and create a new utility class which can do the same thing as the old Snapshot methods. The new utility class does not assume that the manifest read code can actually read the partition_spec info and instead takes it as an argument.
Production Code Changes
Core
Flink
Spark
Test Changes
Unfortunately, there are also a huge number of test usages of these methods; the majority of this commit is cleaning those up.
As a disclaimer, I did use Cursor and Claude Code when writing this PR. It did the majority of the test refactoring, although I have checked it all as well.