diff --git a/integrations/spark/spark-3.1/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/e2e/dml/IncrementalReadTest.java b/integrations/spark/spark-3.1/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/e2e/dml/IncrementalReadTest.java
new file mode 100644
index 000000000..c78d13e1b
--- /dev/null
+++ b/integrations/spark/spark-3.1/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/e2e/dml/IncrementalReadTest.java
@@ -0,0 +1,248 @@
+package com.linkedin.openhouse.spark.e2e.dml;
+
+import static com.linkedin.openhouse.spark.MockHelpers.*;
+import static com.linkedin.openhouse.spark.SparkTestBase.*;
+
+import com.google.common.collect.ImmutableList;
+import com.linkedin.openhouse.spark.SparkTestBase;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * E2E tests for Iceberg incremental (append-only) reads through the OpenHouse catalog on
+ * Spark 3.1 / Iceberg 1.2, using the mocked table service from {@link SparkTestBase}.
+ */
+@ExtendWith(SparkTestBase.class)
+public class IncrementalReadTest {
+
+  @Test
+  public void testIncrementalReadBetweenSnapshots() {
+    TableIdentifier tableId = TableIdentifier.of("dbIncr", "tbl");
+
+    // Create table with first batch of data: ('1', 'a'), ('2', 'b') → snapshot 1
+    mockTableLocationDefaultSchema(tableId, true);
+    // Insert second batch: ('3', 'c'), ('4', 'd') → snapshot 2
+    String tableLocation =
+        mockTableLocationAfterOperation(tableId, "INSERT INTO %t VALUES ('3', 'c'), ('4', 'd')");
+
+    Object mockResponseBody =
+        mockGetTableResponseBody(
+            "dbIncr",
+            "tbl",
+            "testCluster",
+            "dbIncr.tbl",
+            "ABCD",
+            tableLocation,
+            "V1",
+            baseSchema,
+            null,
+            null);
+
+    // Mock for querying .snapshots metadata table
+    mockTableService.enqueue(mockResponse(200, mockResponseBody));
+
+    List<Row> snapshots =
+        spark
+            .sql("SELECT * FROM openhouse.dbIncr.tbl.snapshots ORDER BY committed_at")
+            .collectAsList();
+    Assertions.assertEquals(2, snapshots.size(), "Should have exactly 2 snapshots");
+
+    long startSnapshotId = snapshots.get(0).getLong(snapshots.get(0).fieldIndex("snapshot_id"));
+    long endSnapshotId = snapshots.get(1).getLong(snapshots.get(1).fieldIndex("snapshot_id"));
+
+    // Mock for incremental read via DataFrame API (doRefresh calls)
+    mockTableService.enqueue(mockResponse(404, mockGetAllTableResponseBody())); // doRefresh
+    mockTableService.enqueue(mockResponse(200, mockResponseBody)); // doRefresh
+
+    // Incremental read: should return only data added between snapshot 1 (exclusive) and
+    // snapshot 2 (inclusive), i.e. only batch 2
+    List<String> incrementalRows =
+        spark.read().format("iceberg").option("start-snapshot-id", String.valueOf(startSnapshotId))
+            .option("end-snapshot-id", String.valueOf(endSnapshotId)).load("openhouse.dbIncr.tbl")
+            .collectAsList().stream()
+            .map(row -> row.mkString("."))
+            .collect(Collectors.toList());
+
+    Assertions.assertEquals(2, incrementalRows.size(), "Incremental read should return 2 rows");
+    Assertions.assertTrue(
+        incrementalRows.containsAll(ImmutableList.of("3.c", "4.d")),
+        "Incremental read should contain only the second batch of data");
+    Assertions.assertFalse(
+        incrementalRows.contains("1.a"),
+        "Incremental read should NOT contain data from the first batch");
+  }
+
+  @Test
+  public void testIncrementalReadSingleSnapshotRange() {
+    TableIdentifier tableId = TableIdentifier.of("dbIncr", "tblSingle");
+
+    // Create table with first batch: ('1', 'a'), ('2', 'b') → snapshot 1
+    mockTableLocationDefaultSchema(tableId, true);
+    // Insert second batch: ('3', 'c') → snapshot 2
+    mockTableLocationAfterOperation(tableId, "INSERT INTO %t VALUES ('3', 'c')");
+    // Insert third batch: ('4', 'd') → snapshot 3
+    String tableLocation =
+        mockTableLocationAfterOperation(tableId, "INSERT INTO %t VALUES ('4', 'd')");
+
+    Object mockResponseBody =
+        mockGetTableResponseBody(
+            "dbIncr",
+            "tblSingle",
+            "testCluster",
+            "dbIncr.tblSingle",
+            "ABCD",
+            tableLocation,
+            "V1",
+            baseSchema,
+            null,
+            null);
+
+    // Mock for querying .snapshots metadata table
+    mockTableService.enqueue(mockResponse(200, mockResponseBody));
+
+    List<Row> snapshots =
+        spark
+            .sql("SELECT * FROM openhouse.dbIncr.tblSingle.snapshots ORDER BY committed_at")
+            .collectAsList();
+    Assertions.assertEquals(3, snapshots.size());
+
+    long snap1 = snapshots.get(0).getLong(snapshots.get(0).fieldIndex("snapshot_id"));
+    long snap2 = snapshots.get(1).getLong(snapshots.get(1).fieldIndex("snapshot_id"));
+
+    // Mock for incremental read from snap1 (exclusive) to snap2 (inclusive)
+    // Should return only the single row added in snapshot 2
+    mockTableService.enqueue(mockResponse(404, mockGetAllTableResponseBody())); // doRefresh
+    mockTableService.enqueue(mockResponse(200, mockResponseBody)); // doRefresh
+
+    List<String> incrementalRows =
+        spark.read().format("iceberg").option("start-snapshot-id", String.valueOf(snap1))
+            .option("end-snapshot-id", String.valueOf(snap2)).load("openhouse.dbIncr.tblSingle")
+            .collectAsList().stream()
+            .map(row -> row.mkString("."))
+            .collect(Collectors.toList());
+
+    Assertions.assertEquals(
+        1, incrementalRows.size(), "Should return exactly 1 row from snapshot 2");
+    Assertions.assertTrue(
+        incrementalRows.contains("3.c"), "Should contain only the row added in snapshot 2");
+  }
+
+  @Test
+  public void testIncrementalReadWithThreeSnapshots() {
+    TableIdentifier tableId = TableIdentifier.of("dbIncr", "tbl3");
+
+    // Create table with batch 1: ('1', 'a'), ('2', 'b') → snapshot 1
+    mockTableLocationDefaultSchema(tableId, true);
+    // Insert batch 2: ('3', 'c'), ('4', 'd') → snapshot 2
+    mockTableLocationAfterOperation(tableId, "INSERT INTO %t VALUES ('3', 'c'), ('4', 'd')");
+    // Insert batch 3: ('5', 'e'), ('6', 'f') → snapshot 3
+    String tableLocation =
+        mockTableLocationAfterOperation(tableId, "INSERT INTO %t VALUES ('5', 'e'), ('6', 'f')");
+
+    Object mockResponseBody =
+        mockGetTableResponseBody(
+            "dbIncr",
+            "tbl3",
+            "testCluster",
+            "dbIncr.tbl3",
+            "ABCD",
+            tableLocation,
+            "V1",
+            baseSchema,
+            null,
+            null);
+
+    // Mock for querying .snapshots metadata table
+    mockTableService.enqueue(mockResponse(200, mockResponseBody));
+
+    List<Row> snapshots =
+        spark
+            .sql("SELECT * FROM openhouse.dbIncr.tbl3.snapshots ORDER BY committed_at")
+            .collectAsList();
+    Assertions.assertEquals(3, snapshots.size(), "Should have exactly 3 snapshots");
+
+    long snap1 = snapshots.get(0).getLong(snapshots.get(0).fieldIndex("snapshot_id"));
+    long snap3 = snapshots.get(2).getLong(snapshots.get(2).fieldIndex("snapshot_id"));
+
+    // Mock for incremental read spanning snapshots 1 to 3
+    mockTableService.enqueue(mockResponse(404, mockGetAllTableResponseBody())); // doRefresh
+    mockTableService.enqueue(mockResponse(200, mockResponseBody)); // doRefresh
+
+    // Incremental read from snapshot 1 (exclusive) to snapshot 3 (inclusive)
+    // Should return batches 2 and 3
+    List<String> incrementalRows =
+        spark.read().format("iceberg").option("start-snapshot-id", String.valueOf(snap1))
+            .option("end-snapshot-id", String.valueOf(snap3)).load("openhouse.dbIncr.tbl3")
+            .collectAsList().stream()
+            .map(row -> row.mkString("."))
+            .collect(Collectors.toList());
+
+    Assertions.assertEquals(4, incrementalRows.size(), "Should return 4 rows from batches 2 and 3");
+    Assertions.assertTrue(
+        incrementalRows.containsAll(ImmutableList.of("3.c", "4.d", "5.e", "6.f")),
+        "Should contain data from batches 2 and 3");
+    Assertions.assertFalse(incrementalRows.contains("1.a"), "Should NOT contain data from batch 1");
+  }
+
+  @Test
+  public void testIncrementalReadWithOverwriteInRange() {
+    TableIdentifier tableId = TableIdentifier.of("dbIncr", "tblOvw");
+
+    // Create table with batch 1: ('1', 'a'), ('2', 'b') → append snapshot 1
+    mockTableLocationDefaultSchema(tableId, true);
+    // Overwrite all data: ('3', 'c') → overwrite snapshot 2
+    mockTableLocationAfterOperation(tableId, "INSERT OVERWRITE %t VALUES ('3', 'c')");
+    // Insert batch 3: ('4', 'd') → append snapshot 3
+    String tableLocation =
+        mockTableLocationAfterOperation(tableId, "INSERT INTO %t VALUES ('4', 'd')");
+
+    Object mockResponseBody =
+        mockGetTableResponseBody(
+            "dbIncr",
+            "tblOvw",
+            "testCluster",
+            "dbIncr.tblOvw",
+            "ABCD",
+            tableLocation,
+            "V1",
+            baseSchema,
+            null,
+            null);
+
+    // Mock for querying .snapshots metadata table
+    mockTableService.enqueue(mockResponse(200, mockResponseBody));
+
+    List<Row> snapshots =
+        spark
+            .sql("SELECT * FROM openhouse.dbIncr.tblOvw.snapshots ORDER BY committed_at")
+            .collectAsList();
+    Assertions.assertEquals(3, snapshots.size(), "Should have exactly 3 snapshots");
+
+    long snap1 = snapshots.get(0).getLong(snapshots.get(0).fieldIndex("snapshot_id"));
+    long snap3 = snapshots.get(2).getLong(snapshots.get(2).fieldIndex("snapshot_id"));
+
+    // Verify the middle snapshot is an overwrite operation
+    String snap2Operation = snapshots.get(1).getString(snapshots.get(1).fieldIndex("operation"));
+    Assertions.assertEquals("overwrite", snap2Operation, "Middle snapshot should be an overwrite");
+
+    // Mock for incremental read attempt
+    mockTableService.enqueue(mockResponse(404, mockGetAllTableResponseBody())); // doRefresh
+    mockTableService.enqueue(mockResponse(200, mockResponseBody)); // doRefresh
+
+    // Iceberg 1.2 (Spark 3.1): IncrementalAppendScan rejects non-append snapshots in range
+    UnsupportedOperationException e =
+        Assertions.assertThrows(
+            UnsupportedOperationException.class,
+            () ->
+                spark
+                    .read()
+                    .format("iceberg")
+                    .option("start-snapshot-id", String.valueOf(snap1))
+                    .option("end-snapshot-id", String.valueOf(snap3))
+                    .load("openhouse.dbIncr.tblOvw")
+                    .collectAsList());
+    Assertions.assertTrue(
+        e.getMessage().contains("overwrite"), "Error should mention the non-append operation type");
+  }
+}
diff --git a/integrations/spark/spark-3.5/openhouse-spark-itest/build.gradle b/integrations/spark/spark-3.5/openhouse-spark-itest/build.gradle
index c32e0dd7e..32d68a2f6 100644
--- a/integrations/spark/spark-3.5/openhouse-spark-itest/build.gradle
+++ b/integrations/spark/spark-3.5/openhouse-spark-itest/build.gradle
@@ -79,6 +79,9 @@ test {
   filter {
     excludeTestsMatching 'com.linkedin.openhouse.spark.statementtest.*'
     excludeTestsMatching 'com.linkedin.openhouse.spark.catalogtest.*'
+    // Iceberg 1.2 (Spark 3.1) throws on non-append snapshots; Iceberg 1.5 (Spark 3.5) silently skips them.
+    // Spark 3.5 behavior is covered by catalogtest/e2e/IncrementalReadTest.testIncrementalReadWithOverwriteSkipsNonAppends.
+    excludeTestsMatching 'com.linkedin.openhouse.spark.e2e.dml.IncrementalReadTest.testIncrementalReadWithOverwriteInRange'
   }
   if (JavaVersion.current() >= JavaVersion.VERSION_1_9) {
     jvmArgs \
diff --git a/integrations/spark/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/catalogtest/e2e/ChangelogViewTest.java b/integrations/spark/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/catalogtest/e2e/ChangelogViewTest.java
new file mode 100644
index 000000000..e4f4145f6
--- /dev/null
+++ b/integrations/spark/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/catalogtest/e2e/ChangelogViewTest.java
@@ -0,0 +1,269 @@
+package com.linkedin.openhouse.spark.catalogtest.e2e;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import com.linkedin.openhouse.tablestest.OpenHouseSparkITest;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.Test;
+
+/**
+ * E2E tests for the Iceberg {@code create_changelog_view} procedure through the OpenHouse
+ * catalog on Spark 3.5, against a live embedded test cluster.
+ */
+public class ChangelogViewTest extends OpenHouseSparkITest {
+
+  private static final String DATABASE = "d1_changelog";
+
+  @Test
+  void testChangelogViewForAppends() throws Exception {
+    try (SparkSession spark = getSparkSession()) {
+      String name = DATABASE + ".changelog_appends";
+      spark.sql("DROP TABLE IF EXISTS openhouse." + name);
+      spark.sql("CREATE TABLE openhouse." + name + " (id int, data string)");
+
+      spark.sql("INSERT INTO openhouse." + name + " VALUES (1, 'a'), (2, 'b')");
+      spark.sql("INSERT INTO openhouse." + name + " VALUES (3, 'c')");
+
+      // Get snapshot IDs
+      List<Row> snapshots =
+          spark
+              .sql("SELECT snapshot_id FROM openhouse." + name + ".snapshots ORDER BY committed_at")
+              .collectAsList();
+      assertEquals(2, snapshots.size());
+      long snap1 = snapshots.get(0).getLong(0);
+      long snap2 = snapshots.get(1).getLong(0);
+
+      // Create changelog view between the two snapshots
+      spark.sql(
+          String.format(
+              "CALL openhouse.system.create_changelog_view("
+                  + "table => '%s', "
+                  + "options => map('start-snapshot-id', '%d', 'end-snapshot-id', '%d'))",
+              name, snap1, snap2));
+
+      // The default view name is <table>_changes
+      List<Row> changes = spark.sql("SELECT * FROM changelog_appends_changes").collectAsList();
+      assertEquals(1, changes.size(), "Should have 1 change (the appended row)");
+
+      // Verify change type and data
+      Row change = changes.get(0);
+      assertEquals("INSERT", change.getAs("_change_type"));
+      assertEquals(3, (int) change.getAs("id"));
+      assertEquals("c", change.getAs("data"));
+
+      spark.sql("DROP TABLE openhouse." + name);
+    }
+  }
+
+  @Test
+  void testChangelogViewForOverwrite() throws Exception {
+    try (SparkSession spark = getSparkSession()) {
+      String name = DATABASE + ".changelog_overwrite";
+      spark.sql("DROP TABLE IF EXISTS openhouse." + name);
+      spark.sql("CREATE TABLE openhouse." + name + " (id int, data string)");
+
+      spark.sql("INSERT INTO openhouse." + name + " VALUES (1, 'a'), (2, 'b')");
+      spark.sql("INSERT OVERWRITE openhouse." + name + " VALUES (3, 'c')");
+
+      List<Row> snapshots =
+          spark
+              .sql("SELECT snapshot_id FROM openhouse." + name + ".snapshots ORDER BY committed_at")
+              .collectAsList();
+      assertEquals(2, snapshots.size());
+      long snap1 = snapshots.get(0).getLong(0);
+      long snap2 = snapshots.get(1).getLong(0);
+
+      spark.sql(
+          String.format(
+              "CALL openhouse.system.create_changelog_view("
+                  + "table => '%s', "
+                  + "options => map('start-snapshot-id', '%d', 'end-snapshot-id', '%d'))",
+              name, snap1, snap2));
+
+      List<Row> changes = spark.sql("SELECT * FROM changelog_overwrite_changes").collectAsList();
+
+      // Overwrite should produce DELETEs for old rows and INSERTs for new rows
+      Set<String> changeTypes =
+          changes.stream().map(r -> r.getAs("_change_type").toString()).collect(Collectors.toSet());
+      assertTrue(changeTypes.contains("DELETE"), "Should have DELETE changes for overwritten rows");
+      assertTrue(changeTypes.contains("INSERT"), "Should have INSERT changes for new rows");
+
+      // Verify the deletes are for the original rows
+      List<Row> deletes =
+          changes.stream()
+              .filter(r -> "DELETE".equals(r.getAs("_change_type")))
+              .collect(Collectors.toList());
+      assertEquals(2, deletes.size(), "Should delete both original rows");
+      Set<Integer> deletedIds =
+          deletes.stream().map(r -> (int) r.getAs("id")).collect(Collectors.toSet());
+      assertTrue(deletedIds.containsAll(Set.of(1, 2)));
+
+      // Verify the insert is the new row
+      List<Row> inserts =
+          changes.stream()
+              .filter(r -> "INSERT".equals(r.getAs("_change_type")))
+              .collect(Collectors.toList());
+      assertEquals(1, inserts.size(), "Should insert one new row");
+      assertEquals(3, (int) inserts.get(0).getAs("id"));
+      assertEquals("c", inserts.get(0).getAs("data"));
+
+      spark.sql("DROP TABLE openhouse." + name);
+    }
+  }
+
+  @Test
+  void testChangelogViewForDelete() throws Exception {
+    try (SparkSession spark = getSparkSession()) {
+      String name = DATABASE + ".changelog_delete";
+      spark.sql("DROP TABLE IF EXISTS openhouse." + name);
+      spark.sql(
+          "CREATE TABLE openhouse."
+              + name
+              + " (id int, data string) TBLPROPERTIES ('format-version'='2')");
+
+      spark.sql("INSERT INTO openhouse." + name + " VALUES (1, 'a'), (2, 'b'), (3, 'c')");
+      spark.sql("DELETE FROM openhouse." + name + " WHERE id = 2");
+
+      List<Row> snapshots =
+          spark
+              .sql("SELECT snapshot_id FROM openhouse." + name + ".snapshots ORDER BY committed_at")
+              .collectAsList();
+      assertEquals(2, snapshots.size());
+      long snap1 = snapshots.get(0).getLong(0);
+      long snap2 = snapshots.get(1).getLong(0);
+
+      spark.sql(
+          String.format(
+              "CALL openhouse.system.create_changelog_view("
+                  + "table => '%s', "
+                  + "options => map('start-snapshot-id', '%d', 'end-snapshot-id', '%d'))",
+              name, snap1, snap2));
+
+      List<Row> changes = spark.sql("SELECT * FROM changelog_delete_changes").collectAsList();
+
+      // DELETE should produce a DELETE change for the removed row
+      assertEquals(1, changes.size(), "Should have 1 change for the deleted row");
+      Row change = changes.get(0);
+      assertEquals("DELETE", change.getAs("_change_type"));
+      assertEquals(2, (int) change.getAs("id"));
+      assertEquals("b", change.getAs("data"));
+
+      spark.sql("DROP TABLE openhouse." + name);
+    }
+  }
+
+  @Test
+  void testChangelogViewWithNetChanges() throws Exception {
+    try (SparkSession spark = getSparkSession()) {
+      String name = DATABASE + ".changelog_net";
+      spark.sql("DROP TABLE IF EXISTS openhouse." + name);
+      spark.sql(
+          "CREATE TABLE openhouse."
+              + name
+              + " (id int, data string) TBLPROPERTIES ('format-version'='2')");
+
+      // Insert, then delete, then re-insert the same id across multiple snapshots
+      spark.sql("INSERT INTO openhouse." + name + " VALUES (1, 'a'), (2, 'b')");
+      spark.sql("DELETE FROM openhouse." + name + " WHERE id = 1");
+      spark.sql("INSERT INTO openhouse." + name + " VALUES (1, 'a_updated'), (3, 'c')");
+
+      List<Row> snapshots =
+          spark
+              .sql("SELECT snapshot_id FROM openhouse." + name + ".snapshots ORDER BY committed_at")
+              .collectAsList();
+      assertEquals(3, snapshots.size());
+      long snap1 = snapshots.get(0).getLong(0);
+      long snap3 = snapshots.get(2).getLong(0);
+
+      // Use net_changes to collapse intermediate changes.
+      // compute_updates defaults to true when identifier_columns is set, but
+      // net_changes and compute_updates cannot both be true, so disable compute_updates.
+      spark.sql(
+          String.format(
+              "CALL openhouse.system.create_changelog_view("
+                  + "table => '%s', "
+                  + "options => map('start-snapshot-id', '%d', 'end-snapshot-id', '%d'), "
+                  + "net_changes => true, "
+                  + "compute_updates => false, "
+                  + "identifier_columns => array('id'))",
+              name, snap1, snap3));
+
+      List<Row> changes =
+          spark
+              .sql("SELECT * FROM changelog_net_changes ORDER BY _change_type, id")
+              .collectAsList();
+
+      // With compute_updates=false, net changes for each id:
+      // id=1: deleted then re-inserted → separate DELETE + INSERT (not collapsed into UPDATE)
+      // id=2: unchanged → no change
+      // id=3: inserted → INSERT
+      assertEquals(3, changes.size(), "Should have 3 net changes");
+
+      // id=1 DELETE (old value)
+      Row delete1 = changes.get(0);
+      assertEquals("DELETE", delete1.getAs("_change_type"));
+      assertEquals(1, (int) delete1.getAs("id"));
+      assertEquals("a", delete1.getAs("data"));
+
+      // id=1 INSERT (new value)
+      Row insert1 = changes.get(1);
+      assertEquals("INSERT", insert1.getAs("_change_type"));
+      assertEquals(1, (int) insert1.getAs("id"));
+      assertEquals("a_updated", insert1.getAs("data"));
+
+      // id=3 INSERT
+      Row insert3 = changes.get(2);
+      assertEquals("INSERT", insert3.getAs("_change_type"));
+      assertEquals(3, (int) insert3.getAs("id"));
+      assertEquals("c", insert3.getAs("data"));
+
+      // id=2 should not appear (unchanged across the range)
+      assertTrue(
+          changes.stream().noneMatch(r -> (int) r.getAs("id") == 2),
+          "id=2 should not appear in net changes (unchanged)");
+
+      spark.sql("DROP TABLE openhouse." + name);
+    }
+  }
+
+  @Test
+  void testChangelogViewMultipleSnapshotsSpan() throws Exception {
+    try (SparkSession spark = getSparkSession()) {
+      String name = DATABASE + ".changelog_multi";
+      spark.sql("DROP TABLE IF EXISTS openhouse." + name);
+      spark.sql("CREATE TABLE openhouse." + name + " (id int, data string)");
+
+      spark.sql("INSERT INTO openhouse." + name + " VALUES (1, 'a')");
+      spark.sql("INSERT INTO openhouse." + name + " VALUES (2, 'b')");
+      spark.sql("INSERT INTO openhouse." + name + " VALUES (3, 'c')");
+
+      List<Row> snapshots =
+          spark
+              .sql("SELECT snapshot_id FROM openhouse." + name + ".snapshots ORDER BY committed_at")
+              .collectAsList();
+      assertEquals(3, snapshots.size());
+      long snap1 = snapshots.get(0).getLong(0);
+      long snap3 = snapshots.get(2).getLong(0);
+
+      spark.sql(
+          String.format(
+              "CALL openhouse.system.create_changelog_view("
                  + "table => '%s', "
+                  + "options => map('start-snapshot-id', '%d', 'end-snapshot-id', '%d'))",
+              name, snap1, snap3));
+
+      List<Row> changes =
+          spark.sql("SELECT * FROM changelog_multi_changes ORDER BY id").collectAsList();
+
+      // Should see INSERTs for rows added in snapshots 2 and 3
+      assertEquals(2, changes.size(), "Should see 2 inserts spanning snapshots 2 and 3");
+      assertTrue(changes.stream().allMatch(r -> "INSERT".equals(r.getAs("_change_type"))));
+
+      Set<Integer> insertedIds =
+          changes.stream().map(r -> (int) r.getAs("id")).collect(Collectors.toSet());
+      assertEquals(Set.of(2, 3), insertedIds);
+
+      spark.sql("DROP TABLE openhouse." + name);
+    }
+  }
+}
diff --git a/integrations/spark/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/catalogtest/e2e/IncrementalReadTest.java b/integrations/spark/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/catalogtest/e2e/IncrementalReadTest.java
new file mode 100644
index 000000000..447401c55
--- /dev/null
+++ b/integrations/spark/spark-3.5/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/catalogtest/e2e/IncrementalReadTest.java
@@ -0,0 +1,61 @@
+package com.linkedin.openhouse.spark.catalogtest.e2e;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import com.linkedin.openhouse.tablestest.OpenHouseSparkITest;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.Test;
+
+/**
+ * E2E test for Iceberg incremental reads on Spark 3.5 / Iceberg 1.5, where non-append
+ * snapshots inside the requested range are silently skipped rather than rejected.
+ */
+public class IncrementalReadTest extends OpenHouseSparkITest {
+
+  private static final String DATABASE = "d1_incremental";
+
+  @Test
+  void testIncrementalReadWithOverwriteSkipsNonAppends() throws Exception {
+    try (SparkSession spark = getSparkSession()) {
+      String name = DATABASE + ".incr_overwrite";
+      spark.sql("DROP TABLE IF EXISTS openhouse." + name);
+      spark.sql("CREATE TABLE openhouse." + name + " (id string, data string)");
+
+      // Batch 1: append → snapshot 1
+      spark.sql("INSERT INTO openhouse." + name + " VALUES ('1', 'a'), ('2', 'b')");
+      // Overwrite all data → overwrite snapshot 2
+      spark.sql("INSERT OVERWRITE openhouse." + name + " VALUES ('3', 'c')");
+      // Batch 3: append → snapshot 3
+      spark.sql("INSERT INTO openhouse." + name + " VALUES ('4', 'd')");
+
+      List<Row> snapshots =
+          spark
+              .sql(
+                  "SELECT snapshot_id, operation FROM openhouse."
+                      + name
+                      + ".snapshots ORDER BY committed_at")
+              .collectAsList();
+      assertEquals(3, snapshots.size(), "Should have exactly 3 snapshots");
+      assertEquals(
+          "overwrite", snapshots.get(1).getString(1), "Middle snapshot should be an overwrite");
+
+      long snap1 = snapshots.get(0).getLong(0);
+      long snap3 = snapshots.get(2).getLong(0);
+
+      // Iceberg 1.5 (Spark 3.5): IncrementalAppendScan silently skips non-append snapshots
+      // and returns only data from append operations.
+      List<String> incrementalRows =
+          spark.read().format("iceberg").option("start-snapshot-id", String.valueOf(snap1))
+              .option("end-snapshot-id", String.valueOf(snap3)).load("openhouse." + name)
+              .collectAsList().stream()
+              .map(row -> row.mkString("."))
+              .collect(Collectors.toList());
+
+      assertEquals(
+          1, incrementalRows.size(), "Should return only appended rows, skipping the overwrite");
+      assertTrue(
+          incrementalRows.contains("4.d"), "Should contain only the row from the append snapshot");
+
+      spark.sql("DROP TABLE openhouse." + name);
+    }
+  }
+}