diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java index e693a1c2e7fcf..6b5fccdbcccb6 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java @@ -53,6 +53,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieCommitException; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.table.HoodieTable; @@ -271,7 +272,31 @@ private Stream getCommitInstantsToArchive() throws IOException { Option oldestInstantToRetainForClustering = ClusteringUtils.getEarliestInstantToRetainForClustering(table.getActiveTimeline(), table.getMetaClient(), config.getCleanerPolicy()); + // If enabled, block archival based on ECTR from the last completed clean to ensure we don't archive + // commits that have data files that haven't been cleaned yet. + Option oldestInstantToRetainForClean = Option.empty(); + if (config.shouldBlockArchivalOnCleanECTR()) { + Option lastCleanInstant = table.getCleanTimeline().filterCompletedInstants().lastInstant(); + if (lastCleanInstant.isPresent()) { + try { + org.apache.hudi.avro.model.HoodieCleanMetadata cleanMetadata = + table.getActiveTimeline().readCleanMetadata(lastCleanInstant.get()); + if (cleanMetadata.getEarliestCommitToRetain() != null + && !cleanMetadata.getEarliestCommitToRetain().trim().isEmpty()) { + oldestInstantToRetainForClean = commitTimeline.findInstantsAfterOrEquals( + cleanMetadata.getEarliestCommitToRetain()).firstInstant(); + log.info("Blocking archival based on ECTR {} from last clean {}", + cleanMetadata.getEarliestCommitToRetain(), lastCleanInstant.get().requestedTime()); + } + } catch (IOException e) { + log.warn("Failed to read clean metadata for {}", lastCleanInstant.get(), e); + throw new HoodieIOException("Failed to read clean metadata for " + lastCleanInstant.get(), e); + } + } + } + // Actually do the commits + Option finalOldestInstantToRetainForClean = oldestInstantToRetainForClean; Stream instantToArchiveStream = commitTimeline.getInstantsAsStream() .filter(s -> { if (config.shouldArchiveBeyondSavepoint()) { @@ -297,6 +322,10 @@ private Stream getCommitInstantsToArchive() throws IOException { oldestInstantToRetainForClustering.map(instantToRetain -> compareTimestamps(s.requestedTime(), LESSER_THAN, instantToRetain.requestedTime())) .orElse(true) + ).filter(s -> + finalOldestInstantToRetainForClean.map(instantToRetain -> + compareTimestamps(s.requestedTime(), LESSER_THAN, instantToRetain.requestedTime())) + .orElse(true) ); return instantToArchiveStream.limit(commitTimeline.countInstants() - minInstantsToKeep); } else { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java index 1b1e3ba5c3917..a6a1f841ac857 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java @@ -114,6 +114,13 @@ public class HoodieArchivalConfig extends HoodieConfig { .markAdvanced() .withDocumentation("Number of timeline manifest versions to retain."); + public static final ConfigProperty BLOCK_ARCHIVAL_ON_LATEST_CLEAN_ECTR = ConfigProperty + .key("hoodie.archive.block.on.latest.clean.ectr") + .defaultValue(false) + .markAdvanced() + .sinceVersion("1.2.0") + .withDocumentation("If enabled, archival will block on latest ECTR from last known clean"); + /** * @deprecated Use {@link #MAX_COMMITS_TO_KEEP} and its methods instead */ @@ -205,6 +212,11 @@ public Builder withArchiveBeyondSavepoint(boolean archiveBeyondSavepoint) { return this; } + public Builder withBlockArchivalOnCleanECTR(boolean blockArchivalOnCleanECTR) { + archivalConfig.setValue(BLOCK_ARCHIVAL_ON_LATEST_CLEAN_ECTR, String.valueOf(blockArchivalOnCleanECTR)); + return this; + } + public HoodieArchivalConfig build() { archivalConfig.setDefaults(HoodieArchivalConfig.class.getName()); return archivalConfig; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 9d32ec8b7d22d..5fc5ba5406ca6 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -1975,6 +1975,10 @@ public int getCommitArchivalBatchSize() { return getInt(HoodieArchivalConfig.COMMITS_ARCHIVAL_BATCH_SIZE); } + public boolean shouldBlockArchivalOnCleanECTR() { + return getBoolean(HoodieArchivalConfig.BLOCK_ARCHIVAL_ON_LATEST_CLEAN_ECTR); + } + public Boolean shouldCleanBootstrapBaseFile() { return getBoolean(HoodieCleanConfig.CLEANER_BOOTSTRAP_BASE_FILE_ENABLE); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java index c631ec00626c1..21254c2affaa5 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java @@ -24,6 +24,7 @@ import org.apache.hudi.client.BaseHoodieWriteClient; import org.apache.hudi.client.timeline.TimelineArchivers; import org.apache.hudi.client.WriteClientTestUtils; +import org.apache.hudi.client.timeline.versioning.v1.TimelineArchiverV1; import org.apache.hudi.client.timeline.versioning.v2.LSMTimelineWriter; import org.apache.hudi.client.timeline.versioning.v2.TimelineArchiverV2; import org.apache.hudi.client.transaction.lock.InProcessLockProvider; @@ -47,6 +48,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.LSMTimeline; import org.apache.hudi.common.table.timeline.TimelineUtils; +import org.apache.hudi.common.table.timeline.versioning.clean.CleanMetadataV2MigrationHandler; import org.apache.hudi.common.table.timeline.versioning.v2.InstantComparatorV2; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.testutils.FileCreateUtilsLegacy; @@ -103,6 +105,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -2189,4 +2192,544 @@ public void testArchivalMetricsWithMixedActionTypes() throws Exception { // Verify archival status is success assertEquals(1L, metrics.get(ArchivalMetrics.ARCHIVAL_STATUS), "Archival should succeed"); } + + private void initTableToTestECTRBlock() throws IOException { + HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE; + initPath(); + initSparkContexts(); + initTimelineService(); + Properties properties = new Properties(); + properties.setProperty(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "6"); + properties.setProperty(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key(), "false"); + initMetaClient(properties); + storage = metaClient.getStorage(); + metaClient.getStorage().createDirectory(new StoragePath(basePath)); + metaClient = HoodieTestUtils.init(storageConf, basePath, tableType, properties); + } + + /** + * Tests archival blocking based on ECTR from last clean when config is enabled. + * Verifies that commits with timestamp >= ECTR are not archived. + */ + @Test + public void testArchivalBlocksOnCleanECTRWhenEnabled() throws Exception { + initTableToTestECTRBlock(); + + // Create config with ECTR blocking enabled + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withPath(basePath) + .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) + .withParallelism(2, 2) + .withArchivalConfig(HoodieArchivalConfig.newBuilder() + .archiveCommitsWith(2, 3) + .withBlockArchivalOnCleanECTR(true) // Enable ECTR blocking + .build()) + .withCleanConfig(HoodieCleanConfig.newBuilder() + .retainCommits(1) + .build()) + .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withRemoteServerPort(timelineServicePort) + .build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .enable(false) + .build()) + .forTable("test-trip-table") + .build(); + + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + + // Create 5 commits: 00000001, 00000002, 00000003, 00000004, 00000005 + for (int i = 1; i <= 5; i++) { + testTable.addCommit(String.format("%08d", i)); + } + + // Create a clean commit with ECTR pointing to commit 00000003 + // This means commits 00000003 and later should not be archived + Map cleanStats = new HashMap<>(); + cleanStats.put("p1", 1); + cleanStats.put("p2", 1); + + List cleanStatsList = new ArrayList<>(); + cleanStatsList.add(new org.apache.hudi.common.HoodieCleanStat( + HoodieCleaningPolicy.KEEP_LATEST_COMMITS, + "p1", + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + "00000003", // ECTR + "00000005" + )); + cleanStatsList.add(new org.apache.hudi.common.HoodieCleanStat( + HoodieCleaningPolicy.KEEP_LATEST_COMMITS, + "p2", + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + "00000003", // ECTR + "00000005" + )); + + org.apache.hudi.avro.model.HoodieCleanMetadata cleanMetadata = + CleanerUtils.convertCleanMetadata("00000006", Option.of(0L), cleanStatsList, Collections.emptyMap()); + org.apache.hudi.avro.model.HoodieCleanerPlan cleanerPlan = + new org.apache.hudi.avro.model.HoodieCleanerPlan( + new org.apache.hudi.avro.model.HoodieActionInstant("00000003", CLEAN_ACTION, ""), + "", "", new HashMap<>(), CleanMetadataV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>(), Collections.emptyMap()); + + testTable.addClean("00000006", cleanerPlan, cleanMetadata); + + // Reload metaClient to pick up the clean commit + metaClient = HoodieTableMetaClient.reload(metaClient); + + // Trigger archival + HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient); + TimelineArchiverV1 archiver = new TimelineArchiverV1(writeConfig, table); + archiver.archiveIfRequired(context); + + // Reload timeline and get active commits + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + List activeCommits = activeTimeline.getCommitsTimeline().filterCompletedInstants().getInstants(); + + // Verify that commits 00000003, 00000004, 00000005 are still in active timeline (not archived) + List activeCommitTimes = activeCommits.stream() + .map(HoodieInstant::requestedTime) + .collect(Collectors.toList()); + + assertTrue(activeCommitTimes.contains("00000003"), "Commit 00000003 (ECTR) should not be archived"); + assertTrue(activeCommitTimes.contains("00000004"), "Commit 00000004 (after ECTR) should not be archived"); + assertTrue(activeCommitTimes.contains("00000005"), "Commit 00000005 (after ECTR) should not be archived"); + + // Verify commits before ECTR may have been archived (00000001, 00000002) + // They could be archived or retained based on min/max archival settings + // But commits >= ECTR must be present + + + } + + /** + * Tests that archival proceeds normally when config is disabled (default behavior). + * Verifies backward compatibility - existing behavior unchanged when ECTR blocking is off. + */ + @Test + public void testArchivalProceedsNormallyWhenECTRBlockingDisabled() throws Exception { + initTableToTestECTRBlock(); + + // Create config WITHOUT ECTR blocking (default is false) + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withPath(basePath) + .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) + .withParallelism(2, 2) + .withArchivalConfig(HoodieArchivalConfig.newBuilder() + .archiveCommitsWith(2, 3) + // Do not set withBlockArchivalOnCleanECTR - defaults to false + .build()) + .withCleanConfig(HoodieCleanConfig.newBuilder() + .retainCommits(1) + .build()) + .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withRemoteServerPort(timelineServicePort) + .build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .enable(false) + .build()) + .forTable("test-trip-table") + .build(); + + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + + // Create 6 commits: 00000001 through 00000006 + for (int i = 1; i <= 6; i++) { + testTable.addCommit(String.format("%08d", i)); + } + + // Create a clean commit with ECTR pointing to commit 00000003 + List cleanStatsList = new ArrayList<>(); + cleanStatsList.add(new org.apache.hudi.common.HoodieCleanStat( + HoodieCleaningPolicy.KEEP_LATEST_COMMITS, + "p1", + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + "00000003", // ECTR + "00000006" + )); + + org.apache.hudi.avro.model.HoodieCleanMetadata cleanMetadata = + CleanerUtils.convertCleanMetadata("00000007", Option.of(0L), cleanStatsList, Collections.emptyMap()); + org.apache.hudi.avro.model.HoodieCleanerPlan cleanerPlan = + new org.apache.hudi.avro.model.HoodieCleanerPlan( + new org.apache.hudi.avro.model.HoodieActionInstant("00000007", CLEAN_ACTION, ""), + "", "", new HashMap<>(), CleanMetadataV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>(), Collections.emptyMap()); + + testTable.addClean("00000007", cleanerPlan, cleanMetadata); + + // Reload metaClient + metaClient = HoodieTableMetaClient.reload(metaClient); + int commitsBeforeArchival = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants(); + + // Trigger archival + HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient); + TimelineArchiverV1 archiver = new TimelineArchiverV1(writeConfig, table); + archiver.archiveIfRequired(context); + + // Reload timeline + metaClient = HoodieTableMetaClient.reload(metaClient); + List activeCommits = metaClient.getActiveTimeline().getCommitsTimeline() + .filterCompletedInstants().getInstants(); + List activeCommitTimes = activeCommits.stream() + .map(HoodieInstant::requestedTime) + .collect(Collectors.toList()); + + // With config disabled, archival should proceed based on min/max archival commits (2, 3) + // ignoring ECTR entirely. The commits timeline includes 6 data commits + 1 clean commit = 7 total. + // With min=2, archival archives up to 7-2=5 instants, leaving only the last 2 (00000006 + clean 00000007). + // The key assertion is that commit 00000003 (the ECTR) IS archived, proving the flag is off. + assertFalse(activeCommitTimes.contains("00000003"), + "Commit 00000003 (ECTR) should be archived when ECTR blocking is disabled"); + assertTrue(activeCommitTimes.contains("00000006"), + "Commit 00000006 should be retained (within min commits to keep)"); + assertEquals(2, activeCommits.size(), + "Only 2 instants should remain active (min commits to keep)"); + } + + /** + * Tests graceful handling when clean metadata is missing or cannot be read. + * Verifies archival continues normally without blocking when metadata read fails. + */ + @Test + public void testArchivalContinuesWhenCleanMetadataIsMissing() throws Exception { + initTableToTestECTRBlock(); + + // Create config with ECTR blocking enabled + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withPath(basePath) + .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) + .withParallelism(2, 2) + .withArchivalConfig(HoodieArchivalConfig.newBuilder() + .archiveCommitsWith(2, 3) + .withBlockArchivalOnCleanECTR(true) + .build()) + .withCleanConfig(HoodieCleanConfig.newBuilder() + .retainCommits(1) + .build()) + .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withRemoteServerPort(timelineServicePort) + .build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .enable(false) + .build()) + .forTable("test-trip-table") + .build(); + + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + + // Create 6 commits + for (int i = 1; i <= 6; i++) { + testTable.addCommit(String.format("%08d", i)); + } + + // DO NOT create a clean commit - this simulates missing clean metadata + + // Reload metaClient + metaClient = HoodieTableMetaClient.reload(metaClient); + + // Trigger archival - should not fail even though no clean metadata exists + HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient); + TimelineArchiverV1 archiver = new TimelineArchiverV1(writeConfig, table); + + // Should not throw exception + assertDoesNotThrow(() -> archiver.archiveIfRequired(context), + "Archival should continue gracefully when clean metadata is missing"); + + // Verify some archival happened (based on min/max archival commits) + metaClient = HoodieTableMetaClient.reload(metaClient); + int commitsAfterArchival = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants(); + + // With 6 commits and max_archival=3, some archival should occur + assertTrue(commitsAfterArchival <= 3, "Archival should proceed when clean metadata is missing"); + } + + /** + * Tests that clean commits with empty or null ECTR are handled gracefully. + * Verifies archival proceeds normally when ECTR is not set in clean metadata. + */ + @Test + public void testArchivalHandlesEmptyECTRInCleanMetadata() throws Exception { + initTableToTestECTRBlock(); + + // Create config with ECTR blocking enabled + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withPath(basePath) + .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) + .withParallelism(2, 2) + .withArchivalConfig(HoodieArchivalConfig.newBuilder() + .archiveCommitsWith(2, 3) + .withBlockArchivalOnCleanECTR(true) + .build()) + .withCleanConfig(HoodieCleanConfig.newBuilder() + .retainCommits(1) + .build()) + .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withRemoteServerPort(timelineServicePort) + .build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .enable(false) + .build()) + .forTable("test-trip-table") + .build(); + + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + + // Create 6 commits + for (int i = 1; i <= 6; i++) { + testTable.addCommit(String.format("%08d", i)); + } + + // Create a clean commit with EMPTY ECTR + List cleanStatsList = new ArrayList<>(); + cleanStatsList.add(new org.apache.hudi.common.HoodieCleanStat( + HoodieCleaningPolicy.KEEP_LATEST_COMMITS, + "p1", + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + "", // Empty ECTR + "00000006" + )); + + org.apache.hudi.avro.model.HoodieCleanMetadata cleanMetadata = + CleanerUtils.convertCleanMetadata("00000007", Option.of(0L), cleanStatsList, Collections.emptyMap()); + org.apache.hudi.avro.model.HoodieCleanerPlan cleanerPlan = + new org.apache.hudi.avro.model.HoodieCleanerPlan( + new org.apache.hudi.avro.model.HoodieActionInstant("00000007", CLEAN_ACTION, ""), + "", "", new HashMap<>(), CleanMetadataV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>(), Collections.emptyMap()); + + testTable.addClean("00000007", cleanerPlan, cleanMetadata); + + // Reload metaClient + metaClient = HoodieTableMetaClient.reload(metaClient); + + // Trigger archival - should not fail with empty ECTR + HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient); + TimelineArchiverV1 archiver = new TimelineArchiverV1(writeConfig, table); + + assertDoesNotThrow(() -> archiver.archiveIfRequired(context), + "Archival should handle empty ECTR gracefully"); + + // Verify archival proceeded normally + metaClient = HoodieTableMetaClient.reload(metaClient); + int commitsAfterArchival = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants(); + + assertTrue(commitsAfterArchival <= 3, "Archival should proceed normally with empty ECTR"); + } + + /** + * Tests that archival can still make progress when ECTR is later than the archival window. + * This verifies the feature doesn't unnecessarily block archival when it's safe to proceed. + * + * Scenario: Create 10 commits, with ECTR pointing to commit 8. Archival should be able to + * archive commits 1-7 since they are before ECTR and outside the retention window. + */ + @Test + public void testArchivalMakesProgressWhenECTRIsLaterThanArchivalWindow() throws Exception { + initTableToTestECTRBlock(); + + // Create config with ECTR blocking enabled + // Min commits to keep = 2, max commits to keep = 3 + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withPath(basePath) + .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) + .withParallelism(2, 2) + .withArchivalConfig(HoodieArchivalConfig.newBuilder() + .archiveCommitsWith(2, 3) + .withBlockArchivalOnCleanECTR(true) // Enable ECTR blocking + .build()) + .withCleanConfig(HoodieCleanConfig.newBuilder() + .retainCommits(1) + .build()) + .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withRemoteServerPort(timelineServicePort) + .build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .enable(false) + .build()) + .forTable("test-trip-table") + .build(); + + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + + // Create 10 commits: 00000001 through 00000010 + for (int i = 1; i <= 10; i++) { + testTable.addCommit(String.format("%08d", i)); + } + + // Create a clean commit with ECTR pointing to commit 00000008 + // This ECTR is later than what archival would retain based on min/max archival commits (2, 3) + // So archival should be able to archive commits 1-7, but not 8-10 + List cleanStatsList = new ArrayList<>(); + cleanStatsList.add(new org.apache.hudi.common.HoodieCleanStat( + HoodieCleaningPolicy.KEEP_LATEST_COMMITS, + "p1", + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + "00000008", // ECTR - later than archival window + "00000010" + )); + + org.apache.hudi.avro.model.HoodieCleanMetadata cleanMetadata = + CleanerUtils.convertCleanMetadata("00000011", Option.of(0L), cleanStatsList, Collections.emptyMap()); + org.apache.hudi.avro.model.HoodieCleanerPlan cleanerPlan = + new org.apache.hudi.avro.model.HoodieCleanerPlan( + new org.apache.hudi.avro.model.HoodieActionInstant("00000011", CLEAN_ACTION, ""), + "", "", new HashMap<>(), CleanMetadataV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>(), Collections.emptyMap()); + + testTable.addClean("00000011", cleanerPlan, cleanMetadata); + + // Reload metaClient + metaClient = HoodieTableMetaClient.reload(metaClient); + int commitsBeforeArchival = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants(); + + // Trigger archival + HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient); + TimelineArchiverV1 archiver = new TimelineArchiverV1(writeConfig, table); + archiver.archiveIfRequired(context); + + // Reload timeline and get active commits + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + List activeCommits = activeTimeline.getCommitsTimeline().filterCompletedInstants().getInstants(); + int commitsAfterArchival = activeCommits.size(); + + // With 10 commits, min=2, max=3, and ECTR at 00000008: + // Archival can archive commits before ECTR, so 00000001-00000007 should be archived. + // Exactly 00000008-00000010 should remain active. + List activeCommitTimes = activeCommits.stream() + .map(HoodieInstant::requestedTime) + .collect(Collectors.toList()); + + // Verify all commits before ECTR are archived + for (int i = 1; i <= 7; i++) { + assertFalse(activeCommitTimes.contains(String.format("%08d", i)), + "Commit " + String.format("%08d", i) + " (before ECTR) should be archived"); + } + + // Verify commits >= ECTR are retained + assertTrue(activeCommitTimes.contains("00000008"), + "Commit 00000008 (ECTR) should not be archived"); + assertTrue(activeCommitTimes.contains("00000009"), + "Commit 00000009 (after ECTR) should not be archived"); + assertTrue(activeCommitTimes.contains("00000010"), + "Commit 00000010 (after ECTR) should not be archived"); + assertEquals(3, commitsAfterArchival, + "Exactly 3 commits (00000008-00000010) should remain active"); + } + + /** + * Tests archival blocking based on ECTR with TimelineArchiverV2 (LSM-based timeline) and table version 9. + * This validates that the ECTR blocking feature works correctly with the newer archival implementation. + * + * Table version 9 uses LAYOUT_VERSION_2 which employs LSM-based timeline storage. + * This test ensures TimelineArchiverV2 properly reads ECTR from clean metadata and blocks archival. + */ + @Test + public void testArchivalBlocksOnCleanECTRWithTimelineArchiverV2AndVersion9() throws Exception { + init(); + // Create config with ECTR blocking enabled + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withPath(basePath) + .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) + .withParallelism(2, 2) + .withArchivalConfig(HoodieArchivalConfig.newBuilder() + .archiveCommitsWith(2, 3) + .withBlockArchivalOnCleanECTR(true) // Enable ECTR blocking + .build()) + .withCleanConfig(HoodieCleanConfig.newBuilder() + .retainCommits(1) + .build()) + .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withRemoteServerPort(timelineServicePort) + .build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .enable(false) + .build()) + .forTable("test-trip-table") + .build(); + + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + + // Create 5 commits: 00000001, 00000002, 00000003, 00000004, 00000005 + for (int i = 1; i <= 5; i++) { + testTable.addCommit(String.format("%08d", i)); + } + + // Create a clean commit with ECTR pointing to commit 00000003 + // This means commits 00000003 and later should not be archived + List cleanStatsList = new ArrayList<>(); + cleanStatsList.add(new org.apache.hudi.common.HoodieCleanStat( + HoodieCleaningPolicy.KEEP_LATEST_COMMITS, + "p1", + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + "00000003", // ECTR + "00000005" + )); + cleanStatsList.add(new org.apache.hudi.common.HoodieCleanStat( + HoodieCleaningPolicy.KEEP_LATEST_COMMITS, + "p2", + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + "00000003", // ECTR + "00000005" + )); + + org.apache.hudi.avro.model.HoodieCleanMetadata cleanMetadata = + CleanerUtils.convertCleanMetadata("00000006", Option.of(0L), cleanStatsList, Collections.emptyMap()); + org.apache.hudi.avro.model.HoodieCleanerPlan cleanerPlan = + new org.apache.hudi.avro.model.HoodieCleanerPlan( + new org.apache.hudi.avro.model.HoodieActionInstant("00000006", CLEAN_ACTION, ""), + "", "", new HashMap<>(), CleanMetadataV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>(), Collections.emptyMap()); + + testTable.addClean("00000006", cleanerPlan, cleanMetadata); + + // Reload metaClient to pick up the clean commit + metaClient = HoodieTableMetaClient.reload(metaClient); + + // Verify table version is 9 + assertEquals(HoodieTableVersion.NINE, metaClient.getTableConfig().getTableVersion(), + "Table should be version 9"); + + // Trigger archival using TimelineArchiverV2 + HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient); + + // TimelineArchiverV2 should be used for version 9 + TimelineArchiverV2 archiver = new TimelineArchiverV2(writeConfig, table); + archiver.archiveIfRequired(context); + + // Reload timeline and get active commits + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + List activeCommits = activeTimeline.getCommitsTimeline().filterCompletedInstants().getInstants(); + + // Verify that commits 00000003, 00000004, 00000005 are still in active timeline (not archived) + List activeCommitTimes = activeCommits.stream() + .map(HoodieInstant::requestedTime) + .collect(Collectors.toList()); + + // Key assertion: TimelineArchiverV2 should not respect ECTR and archive commits >= ECTR + assertFalse(activeCommitTimes.contains("00000003"), + "TimelineArchiverV2: Commit 00000003 (ECTR) should be archived"); + assertTrue(activeCommitTimes.contains("00000004"), + "TimelineArchiverV2: Commit 00000004 (after ECTR) should not be archived"); + assertTrue(activeCommitTimes.contains("00000005"), + "TimelineArchiverV2: Commit 00000005 (after ECTR) should not be archived"); + + // Verify commits before ECTR may have been archived (00000001, 00000002) + // They could be archived or retained based on min/max archival settings + // But commits >= ECTR must be present - this is the critical check for TimelineArchiverV2 + log.info("TimelineArchiverV2 with table version 9's archival does not block on ECTR: {}", "00000003"); + } }