From 7b2aa415d3a8db251165995e2d10a2551af1b815 Mon Sep 17 00:00:00 2001 From: sivabalan Date: Tue, 24 Mar 2026 23:00:27 -0700 Subject: [PATCH 1/4] Adding support to block archival on last known ECTR for v6 tables --- .../versioning/v1/TimelineArchiverV1.java | 29 + .../hudi/config/HoodieArchivalConfig.java | 12 + .../apache/hudi/config/HoodieWriteConfig.java | 4 + .../hudi/io/TestHoodieTimelineArchiver.java | 534 ++++++++++++++++++ 4 files changed, 579 insertions(+) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java index e693a1c2e7fcf..19813793f1be3 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java @@ -53,6 +53,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieCommitException; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.table.HoodieTable; @@ -271,7 +272,31 @@ private Stream getCommitInstantsToArchive() throws IOException { Option oldestInstantToRetainForClustering = ClusteringUtils.getEarliestInstantToRetainForClustering(table.getActiveTimeline(), table.getMetaClient(), config.getCleanerPolicy()); + // If enabled, block archival based on ECTR from the last completed clean to ensure we don't archive + // commits that have data files that haven't been cleaned yet. + Option oldestInstantToRetainForClean = Option.empty(); + if (config.shouldBlockArchivalOnCleanECTR()) { + Option lastCleanInstant = table.getCleanTimeline().filterCompletedInstants().lastInstant(); + if (lastCleanInstant.isPresent()) { + try { + org.apache.hudi.avro.model.HoodieCleanMetadata cleanMetadata = + table.getActiveTimeline().readCleanMetadata(lastCleanInstant.get()); + if (cleanMetadata.getEarliestCommitToRetain() != null + && !cleanMetadata.getEarliestCommitToRetain().trim().isEmpty()) { + oldestInstantToRetainForClean = commitTimeline.findInstantsAfterOrEquals( + cleanMetadata.getEarliestCommitToRetain()).firstInstant(); + log.info("Blocking archival based on ECTR {} from last clean {}", + cleanMetadata.getEarliestCommitToRetain(), lastCleanInstant.get().requestedTime()); + } + } catch (IOException e) { + log.warn("Failed to read clean metadata for {}, skipping ECTR check", lastCleanInstant.get(), e); + throw new HoodieIOException("Failed to read clean metadata for " + lastCleanInstant.get() + ", skipping ECTR check", e); + } + } + } + // Actually do the commits + Option finalOldestInstantToRetainForClean = oldestInstantToRetainForClean; Stream instantToArchiveStream = commitTimeline.getInstantsAsStream() .filter(s -> { if (config.shouldArchiveBeyondSavepoint()) { @@ -297,6 +322,10 @@ private Stream getCommitInstantsToArchive() throws IOException { oldestInstantToRetainForClustering.map(instantToRetain -> compareTimestamps(s.requestedTime(), LESSER_THAN, instantToRetain.requestedTime())) .orElse(true) + ).filter(s -> + finalOldestInstantToRetainForClean.map(instantToRetain -> + compareTimestamps(s.requestedTime(), LESSER_THAN, instantToRetain.requestedTime())) + .orElse(true) ); return instantToArchiveStream.limit(commitTimeline.countInstants() - minInstantsToKeep); } else { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java index 1b1e3ba5c3917..a6a1f841ac857 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java @@ -114,6 +114,13 @@ public class HoodieArchivalConfig extends HoodieConfig { .markAdvanced() .withDocumentation("Number of timeline manifest versions to retain."); + public static final ConfigProperty BLOCK_ARCHIVAL_ON_LATEST_CLEAN_ECTR = ConfigProperty + .key("hoodie.archive.block.on.latest.clean.ectr") + .defaultValue(false) + .markAdvanced() + .sinceVersion("1.2.0") + .withDocumentation("If enabled, archival will block on latest ECTR from last known clean"); + /** * @deprecated Use {@link #MAX_COMMITS_TO_KEEP} and its methods instead */ @@ -205,6 +212,11 @@ public Builder withArchiveBeyondSavepoint(boolean archiveBeyondSavepoint) { return this; } + public Builder withBlockArchivalOnCleanECTR(boolean blockArchivalOnCleanECTR) { + archivalConfig.setValue(BLOCK_ARCHIVAL_ON_LATEST_CLEAN_ECTR, String.valueOf(blockArchivalOnCleanECTR)); + return this; + } + public HoodieArchivalConfig build() { archivalConfig.setDefaults(HoodieArchivalConfig.class.getName()); return archivalConfig; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 9d32ec8b7d22d..5fc5ba5406ca6 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -1975,6 +1975,10 @@ public int getCommitArchivalBatchSize() { return getInt(HoodieArchivalConfig.COMMITS_ARCHIVAL_BATCH_SIZE); } + public boolean shouldBlockArchivalOnCleanECTR() { + return getBoolean(HoodieArchivalConfig.BLOCK_ARCHIVAL_ON_LATEST_CLEAN_ECTR); + } + public Boolean shouldCleanBootstrapBaseFile() { return getBoolean(HoodieCleanConfig.CLEANER_BOOTSTRAP_BASE_FILE_ENABLE); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java index c631ec00626c1..cf77d25c09616 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java @@ -24,6 +24,7 @@ import org.apache.hudi.client.BaseHoodieWriteClient; import org.apache.hudi.client.timeline.TimelineArchivers; import org.apache.hudi.client.WriteClientTestUtils; +import org.apache.hudi.client.timeline.versioning.v1.TimelineArchiverV1; import org.apache.hudi.client.timeline.versioning.v2.LSMTimelineWriter; import org.apache.hudi.client.timeline.versioning.v2.TimelineArchiverV2; import org.apache.hudi.client.transaction.lock.InProcessLockProvider; @@ -47,6 +48,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.LSMTimeline; import org.apache.hudi.common.table.timeline.TimelineUtils; +import org.apache.hudi.common.table.timeline.versioning.clean.CleanMetadataV2MigrationHandler; import org.apache.hudi.common.table.timeline.versioning.v2.InstantComparatorV2; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.testutils.FileCreateUtilsLegacy; @@ -103,6 +105,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -2189,4 +2192,535 @@ public void testArchivalMetricsWithMixedActionTypes() throws Exception { // Verify archival status is success assertEquals(1L, metrics.get(ArchivalMetrics.ARCHIVAL_STATUS), "Archival should succeed"); } + + private void initTableToTestECTRBlock() throws IOException { + HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE; + initPath(); + initSparkContexts(); + initTimelineService(); + Properties properties = new Properties(); + properties.setProperty(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "6"); + properties.setProperty(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key(), "false"); + initMetaClient(properties); + storage = metaClient.getStorage(); + metaClient.getStorage().createDirectory(new StoragePath(basePath)); + metaClient = HoodieTestUtils.init(storageConf, basePath, tableType, properties); + } + + /** + * Tests archival blocking based on ECTR from last clean when config is enabled. + * Verifies that commits with timestamp >= ECTR are not archived. + */ + @Test + public void testArchivalBlocksOnCleanECTRWhenEnabled() throws Exception { + initTableToTestECTRBlock(); + + // Create config with ECTR blocking enabled + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withPath(basePath) + .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) + .withParallelism(2, 2) + .withArchivalConfig(HoodieArchivalConfig.newBuilder() + .archiveCommitsWith(2, 3) + .withBlockArchivalOnCleanECTR(true) // Enable ECTR blocking + .build()) + .withCleanConfig(HoodieCleanConfig.newBuilder() + .retainCommits(1) + .build()) + .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withRemoteServerPort(timelineServicePort) + .build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .enable(false) + .build()) + .forTable("test-trip-table") + .build(); + + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + + // Create 5 commits: 00000001, 00000002, 00000003, 00000004, 00000005 + for (int i = 1; i <= 5; i++) { + testTable.addCommit(String.format("%08d", i)); + } + + // Create a clean commit with ECTR pointing to commit 00000003 + // This means commits 00000003 and later should not be archived + Map cleanStats = new HashMap<>(); + cleanStats.put("p1", 1); + cleanStats.put("p2", 1); + + List cleanStatsList = new ArrayList<>(); + cleanStatsList.add(new org.apache.hudi.common.HoodieCleanStat( + HoodieCleaningPolicy.KEEP_LATEST_COMMITS, + "p1", + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + "00000003", // ECTR + "00000005" + )); + cleanStatsList.add(new org.apache.hudi.common.HoodieCleanStat( + HoodieCleaningPolicy.KEEP_LATEST_COMMITS, + "p2", + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + "00000003", // ECTR + "00000005" + )); + + org.apache.hudi.avro.model.HoodieCleanMetadata cleanMetadata = + CleanerUtils.convertCleanMetadata("00000006", Option.of(0L), cleanStatsList, Collections.emptyMap()); + org.apache.hudi.avro.model.HoodieCleanerPlan cleanerPlan = + new org.apache.hudi.avro.model.HoodieCleanerPlan( + new org.apache.hudi.avro.model.HoodieActionInstant("00000003", CLEAN_ACTION, ""), + "", "", new HashMap<>(), CleanMetadataV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>(), Collections.emptyMap()); + + testTable.addClean("00000006", cleanerPlan, cleanMetadata); + + // Reload metaClient to pick up the clean commit + metaClient = HoodieTableMetaClient.reload(metaClient); + + // Trigger archival + HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient); + TimelineArchiverV1 archiver = new TimelineArchiverV1(writeConfig, table); + archiver.archiveIfRequired(context); + + // Reload timeline and get active commits + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + List activeCommits = activeTimeline.getCommitsTimeline().filterCompletedInstants().getInstants(); + + // Verify that commits 00000003, 00000004, 00000005 are still in active timeline (not archived) + List activeCommitTimes = activeCommits.stream() + .map(HoodieInstant::requestedTime) + .collect(Collectors.toList()); + + assertTrue(activeCommitTimes.contains("00000003"), "Commit 00000003 (ECTR) should not be archived"); + assertTrue(activeCommitTimes.contains("00000004"), "Commit 00000004 (after ECTR) should not be archived"); + assertTrue(activeCommitTimes.contains("00000005"), "Commit 00000005 (after ECTR) should not be archived"); + + // Verify commits before ECTR may have been archived (00000001, 00000002) + // They could be archived or retained based on min/max archival settings + // But commits >= ECTR must be present + + + } + + /** + * Tests that archival proceeds normally when config is disabled (default behavior). + * Verifies backward compatibility - existing behavior unchanged when ECTR blocking is off. + */ + @Test + public void testArchivalProceedsNormallyWhenECTRBlockingDisabled() throws Exception { + initTableToTestECTRBlock(); + + // Create config WITHOUT ECTR blocking (default is false) + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withPath(basePath) + .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) + .withParallelism(2, 2) + .withArchivalConfig(HoodieArchivalConfig.newBuilder() + .archiveCommitsWith(2, 3) + // Do not set withBlockArchivalOnCleanECTR - defaults to false + .build()) + .withCleanConfig(HoodieCleanConfig.newBuilder() + .retainCommits(1) + .build()) + .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withRemoteServerPort(timelineServicePort) + .build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .enable(false) + .build()) + .forTable("test-trip-table") + .build(); + + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + + // Create 6 commits: 00000001 through 00000006 + for (int i = 1; i <= 6; i++) { + testTable.addCommit(String.format("%08d", i)); + } + + // Create a clean commit with ECTR pointing to commit 00000003 + List cleanStatsList = new ArrayList<>(); + cleanStatsList.add(new org.apache.hudi.common.HoodieCleanStat( + HoodieCleaningPolicy.KEEP_LATEST_COMMITS, + "p1", + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + "00000003", // ECTR + "00000006" + )); + + org.apache.hudi.avro.model.HoodieCleanMetadata cleanMetadata = + CleanerUtils.convertCleanMetadata("00000007", Option.of(0L), cleanStatsList, Collections.emptyMap()); + org.apache.hudi.avro.model.HoodieCleanerPlan cleanerPlan = + new org.apache.hudi.avro.model.HoodieCleanerPlan( + new org.apache.hudi.avro.model.HoodieActionInstant("00000007", CLEAN_ACTION, ""), + "", "", new HashMap<>(), CleanMetadataV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>(), Collections.emptyMap()); + + testTable.addClean("00000007", cleanerPlan, cleanMetadata); + + // Reload metaClient + metaClient = HoodieTableMetaClient.reload(metaClient); + int commitsBeforeArchival = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants(); + + // Trigger archival + HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient); + TimelineArchiverV1 archiver = new TimelineArchiverV1(writeConfig, table); + archiver.archiveIfRequired(context); + + // Reload timeline + metaClient = HoodieTableMetaClient.reload(metaClient); + int commitsAfterArchival = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants(); + + // With config disabled, archival should proceed based on min/max archival commits (2, 3) + // Since we have 6 commits, archival should have kicked in + // This test verifies backward compatibility - ECTR from clean is ignored when config is off + assertTrue(commitsAfterArchival < commitsBeforeArchival || commitsAfterArchival <= 3, + "Archival should proceed normally when ECTR blocking is disabled"); + } + + /** + * Tests graceful handling when clean metadata is missing or cannot be read. + * Verifies archival continues normally without blocking when metadata read fails. + */ + @Test + public void testArchivalContinuesWhenCleanMetadataIsMissing() throws Exception { + initTableToTestECTRBlock(); + + // Create config with ECTR blocking enabled + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withPath(basePath) + .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) + .withParallelism(2, 2) + .withArchivalConfig(HoodieArchivalConfig.newBuilder() + .archiveCommitsWith(2, 3) + .withBlockArchivalOnCleanECTR(true) + .build()) + .withCleanConfig(HoodieCleanConfig.newBuilder() + .retainCommits(1) + .build()) + .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withRemoteServerPort(timelineServicePort) + .build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .enable(false) + .build()) + .forTable("test-trip-table") + .build(); + + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + + // Create 6 commits + for (int i = 1; i <= 6; i++) { + testTable.addCommit(String.format("%08d", i)); + } + + // DO NOT create a clean commit - this simulates missing clean metadata + + // Reload metaClient + metaClient = HoodieTableMetaClient.reload(metaClient); + + // Trigger archival - should not fail even though no clean metadata exists + HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient); + TimelineArchiverV1 archiver = new TimelineArchiverV1(writeConfig, table); + + // Should not throw exception + assertDoesNotThrow(() -> archiver.archiveIfRequired(context), + "Archival should continue gracefully when clean metadata is missing"); + + // Verify some archival happened (based on min/max archival commits) + metaClient = HoodieTableMetaClient.reload(metaClient); + int commitsAfterArchival = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants(); + + // With 6 commits and max_archival=3, some archival should occur + assertTrue(commitsAfterArchival <= 3, "Archival should proceed when clean metadata is missing"); + } + + /** + * Tests that clean commits with empty or null ECTR are handled gracefully. + * Verifies archival proceeds normally when ECTR is not set in clean metadata. + */ + @Test + public void testArchivalHandlesEmptyECTRInCleanMetadata() throws Exception { + initTableToTestECTRBlock(); + + // Create config with ECTR blocking enabled + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withPath(basePath) + .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) + .withParallelism(2, 2) + .withArchivalConfig(HoodieArchivalConfig.newBuilder() + .archiveCommitsWith(2, 3) + .withBlockArchivalOnCleanECTR(true) + .build()) + .withCleanConfig(HoodieCleanConfig.newBuilder() + .retainCommits(1) + .build()) + .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withRemoteServerPort(timelineServicePort) + .build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .enable(false) + .build()) + .forTable("test-trip-table") + .build(); + + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + + // Create 6 commits + for (int i = 1; i <= 6; i++) { + testTable.addCommit(String.format("%08d", i)); + } + + // Create a clean commit with EMPTY ECTR + List cleanStatsList = new ArrayList<>(); + cleanStatsList.add(new org.apache.hudi.common.HoodieCleanStat( + HoodieCleaningPolicy.KEEP_LATEST_COMMITS, + "p1", + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + "", // Empty ECTR + "00000006" + )); + + org.apache.hudi.avro.model.HoodieCleanMetadata cleanMetadata = + CleanerUtils.convertCleanMetadata("00000007", Option.of(0L), cleanStatsList, Collections.emptyMap()); + org.apache.hudi.avro.model.HoodieCleanerPlan cleanerPlan = + new org.apache.hudi.avro.model.HoodieCleanerPlan( + new org.apache.hudi.avro.model.HoodieActionInstant("00000007", CLEAN_ACTION, ""), + "", "", new HashMap<>(), CleanMetadataV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>(), Collections.emptyMap()); + + testTable.addClean("00000007", cleanerPlan, cleanMetadata); + + // Reload metaClient + metaClient = HoodieTableMetaClient.reload(metaClient); + + // Trigger archival - should not fail with empty ECTR + HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient); + TimelineArchiverV1 archiver = new TimelineArchiverV1(writeConfig, table); + + assertDoesNotThrow(() -> archiver.archiveIfRequired(context), + "Archival should handle empty ECTR gracefully"); + + // Verify archival proceeded normally + metaClient = HoodieTableMetaClient.reload(metaClient); + int commitsAfterArchival = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants(); + + assertTrue(commitsAfterArchival <= 3, "Archival should proceed normally with empty ECTR"); + } + + /** + * Tests that archival can still make progress when ECTR is later than the archival window. + * This verifies the feature doesn't unnecessarily block archival when it's safe to proceed. + * + * Scenario: Create 10 commits, with ECTR pointing to commit 8. Archival should be able to + * archive commits 1-7 since they are before ECTR and outside the retention window. + */ + @Test + public void testArchivalMakesProgressWhenECTRIsLaterThanArchivalWindow() throws Exception { + initTableToTestECTRBlock(); + + // Create config with ECTR blocking enabled + // Min commits to keep = 2, max commits to keep = 3 + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withPath(basePath) + .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) + .withParallelism(2, 2) + .withArchivalConfig(HoodieArchivalConfig.newBuilder() + .archiveCommitsWith(2, 3) + .withBlockArchivalOnCleanECTR(true) // Enable ECTR blocking + .build()) + .withCleanConfig(HoodieCleanConfig.newBuilder() + .retainCommits(1) + .build()) + .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withRemoteServerPort(timelineServicePort) + .build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .enable(false) + .build()) + .forTable("test-trip-table") + .build(); + + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + + // Create 10 commits: 00000001 through 00000010 + for (int i = 1; i <= 10; i++) { + testTable.addCommit(String.format("%08d", i)); + } + + // Create a clean commit with ECTR pointing to commit 00000008 + // This ECTR is later than what archival would retain based on min/max archival commits (2, 3) + // So archival should be able to archive commits 1-7, but not 8-10 + List cleanStatsList = new ArrayList<>(); + cleanStatsList.add(new org.apache.hudi.common.HoodieCleanStat( + HoodieCleaningPolicy.KEEP_LATEST_COMMITS, + "p1", + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + "00000008", // ECTR - later than archival window + "00000010" + )); + + org.apache.hudi.avro.model.HoodieCleanMetadata cleanMetadata = + CleanerUtils.convertCleanMetadata("00000011", Option.of(0L), cleanStatsList, Collections.emptyMap()); + org.apache.hudi.avro.model.HoodieCleanerPlan cleanerPlan = + new org.apache.hudi.avro.model.HoodieCleanerPlan( + new org.apache.hudi.avro.model.HoodieActionInstant("00000011", CLEAN_ACTION, ""), + "", "", new HashMap<>(), CleanMetadataV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>(), Collections.emptyMap()); + + testTable.addClean("00000011", cleanerPlan, cleanMetadata); + + // Reload metaClient + metaClient = HoodieTableMetaClient.reload(metaClient); + int commitsBeforeArchival = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants(); + + // Trigger archival + HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient); + TimelineArchiverV1 archiver = new TimelineArchiverV1(writeConfig, table); + archiver.archiveIfRequired(context); + + // Reload timeline and get active commits + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + List activeCommits = activeTimeline.getCommitsTimeline().filterCompletedInstants().getInstants(); + int commitsAfterArchival = activeCommits.size(); + + // Verify archival made progress - some commits should have been archived + assertTrue(commitsAfterArchival < commitsBeforeArchival, + "Archival should make progress and archive commits before ECTR"); + + // Verify commits >= ECTR (00000008, 00000009, 00000010) are still present + List activeCommitTimes = activeCommits.stream() + .map(HoodieInstant::requestedTime) + .collect(Collectors.toList()); + + assertTrue(activeCommitTimes.contains("00000008"), + "Commit 00000008 (ECTR) should not be archived"); + assertTrue(activeCommitTimes.contains("00000009"), + "Commit 00000009 (after ECTR) should not be archived"); + assertTrue(activeCommitTimes.contains("00000010"), + "Commit 00000010 (after ECTR) should not be archived"); + + // Verify earlier commits (before ECTR) were archived + // With 10 commits and ECTR at 8, archival should archive older commits + assertFalse(activeCommitTimes.contains("00000001"), + "Commit 00000001 (before ECTR and outside retention) should be archived"); + assertFalse(activeCommitTimes.contains("00000002"), + "Commit 00000002 (before ECTR and outside retention) should be archived"); + } + + /** + * Tests archival blocking based on ECTR with TimelineArchiverV2 (LSM-based timeline) and table version 9. + * This validates that the ECTR blocking feature works correctly with the newer archival implementation. + * + * Table version 9 uses LAYOUT_VERSION_2 which employs LSM-based timeline storage. + * This test ensures TimelineArchiverV2 properly reads ECTR from clean metadata and blocks archival. + */ + @Test + public void testArchivalBlocksOnCleanECTRWithTimelineArchiverV2AndVersion9() throws Exception { + init(); + // Create config with ECTR blocking enabled + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withPath(basePath) + .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) + .withParallelism(2, 2) + .withArchivalConfig(HoodieArchivalConfig.newBuilder() + .archiveCommitsWith(2, 3) + .withBlockArchivalOnCleanECTR(true) // Enable ECTR blocking + .build()) + .withCleanConfig(HoodieCleanConfig.newBuilder() + .retainCommits(1) + .build()) + .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withRemoteServerPort(timelineServicePort) + .build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .enable(false) + .build()) + .forTable("test-trip-table") + .build(); + + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + + // Create 5 commits: 00000001, 00000002, 00000003, 00000004, 00000005 + for (int i = 1; i <= 5; i++) { + testTable.addCommit(String.format("%08d", i)); + } + + // Create a clean commit with ECTR pointing to commit 00000003 + // This means commits 00000003 and later should not be archived + List cleanStatsList = new ArrayList<>(); + cleanStatsList.add(new org.apache.hudi.common.HoodieCleanStat( + HoodieCleaningPolicy.KEEP_LATEST_COMMITS, + "p1", + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + "00000003", // ECTR + "00000005" + )); + cleanStatsList.add(new org.apache.hudi.common.HoodieCleanStat( + HoodieCleaningPolicy.KEEP_LATEST_COMMITS, + "p2", + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + "00000003", // ECTR + "00000005" + )); + + org.apache.hudi.avro.model.HoodieCleanMetadata cleanMetadata = + CleanerUtils.convertCleanMetadata("00000006", Option.of(0L), cleanStatsList, Collections.emptyMap()); + org.apache.hudi.avro.model.HoodieCleanerPlan cleanerPlan = + new org.apache.hudi.avro.model.HoodieCleanerPlan( + new org.apache.hudi.avro.model.HoodieActionInstant("00000006", CLEAN_ACTION, ""), + "", "", new HashMap<>(), CleanMetadataV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>(), Collections.emptyMap()); + + testTable.addClean("00000006", cleanerPlan, cleanMetadata); + + // Reload metaClient to pick up the clean commit + metaClient = HoodieTableMetaClient.reload(metaClient); + + // Verify table version is 9 + assertEquals(HoodieTableVersion.NINE, metaClient.getTableConfig().getTableVersion(), + "Table should be version 9"); + + // Trigger archival using TimelineArchiverV2 + HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient); + + // TimelineArchiverV2 should be used for version 9 + TimelineArchiverV2 archiver = new TimelineArchiverV2(writeConfig, table); + archiver.archiveIfRequired(context); + + // Reload timeline and get active commits + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + List activeCommits = activeTimeline.getCommitsTimeline().filterCompletedInstants().getInstants(); + + // Verify that commits 00000003, 00000004, 00000005 are still in active timeline (not archived) + List activeCommitTimes = activeCommits.stream() + .map(HoodieInstant::requestedTime) + .collect(Collectors.toList()); + + // Key assertion: TimelineArchiverV2 should not respect ECTR and archive commits >= ECTR + assertFalse(activeCommitTimes.contains("00000003"), + "TimelineArchiverV2: Commit 00000003 (ECTR) should be archived"); + assertTrue(activeCommitTimes.contains("00000004"), + "TimelineArchiverV2: Commit 00000004 (after ECTR) should not be archived"); + assertTrue(activeCommitTimes.contains("00000005"), + "TimelineArchiverV2: Commit 00000005 (after ECTR) should not be archived"); + + // Verify commits before ECTR may have been archived (00000001, 00000002) + // They could be archived or retained based on min/max archival settings + // But commits >= ECTR must be present - this is the critical check for TimelineArchiverV2 + log.info("TimelineArchiverV2 with table version 9's archival does not block on ECTR: {}", "00000003"); + } } From 112973acbb3cb2969f8f4e335bd605ee4070a4ea Mon Sep 17 00:00:00 2001 From: sivabalan Date: Fri, 10 Apr 2026 09:20:47 -0700 Subject: [PATCH 2/4] Minor log fix --- .../client/timeline/versioning/v1/TimelineArchiverV1.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java index 19813793f1be3..6b5fccdbcccb6 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java @@ -289,8 +289,8 @@ private Stream getCommitInstantsToArchive() throws IOException { cleanMetadata.getEarliestCommitToRetain(), lastCleanInstant.get().requestedTime()); } } catch (IOException e) { - log.warn("Failed to read clean metadata for {}, skipping ECTR check", lastCleanInstant.get(), e); - throw new HoodieIOException("Failed to read clean metadata for " + lastCleanInstant.get() + ", skipping ECTR check", e); + log.warn("Failed to read clean metadata for {}", lastCleanInstant.get(), e); + throw new HoodieIOException("Failed to read clean metadata for " + lastCleanInstant.get(), e); } } } From c33f5100a0a238b77c5674383170d6ac54ad9c80 Mon Sep 17 00:00:00 2001 From: sivabalan Date: Wed, 15 Apr 2026 19:21:12 -0700 Subject: [PATCH 3/4] Addressing feedback in tests --- .../hudi/io/TestHoodieTimelineArchiver.java | 43 +++++++++++-------- 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java index cf77d25c09616..21254c2affaa5 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java @@ -2375,13 +2375,22 @@ public void testArchivalProceedsNormallyWhenECTRBlockingDisabled() throws Except // Reload timeline metaClient = HoodieTableMetaClient.reload(metaClient); - int commitsAfterArchival = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants(); + List activeCommits = metaClient.getActiveTimeline().getCommitsTimeline() + .filterCompletedInstants().getInstants(); + List activeCommitTimes = activeCommits.stream() + .map(HoodieInstant::requestedTime) + .collect(Collectors.toList()); // With config disabled, archival should proceed based on min/max archival commits (2, 3) - // Since we have 6 commits, archival should have kicked in - // This test verifies backward compatibility - ECTR from clean is ignored when config is off - assertTrue(commitsAfterArchival < commitsBeforeArchival || commitsAfterArchival <= 3, - "Archival should proceed normally when ECTR blocking is disabled"); + // ignoring ECTR entirely. The commits timeline includes 6 data commits + 1 clean commit = 7 total. + // With min=2, archival archives up to 7-2=5 instants, leaving only the last 2 (00000006 + clean 00000007). + // The key assertion is that commit 00000003 (the ECTR) IS archived, proving the flag is off. + assertFalse(activeCommitTimes.contains("00000003"), + "Commit 00000003 (ECTR) should be archived when ECTR blocking is disabled"); + assertTrue(activeCommitTimes.contains("00000006"), + "Commit 00000006 should be retained (within min commits to keep)"); + assertEquals(2, activeCommits.size(), + "Only 2 instants should remain active (min commits to keep)"); } /** @@ -2593,28 +2602,28 @@ public void testArchivalMakesProgressWhenECTRIsLaterThanArchivalWindow() throws List activeCommits = activeTimeline.getCommitsTimeline().filterCompletedInstants().getInstants(); int commitsAfterArchival = activeCommits.size(); - // Verify archival made progress - some commits should have been archived - assertTrue(commitsAfterArchival < commitsBeforeArchival, - "Archival should make progress and archive commits before ECTR"); - - // Verify commits >= ECTR (00000008, 00000009, 00000010) are still present + // With 10 commits, min=2, max=3, and ECTR at 00000008: + // Archival can archive commits before ECTR, so 00000001-00000007 should be archived. + // Exactly 00000008-00000010 should remain active. List activeCommitTimes = activeCommits.stream() .map(HoodieInstant::requestedTime) .collect(Collectors.toList()); + // Verify all commits before ECTR are archived + for (int i = 1; i <= 7; i++) { + assertFalse(activeCommitTimes.contains(String.format("%08d", i)), + "Commit " + String.format("%08d", i) + " (before ECTR) should be archived"); + } + + // Verify commits >= ECTR are retained assertTrue(activeCommitTimes.contains("00000008"), "Commit 00000008 (ECTR) should not be archived"); assertTrue(activeCommitTimes.contains("00000009"), "Commit 00000009 (after ECTR) should not be archived"); assertTrue(activeCommitTimes.contains("00000010"), "Commit 00000010 (after ECTR) should not be archived"); - - // Verify earlier commits (before ECTR) were archived - // With 10 commits and ECTR at 8, archival should archive older commits - assertFalse(activeCommitTimes.contains("00000001"), - "Commit 00000001 (before ECTR and outside retention) should be archived"); - assertFalse(activeCommitTimes.contains("00000002"), - "Commit 00000002 (before ECTR and outside retention) should be archived"); + assertEquals(3, commitsAfterArchival, + "Exactly 3 commits (00000008-00000010) should remain active"); } /** From ea12e76bc5ce1a6ebbbf8209cb3efcec27f5ec2e Mon Sep 17 00:00:00 2001 From: sivabalan Date: Mon, 20 Apr 2026 17:14:29 -0700 Subject: [PATCH 4/4] Addressing feedback --- .../versioning/v1/TimelineArchiverV1.java | 7 +- .../hudi/config/HoodieArchivalConfig.java | 6 +- .../hudi/io/TestHoodieTimelineArchiver.java | 511 ++++-------------- 3 files changed, 108 insertions(+), 416 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java index 6b5fccdbcccb6..d518ac5525dd6 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java @@ -20,6 +20,7 @@ package org.apache.hudi.client.timeline.versioning.v1; import org.apache.hudi.avro.model.HoodieArchivedMetaEntry; +import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.client.timeline.HoodieTimelineArchiver; import org.apache.hudi.client.transaction.TransactionManager; import org.apache.hudi.client.utils.ArchivalMetrics; @@ -279,14 +280,14 @@ private Stream getCommitInstantsToArchive() throws IOException { Option lastCleanInstant = table.getCleanTimeline().filterCompletedInstants().lastInstant(); if (lastCleanInstant.isPresent()) { try { - org.apache.hudi.avro.model.HoodieCleanMetadata cleanMetadata = + HoodieCleanMetadata cleanMetadata = table.getActiveTimeline().readCleanMetadata(lastCleanInstant.get()); if (cleanMetadata.getEarliestCommitToRetain() != null && !cleanMetadata.getEarliestCommitToRetain().trim().isEmpty()) { oldestInstantToRetainForClean = commitTimeline.findInstantsAfterOrEquals( cleanMetadata.getEarliestCommitToRetain()).firstInstant(); - log.info("Blocking archival based on ECTR {} from last clean {}", - cleanMetadata.getEarliestCommitToRetain(), lastCleanInstant.get().requestedTime()); + log.info("Blocking archival based on earliest commit to retain {} from last clean {}. Oldest to retain is {}", + cleanMetadata.getEarliestCommitToRetain(), lastCleanInstant.get().requestedTime(), oldestInstantToRetainForClean.map(instant -> instant).orElse(null)); } } catch (IOException e) { log.warn("Failed to read clean metadata for {}", lastCleanInstant.get(), e); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java index a6a1f841ac857..8854c87edeaba 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java @@ -119,7 +119,11 @@ public class HoodieArchivalConfig extends HoodieConfig { .defaultValue(false) .markAdvanced() .sinceVersion("1.2.0") - .withDocumentation("If enabled, archival will block on latest ECTR from last known clean"); + .withDocumentation("If enabled, archival will not archive commits beyond the Earliest Commit To Retain (ECTR) " + + "from the last completed clean. ECTR represents the oldest commit whose data files are still needed by " + + "the table and have not yet been cleaned up. Blocking archival at this point ensures that timeline metadata " + + "is not removed for commits whose data files still exist on storage, preventing inconsistencies between " + + "the timeline and the actual data."); /** * @deprecated Use {@link #MAX_COMMITS_TO_KEEP} and its methods instead diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java index 21254c2affaa5..60db8b8a13b75 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java @@ -18,6 +18,9 @@ package org.apache.hudi.io; +import org.apache.hudi.avro.model.HoodieActionInstant; +import org.apache.hudi.avro.model.HoodieCleanMetadata; +import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.avro.model.HoodieSavepointMetadata; @@ -29,6 +32,7 @@ import org.apache.hudi.client.timeline.versioning.v2.TimelineArchiverV2; import org.apache.hudi.client.transaction.lock.InProcessLockProvider; import org.apache.hudi.client.utils.ArchivalMetrics; +import org.apache.hudi.common.HoodieCleanStat; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieCommitMetadata; @@ -2208,342 +2212,134 @@ private void initTableToTestECTRBlock() throws IOException { } /** - * Tests archival blocking based on ECTR from last clean when config is enabled. - * Verifies that commits with timestamp >= ECTR are not archived. + * Tests archival behavior with ECTR blocking enabled vs disabled. + * When enabled: commits >= ECTR from last clean are not archived. + * When disabled: archival proceeds normally ignoring ECTR (backward compatible). + * Also validates that archival makes progress when ECTR is later than the archival window. */ - @Test - public void testArchivalBlocksOnCleanECTRWhenEnabled() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testArchivalWithECTRBlocking(boolean blockArchivalOnCleanECTR) throws Exception { initTableToTestECTRBlock(); - // Create config with ECTR blocking enabled - HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() - .withPath(basePath) - .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) - .withParallelism(2, 2) - .withArchivalConfig(HoodieArchivalConfig.newBuilder() - .archiveCommitsWith(2, 3) - .withBlockArchivalOnCleanECTR(true) // Enable ECTR blocking - .build()) - .withCleanConfig(HoodieCleanConfig.newBuilder() - .retainCommits(1) - .build()) - .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() - .withRemoteServerPort(timelineServicePort) - .build()) - .withMetadataConfig(HoodieMetadataConfig.newBuilder() - .enable(false) - .build()) - .forTable("test-trip-table") - .build(); - + HoodieWriteConfig writeConfig = buildECTRTestConfig(2, 3, blockArchivalOnCleanECTR); HoodieTestTable testTable = HoodieTestTable.of(metaClient); - // Create 5 commits: 00000001, 00000002, 00000003, 00000004, 00000005 + // Given: 5 commits and a clean commit with ECTR pointing to commit 00000003 for (int i = 1; i <= 5; i++) { testTable.addCommit(String.format("%08d", i)); } - - // Create a clean commit with ECTR pointing to commit 00000003 - // This means commits 00000003 and later should not be archived - Map cleanStats = new HashMap<>(); - cleanStats.put("p1", 1); - cleanStats.put("p2", 1); - - List cleanStatsList = new ArrayList<>(); - cleanStatsList.add(new org.apache.hudi.common.HoodieCleanStat( - HoodieCleaningPolicy.KEEP_LATEST_COMMITS, - "p1", - Collections.emptyList(), - Collections.emptyList(), - Collections.emptyList(), - "00000003", // ECTR - "00000005" - )); - cleanStatsList.add(new org.apache.hudi.common.HoodieCleanStat( - HoodieCleaningPolicy.KEEP_LATEST_COMMITS, - "p2", - Collections.emptyList(), - Collections.emptyList(), - Collections.emptyList(), - "00000003", // ECTR - "00000005" - )); - - org.apache.hudi.avro.model.HoodieCleanMetadata cleanMetadata = - CleanerUtils.convertCleanMetadata("00000006", Option.of(0L), cleanStatsList, Collections.emptyMap()); - org.apache.hudi.avro.model.HoodieCleanerPlan cleanerPlan = - new org.apache.hudi.avro.model.HoodieCleanerPlan( - new org.apache.hudi.avro.model.HoodieActionInstant("00000003", CLEAN_ACTION, ""), - "", "", new HashMap<>(), CleanMetadataV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>(), Collections.emptyMap()); - - testTable.addClean("00000006", cleanerPlan, cleanMetadata); - - // Reload metaClient to pick up the clean commit + addCleanCommitWithECTR(testTable, "00000006", "00000003", "00000005"); metaClient = HoodieTableMetaClient.reload(metaClient); - // Trigger archival + // When: trigger archival HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient); TimelineArchiverV1 archiver = new TimelineArchiverV1(writeConfig, table); archiver.archiveIfRequired(context); - // Reload timeline and get active commits + // Then: verify archival behavior metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); - List activeCommits = activeTimeline.getCommitsTimeline().filterCompletedInstants().getInstants(); - - // Verify that commits 00000003, 00000004, 00000005 are still in active timeline (not archived) - List activeCommitTimes = activeCommits.stream() - .map(HoodieInstant::requestedTime) - .collect(Collectors.toList()); - - assertTrue(activeCommitTimes.contains("00000003"), "Commit 00000003 (ECTR) should not be archived"); - assertTrue(activeCommitTimes.contains("00000004"), "Commit 00000004 (after ECTR) should not be archived"); - assertTrue(activeCommitTimes.contains("00000005"), "Commit 00000005 (after ECTR) should not be archived"); - - // Verify commits before ECTR may have been archived (00000001, 00000002) - // They could be archived or retained based on min/max archival settings - // But commits >= ECTR must be present - - - } - - /** - * Tests that archival proceeds normally when config is disabled (default behavior). - * Verifies backward compatibility - existing behavior unchanged when ECTR blocking is off. - */ - @Test - public void testArchivalProceedsNormallyWhenECTRBlockingDisabled() throws Exception { - initTableToTestECTRBlock(); - - // Create config WITHOUT ECTR blocking (default is false) - HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() - .withPath(basePath) - .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) - .withParallelism(2, 2) - .withArchivalConfig(HoodieArchivalConfig.newBuilder() - .archiveCommitsWith(2, 3) - // Do not set withBlockArchivalOnCleanECTR - defaults to false - .build()) - .withCleanConfig(HoodieCleanConfig.newBuilder() - .retainCommits(1) - .build()) - .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() - .withRemoteServerPort(timelineServicePort) - .build()) - .withMetadataConfig(HoodieMetadataConfig.newBuilder() - .enable(false) - .build()) - .forTable("test-trip-table") - .build(); - - HoodieTestTable testTable = HoodieTestTable.of(metaClient); + List activeCommitTimes = getActiveCommitTimes(); - // Create 6 commits: 00000001 through 00000006 - for (int i = 1; i <= 6; i++) { - testTable.addCommit(String.format("%08d", i)); + if (blockArchivalOnCleanECTR) { + // Commits >= ECTR should not be archived + assertTrue(activeCommitTimes.contains("00000003"), "Commit 00000003 (ECTR) should not be archived"); + assertTrue(activeCommitTimes.contains("00000004"), "Commit 00000004 (after ECTR) should not be archived"); + assertTrue(activeCommitTimes.contains("00000005"), "Commit 00000005 (after ECTR) should not be archived"); + } else { + // ECTR is ignored; archival proceeds based on min/max archival commits only + assertFalse(activeCommitTimes.contains("00000003"), + "Commit 00000003 (ECTR) should be archived when ECTR blocking is disabled"); + assertTrue(activeCommitTimes.contains("00000005"), + "Commit 00000005 should be retained (within min commits to keep)"); } - // Create a clean commit with ECTR pointing to commit 00000003 - List cleanStatsList = new ArrayList<>(); - cleanStatsList.add(new org.apache.hudi.common.HoodieCleanStat( - HoodieCleaningPolicy.KEEP_LATEST_COMMITS, - "p1", - Collections.emptyList(), - Collections.emptyList(), - Collections.emptyList(), - "00000003", // ECTR - "00000006" - )); - - org.apache.hudi.avro.model.HoodieCleanMetadata cleanMetadata = - CleanerUtils.convertCleanMetadata("00000007", Option.of(0L), cleanStatsList, Collections.emptyMap()); - org.apache.hudi.avro.model.HoodieCleanerPlan cleanerPlan = - new org.apache.hudi.avro.model.HoodieCleanerPlan( - new org.apache.hudi.avro.model.HoodieActionInstant("00000007", CLEAN_ACTION, ""), - "", "", new HashMap<>(), CleanMetadataV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>(), Collections.emptyMap()); - - testTable.addClean("00000007", cleanerPlan, cleanMetadata); - - // Reload metaClient - metaClient = HoodieTableMetaClient.reload(metaClient); - int commitsBeforeArchival = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants(); + if (blockArchivalOnCleanECTR) { + // Additional step: validate archival makes progress when ECTR is later than the archival window. + // Add more commits and a new clean with ECTR at 00000008, so commits before ECTR can be archived. + for (int i = 7; i <= 10; i++) { + testTable.addCommit(String.format("%08d", i)); + } + addCleanCommitWithECTR(testTable, "00000011", "00000008", "00000010"); + metaClient = HoodieTableMetaClient.reload(metaClient); - // Trigger archival - HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient); - TimelineArchiverV1 archiver = new TimelineArchiverV1(writeConfig, table); - archiver.archiveIfRequired(context); + table = HoodieSparkTable.create(writeConfig, context, metaClient); + archiver = new TimelineArchiverV1(writeConfig, table); + archiver.archiveIfRequired(context); - // Reload timeline - metaClient = HoodieTableMetaClient.reload(metaClient); - List activeCommits = metaClient.getActiveTimeline().getCommitsTimeline() - .filterCompletedInstants().getInstants(); - List activeCommitTimes = activeCommits.stream() - .map(HoodieInstant::requestedTime) - .collect(Collectors.toList()); + metaClient = HoodieTableMetaClient.reload(metaClient); + activeCommitTimes = getActiveCommitTimes(); - // With config disabled, archival should proceed based on min/max archival commits (2, 3) - // ignoring ECTR entirely. The commits timeline includes 6 data commits + 1 clean commit = 7 total. - // With min=2, archival archives up to 7-2=5 instants, leaving only the last 2 (00000006 + clean 00000007). - // The key assertion is that commit 00000003 (the ECTR) IS archived, proving the flag is off. - assertFalse(activeCommitTimes.contains("00000003"), - "Commit 00000003 (ECTR) should be archived when ECTR blocking is disabled"); - assertTrue(activeCommitTimes.contains("00000006"), - "Commit 00000006 should be retained (within min commits to keep)"); - assertEquals(2, activeCommits.size(), - "Only 2 instants should remain active (min commits to keep)"); + // Commits before ECTR should be archived + for (int i = 1; i <= 7; i++) { + assertFalse(activeCommitTimes.contains(String.format("%08d", i)), + "Commit " + String.format("%08d", i) + " (before ECTR) should be archived"); + } + // Commits >= ECTR should be retained + assertTrue(activeCommitTimes.contains("00000008"), "Commit 00000008 (ECTR) should not be archived"); + assertTrue(activeCommitTimes.contains("00000009"), "Commit 00000009 (after ECTR) should not be archived"); + assertTrue(activeCommitTimes.contains("00000010"), "Commit 00000010 (after ECTR) should not be archived"); + assertEquals(3, activeCommitTimes.size(), "Exactly 3 commits (00000008-00000010) should remain active"); + } } /** - * Tests graceful handling when clean metadata is missing or cannot be read. - * Verifies archival continues normally without blocking when metadata read fails. + * Tests graceful handling when clean metadata is missing or has empty ECTR. + * Archival should continue normally in both cases. */ @Test - public void testArchivalContinuesWhenCleanMetadataIsMissing() throws Exception { + public void testArchivalContinuesWhenECTRIsAbsent() throws Exception { initTableToTestECTRBlock(); - // Create config with ECTR blocking enabled - HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() - .withPath(basePath) - .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) - .withParallelism(2, 2) - .withArchivalConfig(HoodieArchivalConfig.newBuilder() - .archiveCommitsWith(2, 3) - .withBlockArchivalOnCleanECTR(true) - .build()) - .withCleanConfig(HoodieCleanConfig.newBuilder() - .retainCommits(1) - .build()) - .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() - .withRemoteServerPort(timelineServicePort) - .build()) - .withMetadataConfig(HoodieMetadataConfig.newBuilder() - .enable(false) - .build()) - .forTable("test-trip-table") - .build(); - + HoodieWriteConfig writeConfig = buildECTRTestConfig(2, 3, true); HoodieTestTable testTable = HoodieTestTable.of(metaClient); - // Create 6 commits + // Step 1: No clean commit exists — archival should proceed without error for (int i = 1; i <= 6; i++) { testTable.addCommit(String.format("%08d", i)); } - - // DO NOT create a clean commit - this simulates missing clean metadata - - // Reload metaClient metaClient = HoodieTableMetaClient.reload(metaClient); - // Trigger archival - should not fail even though no clean metadata exists HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient); TimelineArchiverV1 archiver = new TimelineArchiverV1(writeConfig, table); - // Should not throw exception - assertDoesNotThrow(() -> archiver.archiveIfRequired(context), + TimelineArchiverV1 finalArchiver = archiver; + assertDoesNotThrow(() -> finalArchiver.archiveIfRequired(context), "Archival should continue gracefully when clean metadata is missing"); - // Verify some archival happened (based on min/max archival commits) metaClient = HoodieTableMetaClient.reload(metaClient); - int commitsAfterArchival = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants(); - - // With 6 commits and max_archival=3, some archival should occur - assertTrue(commitsAfterArchival <= 3, "Archival should proceed when clean metadata is missing"); - } + int commitsAfterFirstArchival = metaClient.getActiveTimeline().getCommitsTimeline() + .filterCompletedInstants().countInstants(); + assertTrue(commitsAfterFirstArchival <= 3, "Archival should proceed when clean metadata is missing"); - /** - * Tests that clean commits with empty or null ECTR are handled gracefully. - * Verifies archival proceeds normally when ECTR is not set in clean metadata. - */ - @Test - public void testArchivalHandlesEmptyECTRInCleanMetadata() throws Exception { - initTableToTestECTRBlock(); - - // Create config with ECTR blocking enabled - HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() - .withPath(basePath) - .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) - .withParallelism(2, 2) - .withArchivalConfig(HoodieArchivalConfig.newBuilder() - .archiveCommitsWith(2, 3) - .withBlockArchivalOnCleanECTR(true) - .build()) - .withCleanConfig(HoodieCleanConfig.newBuilder() - .retainCommits(1) - .build()) - .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() - .withRemoteServerPort(timelineServicePort) - .build()) - .withMetadataConfig(HoodieMetadataConfig.newBuilder() - .enable(false) - .build()) - .forTable("test-trip-table") - .build(); - - HoodieTestTable testTable = HoodieTestTable.of(metaClient); - - // Create 6 commits - for (int i = 1; i <= 6; i++) { + // Step 2: Clean commit exists but with empty ECTR — archival should still proceed + for (int i = 7; i <= 12; i++) { testTable.addCommit(String.format("%08d", i)); } - - // Create a clean commit with EMPTY ECTR - List cleanStatsList = new ArrayList<>(); - cleanStatsList.add(new org.apache.hudi.common.HoodieCleanStat( - HoodieCleaningPolicy.KEEP_LATEST_COMMITS, - "p1", - Collections.emptyList(), - Collections.emptyList(), - Collections.emptyList(), - "", // Empty ECTR - "00000006" - )); - - org.apache.hudi.avro.model.HoodieCleanMetadata cleanMetadata = - CleanerUtils.convertCleanMetadata("00000007", Option.of(0L), cleanStatsList, Collections.emptyMap()); - org.apache.hudi.avro.model.HoodieCleanerPlan cleanerPlan = - new org.apache.hudi.avro.model.HoodieCleanerPlan( - new org.apache.hudi.avro.model.HoodieActionInstant("00000007", CLEAN_ACTION, ""), - "", "", new HashMap<>(), CleanMetadataV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>(), Collections.emptyMap()); - - testTable.addClean("00000007", cleanerPlan, cleanMetadata); - - // Reload metaClient + addCleanCommitWithECTR(testTable, "00000013", "", "00000012"); metaClient = HoodieTableMetaClient.reload(metaClient); - // Trigger archival - should not fail with empty ECTR - HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient); - TimelineArchiverV1 archiver = new TimelineArchiverV1(writeConfig, table); + table = HoodieSparkTable.create(writeConfig, context, metaClient); + archiver = new TimelineArchiverV1(writeConfig, table); - assertDoesNotThrow(() -> archiver.archiveIfRequired(context), + TimelineArchiverV1 finalArchiver1 = archiver; + assertDoesNotThrow(() -> finalArchiver1.archiveIfRequired(context), "Archival should handle empty ECTR gracefully"); - // Verify archival proceeded normally metaClient = HoodieTableMetaClient.reload(metaClient); - int commitsAfterArchival = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants(); - - assertTrue(commitsAfterArchival <= 3, "Archival should proceed normally with empty ECTR"); + int commitsAfterSecondArchival = metaClient.getActiveTimeline().getCommitsTimeline() + .filterCompletedInstants().countInstants(); + assertTrue(commitsAfterSecondArchival <= 3, "Archival should proceed normally with empty ECTR"); } - /** - * Tests that archival can still make progress when ECTR is later than the archival window. - * This verifies the feature doesn't unnecessarily block archival when it's safe to proceed. - * - * Scenario: Create 10 commits, with ECTR pointing to commit 8. Archival should be able to - * archive commits 1-7 since they are before ECTR and outside the retention window. - */ - @Test - public void testArchivalMakesProgressWhenECTRIsLaterThanArchivalWindow() throws Exception { - initTableToTestECTRBlock(); - - // Create config with ECTR blocking enabled - // Min commits to keep = 2, max commits to keep = 3 - HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + private HoodieWriteConfig buildECTRTestConfig(int minCommits, int maxCommits, boolean blockArchivalOnCleanECTR) { + return HoodieWriteConfig.newBuilder() .withPath(basePath) .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) .withParallelism(2, 2) .withArchivalConfig(HoodieArchivalConfig.newBuilder() - .archiveCommitsWith(2, 3) - .withBlockArchivalOnCleanECTR(true) // Enable ECTR blocking + .archiveCommitsWith(minCommits, maxCommits) + .withBlockArchivalOnCleanECTR(blockArchivalOnCleanECTR) .build()) .withCleanConfig(HoodieCleanConfig.newBuilder() .retainCommits(1) @@ -2556,180 +2352,71 @@ public void testArchivalMakesProgressWhenECTRIsLaterThanArchivalWindow() throws .build()) .forTable("test-trip-table") .build(); + } - HoodieTestTable testTable = HoodieTestTable.of(metaClient); - - // Create 10 commits: 00000001 through 00000010 - for (int i = 1; i <= 10; i++) { - testTable.addCommit(String.format("%08d", i)); - } - - // Create a clean commit with ECTR pointing to commit 00000008 - // This ECTR is later than what archival would retain based on min/max archival commits (2, 3) - // So archival should be able to archive commits 1-7, but not 8-10 - List cleanStatsList = new ArrayList<>(); - cleanStatsList.add(new org.apache.hudi.common.HoodieCleanStat( + private void addCleanCommitWithECTR(HoodieTestTable testTable, String cleanInstant, String ectr, String lastCompleted) throws Exception { + List cleanStatsList = new ArrayList<>(); + cleanStatsList.add(new HoodieCleanStat( HoodieCleaningPolicy.KEEP_LATEST_COMMITS, "p1", Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), - "00000008", // ECTR - later than archival window - "00000010" + ectr, + lastCompleted )); - org.apache.hudi.avro.model.HoodieCleanMetadata cleanMetadata = - CleanerUtils.convertCleanMetadata("00000011", Option.of(0L), cleanStatsList, Collections.emptyMap()); - org.apache.hudi.avro.model.HoodieCleanerPlan cleanerPlan = - new org.apache.hudi.avro.model.HoodieCleanerPlan( - new org.apache.hudi.avro.model.HoodieActionInstant("00000011", CLEAN_ACTION, ""), + HoodieCleanMetadata cleanMetadata = + CleanerUtils.convertCleanMetadata(cleanInstant, Option.of(0L), cleanStatsList, Collections.emptyMap()); + HoodieCleanerPlan cleanerPlan = + new HoodieCleanerPlan( + new HoodieActionInstant(cleanInstant, CLEAN_ACTION, ""), "", "", new HashMap<>(), CleanMetadataV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>(), Collections.emptyMap()); - testTable.addClean("00000011", cleanerPlan, cleanMetadata); - - // Reload metaClient - metaClient = HoodieTableMetaClient.reload(metaClient); - int commitsBeforeArchival = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants(); - - // Trigger archival - HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient); - TimelineArchiverV1 archiver = new TimelineArchiverV1(writeConfig, table); - archiver.archiveIfRequired(context); + testTable.addClean(cleanInstant, cleanerPlan, cleanMetadata); + } - // Reload timeline and get active commits - metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); - List activeCommits = activeTimeline.getCommitsTimeline().filterCompletedInstants().getInstants(); - int commitsAfterArchival = activeCommits.size(); - - // With 10 commits, min=2, max=3, and ECTR at 00000008: - // Archival can archive commits before ECTR, so 00000001-00000007 should be archived. - // Exactly 00000008-00000010 should remain active. - List activeCommitTimes = activeCommits.stream() + private List getActiveCommitTimes() { + return metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().getInstants().stream() .map(HoodieInstant::requestedTime) .collect(Collectors.toList()); - - // Verify all commits before ECTR are archived - for (int i = 1; i <= 7; i++) { - assertFalse(activeCommitTimes.contains(String.format("%08d", i)), - "Commit " + String.format("%08d", i) + " (before ECTR) should be archived"); - } - - // Verify commits >= ECTR are retained - assertTrue(activeCommitTimes.contains("00000008"), - "Commit 00000008 (ECTR) should not be archived"); - assertTrue(activeCommitTimes.contains("00000009"), - "Commit 00000009 (after ECTR) should not be archived"); - assertTrue(activeCommitTimes.contains("00000010"), - "Commit 00000010 (after ECTR) should not be archived"); - assertEquals(3, commitsAfterArchival, - "Exactly 3 commits (00000008-00000010) should remain active"); } /** - * Tests archival blocking based on ECTR with TimelineArchiverV2 (LSM-based timeline) and table version 9. - * This validates that the ECTR blocking feature works correctly with the newer archival implementation. - * - * Table version 9 uses LAYOUT_VERSION_2 which employs LSM-based timeline storage. - * This test ensures TimelineArchiverV2 properly reads ECTR from clean metadata and blocks archival. + * Tests that TimelineArchiverV2 (LSM-based timeline, v9 tables) does NOT block archival on ECTR. + * ECTR blocking is only for v6 tables using TimelineArchiverV1. */ @Test public void testArchivalBlocksOnCleanECTRWithTimelineArchiverV2AndVersion9() throws Exception { init(); - // Create config with ECTR blocking enabled - HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() - .withPath(basePath) - .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) - .withParallelism(2, 2) - .withArchivalConfig(HoodieArchivalConfig.newBuilder() - .archiveCommitsWith(2, 3) - .withBlockArchivalOnCleanECTR(true) // Enable ECTR blocking - .build()) - .withCleanConfig(HoodieCleanConfig.newBuilder() - .retainCommits(1) - .build()) - .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() - .withRemoteServerPort(timelineServicePort) - .build()) - .withMetadataConfig(HoodieMetadataConfig.newBuilder() - .enable(false) - .build()) - .forTable("test-trip-table") - .build(); + HoodieWriteConfig writeConfig = buildECTRTestConfig(2, 3, true); HoodieTestTable testTable = HoodieTestTable.of(metaClient); - // Create 5 commits: 00000001, 00000002, 00000003, 00000004, 00000005 + // Given: 5 commits and a clean commit with ECTR at 00000003 for (int i = 1; i <= 5; i++) { testTable.addCommit(String.format("%08d", i)); } - - // Create a clean commit with ECTR pointing to commit 00000003 - // This means commits 00000003 and later should not be archived - List cleanStatsList = new ArrayList<>(); - cleanStatsList.add(new org.apache.hudi.common.HoodieCleanStat( - HoodieCleaningPolicy.KEEP_LATEST_COMMITS, - "p1", - Collections.emptyList(), - Collections.emptyList(), - Collections.emptyList(), - "00000003", // ECTR - "00000005" - )); - cleanStatsList.add(new org.apache.hudi.common.HoodieCleanStat( - HoodieCleaningPolicy.KEEP_LATEST_COMMITS, - "p2", - Collections.emptyList(), - Collections.emptyList(), - Collections.emptyList(), - "00000003", // ECTR - "00000005" - )); - - org.apache.hudi.avro.model.HoodieCleanMetadata cleanMetadata = - CleanerUtils.convertCleanMetadata("00000006", Option.of(0L), cleanStatsList, Collections.emptyMap()); - org.apache.hudi.avro.model.HoodieCleanerPlan cleanerPlan = - new org.apache.hudi.avro.model.HoodieCleanerPlan( - new org.apache.hudi.avro.model.HoodieActionInstant("00000006", CLEAN_ACTION, ""), - "", "", new HashMap<>(), CleanMetadataV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>(), Collections.emptyMap()); - - testTable.addClean("00000006", cleanerPlan, cleanMetadata); - - // Reload metaClient to pick up the clean commit + addCleanCommitWithECTR(testTable, "00000006", "00000003", "00000005"); metaClient = HoodieTableMetaClient.reload(metaClient); - // Verify table version is 9 assertEquals(HoodieTableVersion.NINE, metaClient.getTableConfig().getTableVersion(), "Table should be version 9"); - // Trigger archival using TimelineArchiverV2 + // When: trigger archival using TimelineArchiverV2 HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient); - - // TimelineArchiverV2 should be used for version 9 TimelineArchiverV2 archiver = new TimelineArchiverV2(writeConfig, table); archiver.archiveIfRequired(context); - // Reload timeline and get active commits + // Then: TimelineArchiverV2 should NOT respect ECTR — commit 00000003 gets archived metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); - List activeCommits = activeTimeline.getCommitsTimeline().filterCompletedInstants().getInstants(); - - // Verify that commits 00000003, 00000004, 00000005 are still in active timeline (not archived) - List activeCommitTimes = activeCommits.stream() - .map(HoodieInstant::requestedTime) - .collect(Collectors.toList()); + List activeCommitTimes = getActiveCommitTimes(); - // Key assertion: TimelineArchiverV2 should not respect ECTR and archive commits >= ECTR assertFalse(activeCommitTimes.contains("00000003"), "TimelineArchiverV2: Commit 00000003 (ECTR) should be archived"); assertTrue(activeCommitTimes.contains("00000004"), "TimelineArchiverV2: Commit 00000004 (after ECTR) should not be archived"); assertTrue(activeCommitTimes.contains("00000005"), "TimelineArchiverV2: Commit 00000005 (after ECTR) should not be archived"); - - // Verify commits before ECTR may have been archived (00000001, 00000002) - // They could be archived or retained based on min/max archival settings - // But commits >= ECTR must be present - this is the critical check for TimelineArchiverV2 - log.info("TimelineArchiverV2 with table version 9's archival does not block on ECTR: {}", "00000003"); } }