diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java index e693a1c2e7fcf..d518ac5525dd6 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java @@ -20,6 +20,7 @@ package org.apache.hudi.client.timeline.versioning.v1; import org.apache.hudi.avro.model.HoodieArchivedMetaEntry; +import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.client.timeline.HoodieTimelineArchiver; import org.apache.hudi.client.transaction.TransactionManager; import org.apache.hudi.client.utils.ArchivalMetrics; @@ -53,6 +54,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieCommitException; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.table.HoodieTable; @@ -271,7 +273,31 @@ private Stream getCommitInstantsToArchive() throws IOException { Option oldestInstantToRetainForClustering = ClusteringUtils.getEarliestInstantToRetainForClustering(table.getActiveTimeline(), table.getMetaClient(), config.getCleanerPolicy()); + // If enabled, block archival based on ECTR from the last completed clean to ensure we don't archive + // commits that have data files that haven't been cleaned yet. + Option oldestInstantToRetainForClean = Option.empty(); + if (config.shouldBlockArchivalOnCleanECTR()) { + Option lastCleanInstant = table.getCleanTimeline().filterCompletedInstants().lastInstant(); + if (lastCleanInstant.isPresent()) { + try { + HoodieCleanMetadata cleanMetadata = + table.getActiveTimeline().readCleanMetadata(lastCleanInstant.get()); + if (cleanMetadata.getEarliestCommitToRetain() != null + && !cleanMetadata.getEarliestCommitToRetain().trim().isEmpty()) { + oldestInstantToRetainForClean = commitTimeline.findInstantsAfterOrEquals( + cleanMetadata.getEarliestCommitToRetain()).firstInstant(); + log.info("Blocking archival based on earliest commit to retain {} from last clean {}. Oldest to retain is {}", + cleanMetadata.getEarliestCommitToRetain(), lastCleanInstant.get().requestedTime(), oldestInstantToRetainForClean.map(instant -> instant).orElse(null)); + } + } catch (IOException e) { + log.warn("Failed to read clean metadata for {}", lastCleanInstant.get(), e); + throw new HoodieIOException("Failed to read clean metadata for " + lastCleanInstant.get(), e); + } + } + } + // Actually do the commits + Option finalOldestInstantToRetainForClean = oldestInstantToRetainForClean; Stream instantToArchiveStream = commitTimeline.getInstantsAsStream() .filter(s -> { if (config.shouldArchiveBeyondSavepoint()) { @@ -297,6 +323,10 @@ private Stream getCommitInstantsToArchive() throws IOException { oldestInstantToRetainForClustering.map(instantToRetain -> compareTimestamps(s.requestedTime(), LESSER_THAN, instantToRetain.requestedTime())) .orElse(true) + ).filter(s -> + finalOldestInstantToRetainForClean.map(instantToRetain -> + compareTimestamps(s.requestedTime(), LESSER_THAN, instantToRetain.requestedTime())) + .orElse(true) ); return instantToArchiveStream.limit(commitTimeline.countInstants() - minInstantsToKeep); } else { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java index 1b1e3ba5c3917..8854c87edeaba 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java @@ -114,6 +114,17 @@ public class HoodieArchivalConfig extends HoodieConfig { .markAdvanced() .withDocumentation("Number of timeline manifest versions to retain."); + public static final ConfigProperty BLOCK_ARCHIVAL_ON_LATEST_CLEAN_ECTR = ConfigProperty + .key("hoodie.archive.block.on.latest.clean.ectr") + .defaultValue(false) + .markAdvanced() + .sinceVersion("1.2.0") + .withDocumentation("If enabled, archival will not archive commits beyond the Earliest Commit To Retain (ECTR) " + + "from the last completed clean. ECTR represents the oldest commit whose data files are still needed by " + + "the table and have not yet been cleaned up. Blocking archival at this point ensures that timeline metadata " + + "is not removed for commits whose data files still exist on storage, preventing inconsistencies between " + + "the timeline and the actual data."); + /** * @deprecated Use {@link #MAX_COMMITS_TO_KEEP} and its methods instead */ @@ -205,6 +216,11 @@ public Builder withArchiveBeyondSavepoint(boolean archiveBeyondSavepoint) { return this; } + public Builder withBlockArchivalOnCleanECTR(boolean blockArchivalOnCleanECTR) { + archivalConfig.setValue(BLOCK_ARCHIVAL_ON_LATEST_CLEAN_ECTR, String.valueOf(blockArchivalOnCleanECTR)); + return this; + } + public HoodieArchivalConfig build() { archivalConfig.setDefaults(HoodieArchivalConfig.class.getName()); return archivalConfig; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 9d32ec8b7d22d..5fc5ba5406ca6 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -1975,6 +1975,10 @@ public int getCommitArchivalBatchSize() { return getInt(HoodieArchivalConfig.COMMITS_ARCHIVAL_BATCH_SIZE); } + public boolean shouldBlockArchivalOnCleanECTR() { + return getBoolean(HoodieArchivalConfig.BLOCK_ARCHIVAL_ON_LATEST_CLEAN_ECTR); + } + public Boolean shouldCleanBootstrapBaseFile() { return getBoolean(HoodieCleanConfig.CLEANER_BOOTSTRAP_BASE_FILE_ENABLE); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java index c631ec00626c1..60db8b8a13b75 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java @@ -18,16 +18,21 @@ package org.apache.hudi.io; +import org.apache.hudi.avro.model.HoodieActionInstant; +import org.apache.hudi.avro.model.HoodieCleanMetadata; +import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.avro.model.HoodieSavepointMetadata; import org.apache.hudi.client.BaseHoodieWriteClient; import org.apache.hudi.client.timeline.TimelineArchivers; import org.apache.hudi.client.WriteClientTestUtils; +import org.apache.hudi.client.timeline.versioning.v1.TimelineArchiverV1; import org.apache.hudi.client.timeline.versioning.v2.LSMTimelineWriter; import org.apache.hudi.client.timeline.versioning.v2.TimelineArchiverV2; import org.apache.hudi.client.transaction.lock.InProcessLockProvider; import org.apache.hudi.client.utils.ArchivalMetrics; +import org.apache.hudi.common.HoodieCleanStat; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieCommitMetadata; @@ -47,6 +52,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.LSMTimeline; import org.apache.hudi.common.table.timeline.TimelineUtils; +import org.apache.hudi.common.table.timeline.versioning.clean.CleanMetadataV2MigrationHandler; import org.apache.hudi.common.table.timeline.versioning.v2.InstantComparatorV2; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.testutils.FileCreateUtilsLegacy; @@ -103,6 +109,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -2189,4 +2196,227 @@ public void testArchivalMetricsWithMixedActionTypes() throws Exception { // Verify archival status is success assertEquals(1L, metrics.get(ArchivalMetrics.ARCHIVAL_STATUS), "Archival should succeed"); } + + private void initTableToTestECTRBlock() throws IOException { + HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE; + initPath(); + initSparkContexts(); + initTimelineService(); + Properties properties = new Properties(); + properties.setProperty(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "6"); + properties.setProperty(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key(), "false"); + initMetaClient(properties); + storage = metaClient.getStorage(); + metaClient.getStorage().createDirectory(new StoragePath(basePath)); + metaClient = HoodieTestUtils.init(storageConf, basePath, tableType, properties); + } + + /** + * Tests archival behavior with ECTR blocking enabled vs disabled. + * When enabled: commits >= ECTR from last clean are not archived. + * When disabled: archival proceeds normally ignoring ECTR (backward compatible). + * Also validates that archival makes progress when ECTR is later than the archival window. + */ + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testArchivalWithECTRBlocking(boolean blockArchivalOnCleanECTR) throws Exception { + initTableToTestECTRBlock(); + + HoodieWriteConfig writeConfig = buildECTRTestConfig(2, 3, blockArchivalOnCleanECTR); + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + + // Given: 5 commits and a clean commit with ECTR pointing to commit 00000003 + for (int i = 1; i <= 5; i++) { + testTable.addCommit(String.format("%08d", i)); + } + addCleanCommitWithECTR(testTable, "00000006", "00000003", "00000005"); + metaClient = HoodieTableMetaClient.reload(metaClient); + + // When: trigger archival + HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient); + TimelineArchiverV1 archiver = new TimelineArchiverV1(writeConfig, table); + archiver.archiveIfRequired(context); + + // Then: verify archival behavior + metaClient = HoodieTableMetaClient.reload(metaClient); + List activeCommitTimes = getActiveCommitTimes(); + + if (blockArchivalOnCleanECTR) { + // Commits >= ECTR should not be archived + assertTrue(activeCommitTimes.contains("00000003"), "Commit 00000003 (ECTR) should not be archived"); + assertTrue(activeCommitTimes.contains("00000004"), "Commit 00000004 (after ECTR) should not be archived"); + assertTrue(activeCommitTimes.contains("00000005"), "Commit 00000005 (after ECTR) should not be archived"); + } else { + // ECTR is ignored; archival proceeds based on min/max archival commits only + assertFalse(activeCommitTimes.contains("00000003"), + "Commit 00000003 (ECTR) should be archived when ECTR blocking is disabled"); + assertTrue(activeCommitTimes.contains("00000005"), + "Commit 00000005 should be retained (within min commits to keep)"); + } + + if (blockArchivalOnCleanECTR) { + // Additional step: validate archival makes progress when ECTR is later than the archival window. + // Add more commits and a new clean with ECTR at 00000008, so commits before ECTR can be archived. + for (int i = 7; i <= 10; i++) { + testTable.addCommit(String.format("%08d", i)); + } + addCleanCommitWithECTR(testTable, "00000011", "00000008", "00000010"); + metaClient = HoodieTableMetaClient.reload(metaClient); + + table = HoodieSparkTable.create(writeConfig, context, metaClient); + archiver = new TimelineArchiverV1(writeConfig, table); + archiver.archiveIfRequired(context); + + metaClient = HoodieTableMetaClient.reload(metaClient); + activeCommitTimes = getActiveCommitTimes(); + + // Commits before ECTR should be archived + for (int i = 1; i <= 7; i++) { + assertFalse(activeCommitTimes.contains(String.format("%08d", i)), + "Commit " + String.format("%08d", i) + " (before ECTR) should be archived"); + } + // Commits >= ECTR should be retained + assertTrue(activeCommitTimes.contains("00000008"), "Commit 00000008 (ECTR) should not be archived"); + assertTrue(activeCommitTimes.contains("00000009"), "Commit 00000009 (after ECTR) should not be archived"); + assertTrue(activeCommitTimes.contains("00000010"), "Commit 00000010 (after ECTR) should not be archived"); + assertEquals(3, activeCommitTimes.size(), "Exactly 3 commits (00000008-00000010) should remain active"); + } + } + + /** + * Tests graceful handling when clean metadata is missing or has empty ECTR. + * Archival should continue normally in both cases. + */ + @Test + public void testArchivalContinuesWhenECTRIsAbsent() throws Exception { + initTableToTestECTRBlock(); + + HoodieWriteConfig writeConfig = buildECTRTestConfig(2, 3, true); + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + + // Step 1: No clean commit exists — archival should proceed without error + for (int i = 1; i <= 6; i++) { + testTable.addCommit(String.format("%08d", i)); + } + metaClient = HoodieTableMetaClient.reload(metaClient); + + HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient); + TimelineArchiverV1 archiver = new TimelineArchiverV1(writeConfig, table); + + TimelineArchiverV1 finalArchiver = archiver; + assertDoesNotThrow(() -> finalArchiver.archiveIfRequired(context), + "Archival should continue gracefully when clean metadata is missing"); + + metaClient = HoodieTableMetaClient.reload(metaClient); + int commitsAfterFirstArchival = metaClient.getActiveTimeline().getCommitsTimeline() + .filterCompletedInstants().countInstants(); + assertTrue(commitsAfterFirstArchival <= 3, "Archival should proceed when clean metadata is missing"); + + // Step 2: Clean commit exists but with empty ECTR — archival should still proceed + for (int i = 7; i <= 12; i++) { + testTable.addCommit(String.format("%08d", i)); + } + addCleanCommitWithECTR(testTable, "00000013", "", "00000012"); + metaClient = HoodieTableMetaClient.reload(metaClient); + + table = HoodieSparkTable.create(writeConfig, context, metaClient); + archiver = new TimelineArchiverV1(writeConfig, table); + + TimelineArchiverV1 finalArchiver1 = archiver; + assertDoesNotThrow(() -> finalArchiver1.archiveIfRequired(context), + "Archival should handle empty ECTR gracefully"); + + metaClient = HoodieTableMetaClient.reload(metaClient); + int commitsAfterSecondArchival = metaClient.getActiveTimeline().getCommitsTimeline() + .filterCompletedInstants().countInstants(); + assertTrue(commitsAfterSecondArchival <= 3, "Archival should proceed normally with empty ECTR"); + } + + private HoodieWriteConfig buildECTRTestConfig(int minCommits, int maxCommits, boolean blockArchivalOnCleanECTR) { + return HoodieWriteConfig.newBuilder() + .withPath(basePath) + .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) + .withParallelism(2, 2) + .withArchivalConfig(HoodieArchivalConfig.newBuilder() + .archiveCommitsWith(minCommits, maxCommits) + .withBlockArchivalOnCleanECTR(blockArchivalOnCleanECTR) + .build()) + .withCleanConfig(HoodieCleanConfig.newBuilder() + .retainCommits(1) + .build()) + .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withRemoteServerPort(timelineServicePort) + .build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .enable(false) + .build()) + .forTable("test-trip-table") + .build(); + } + + private void addCleanCommitWithECTR(HoodieTestTable testTable, String cleanInstant, String ectr, String lastCompleted) throws Exception { + List cleanStatsList = new ArrayList<>(); + cleanStatsList.add(new HoodieCleanStat( + HoodieCleaningPolicy.KEEP_LATEST_COMMITS, + "p1", + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + ectr, + lastCompleted + )); + + HoodieCleanMetadata cleanMetadata = + CleanerUtils.convertCleanMetadata(cleanInstant, Option.of(0L), cleanStatsList, Collections.emptyMap()); + HoodieCleanerPlan cleanerPlan = + new HoodieCleanerPlan( + new HoodieActionInstant(cleanInstant, CLEAN_ACTION, ""), + "", "", new HashMap<>(), CleanMetadataV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>(), Collections.emptyMap()); + + testTable.addClean(cleanInstant, cleanerPlan, cleanMetadata); + } + + private List getActiveCommitTimes() { + return metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().getInstants().stream() + .map(HoodieInstant::requestedTime) + .collect(Collectors.toList()); + } + + /** + * Tests that TimelineArchiverV2 (LSM-based timeline, v9 tables) does NOT block archival on ECTR. + * ECTR blocking is only for v6 tables using TimelineArchiverV1. + */ + @Test + public void testArchivalBlocksOnCleanECTRWithTimelineArchiverV2AndVersion9() throws Exception { + init(); + + HoodieWriteConfig writeConfig = buildECTRTestConfig(2, 3, true); + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + + // Given: 5 commits and a clean commit with ECTR at 00000003 + for (int i = 1; i <= 5; i++) { + testTable.addCommit(String.format("%08d", i)); + } + addCleanCommitWithECTR(testTable, "00000006", "00000003", "00000005"); + metaClient = HoodieTableMetaClient.reload(metaClient); + + assertEquals(HoodieTableVersion.NINE, metaClient.getTableConfig().getTableVersion(), + "Table should be version 9"); + + // When: trigger archival using TimelineArchiverV2 + HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient); + TimelineArchiverV2 archiver = new TimelineArchiverV2(writeConfig, table); + archiver.archiveIfRequired(context); + + // Then: TimelineArchiverV2 should NOT respect ECTR — commit 00000003 gets archived + metaClient = HoodieTableMetaClient.reload(metaClient); + List activeCommitTimes = getActiveCommitTimes(); + + assertFalse(activeCommitTimes.contains("00000003"), + "TimelineArchiverV2: Commit 00000003 (ECTR) should be archived"); + assertTrue(activeCommitTimes.contains("00000004"), + "TimelineArchiverV2: Commit 00000004 (after ECTR) should not be archived"); + assertTrue(activeCommitTimes.contains("00000005"), + "TimelineArchiverV2: Commit 00000005 (after ECTR) should not be archived"); + } }