diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java index 874c5445cbd5f..ee090c880af4b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java @@ -250,8 +250,8 @@ public class HoodieCleanConfig extends HoodieConfig { .markAdvanced() .withDocumentation("Maximum number of commits to clean in one clean commit. Applicable only when the clean policy is based on KEEP_LATEST_COMMITS or KEEP_LATEST_HOURS"); - public static final ConfigProperty MAX_INTERVAL_TO_CREATE_EMPTY_CLEAN_HOURS = ConfigProperty - .key("hoodie.write.empty.clean.internval.hours") + public static final ConfigProperty INTERVAL_TO_CREATE_EMPTY_CLEAN_HOURS = ConfigProperty + .key("hoodie.write.empty.clean.interval.hours") .defaultValue(-1L) .markAdvanced() .withDocumentation("In some cases empty clean commit needs to be created to ensure the clean planner " @@ -436,8 +436,8 @@ public HoodieCleanConfig.Builder withMaxCommitsToClean(long maxCommitsToClean) { return this; } - public HoodieCleanConfig.Builder withMaxIntervalToCreateEmptyCleanHours(long durationHours) { - cleanConfig.setValue(MAX_INTERVAL_TO_CREATE_EMPTY_CLEAN_HOURS, String.valueOf(durationHours)); + public HoodieCleanConfig.Builder withIntervalToCreateEmptyCleanHours(long emptyCleanIntervalHours) { + cleanConfig.setValue(INTERVAL_TO_CREATE_EMPTY_CLEAN_HOURS, String.valueOf(emptyCleanIntervalHours)); return this; } @@ -449,9 +449,9 @@ public HoodieCleanConfig build() { if (maxCommitsToClean < 1) { throw new IllegalArgumentException(MAX_COMMITS_TO_CLEAN.key() + " must be >= 1, but was " + maxCommitsToClean); } - long maxDurationHours = cleanConfig.getLong(MAX_INTERVAL_TO_CREATE_EMPTY_CLEAN_HOURS); - if (maxDurationHours == 0 || maxDurationHours < -1) { - throw new IllegalArgumentException(MAX_INTERVAL_TO_CREATE_EMPTY_CLEAN_HOURS.key() + " must be >= 1, but was " + maxDurationHours); + long emptyCleanIntervalHours = cleanConfig.getLong(INTERVAL_TO_CREATE_EMPTY_CLEAN_HOURS); + if (emptyCleanIntervalHours == 0 || emptyCleanIntervalHours < -1) { + throw new IllegalArgumentException(INTERVAL_TO_CREATE_EMPTY_CLEAN_HOURS.key() + " must be -1 (disabled) or >= 1, but was " + emptyCleanIntervalHours); } return cleanConfig; } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 569be10c12444..5df834121bf90 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -1877,8 +1877,8 @@ public boolean isAutoClean() { return getBoolean(HoodieCleanConfig.AUTO_CLEAN); } - public long maxIntervalToCreateEmptyCleanHours() { - return getLong(HoodieCleanConfig.MAX_INTERVAL_TO_CREATE_EMPTY_CLEAN_HOURS); + public long getIntervalToCreateEmptyCleanHours() { + return getLong(HoodieCleanConfig.INTERVAL_TO_CREATE_EMPTY_CLEAN_HOURS); } public boolean shouldArchiveBeyondSavepoint() { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java index 0868a85cabb93..d11c61d40c205 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java @@ -244,12 +244,12 @@ protected Option requestClean() { cleanPlanOpt = Option.of(cleanerPlan); } // If cleaner plan returned an empty list, incremental clean is enabled and there was no - // completed clean created in the last X hours configured in MAX_DURATION_TO_CREATE_EMPTY_CLEAN, + // completed clean created in the last X hours configured in INTERVAL_TO_CREATE_EMPTY_CLEAN_HOURS, // create a dummy clean to avoid full scan in the future. // Note: For a dataset with incremental clean enabled, that does not receive any updates, cleaner plan always comes // with an empty list of files to be cleaned. CleanActionExecutor would never be invoked for this dataset. // To avoid fullscan on the dataset with every ingestion run, empty clean commit is created here. - if (cleanPlanOpt.isEmpty() && config.incrementalCleanerModeEnabled() && cleanerPlan.getEarliestInstantToRetain() != null && config.maxIntervalToCreateEmptyCleanHours() > 0) { + if (cleanPlanOpt.isEmpty() && config.incrementalCleanerModeEnabled() && cleanerPlan.getEarliestInstantToRetain() != null && config.getIntervalToCreateEmptyCleanHours() > 0) { // Only create an empty clean commit if earliestInstantToRetain is present in the plan boolean eligibleForEmptyCleanCommit = true; @@ -260,7 +260,7 @@ protected Option requestClean() { ZonedDateTime latestDateTime = ZonedDateTime.ofInstant(java.time.Instant.now(), table.getMetaClient().getTableConfig().getTimelineTimezone().getZoneId()); long currentCleanTimeMs = latestDateTime.toInstant().toEpochMilli(); long lastCleanTimeMs = HoodieInstantTimeGenerator.parseDateFromInstantTime(lastCleanInstant.get().requestedTime()).toInstant().toEpochMilli(); - eligibleForEmptyCleanCommit = currentCleanTimeMs - lastCleanTimeMs > (TimeUnit.HOURS.toMillis(config.maxIntervalToCreateEmptyCleanHours())); + eligibleForEmptyCleanCommit = currentCleanTimeMs - lastCleanTimeMs > (TimeUnit.HOURS.toMillis(config.getIntervalToCreateEmptyCleanHours())); } catch (ParseException e) { log.error("Unable to parse last clean commit time", e); throw new HoodieException("Unable to parse last clean commit time", e); @@ -276,10 +276,10 @@ protected Option requestClean() { if (!StringUtils.isNullOrEmpty(previousEarliestCommitToRetain) && !StringUtils.isNullOrEmpty(currentEarliestCommitToRetain) && compareTimestamps(currentEarliestCommitToRetain, LESSER_THAN, previousEarliestCommitToRetain)) { - log.warn("Skipping empty clean creation because earliestCommitToRetain would go backwards. " + log.warn("Adjusting empty clean earliestCommitToRetain to previous value to avoid going backwards. " + "Previous: {}, Current: {}. This can happen when cleaner configuration is changed.", previousEarliestCommitToRetain, currentEarliestCommitToRetain); - return Option.empty(); + cleanerPlan.getEarliestInstantToRetain().setTimestamp(previousEarliestCommitToRetain); } } catch (IOException e) { log.error("Unable to read last clean metadata", e); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java index ce7070c285882..32e6af2180b5d 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java @@ -833,7 +833,7 @@ void testEmptyCleansAddedAfterThreshold(boolean secondCommitAfterThreshold) thro .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER) .withCleanBootstrapBaseFileEnabled(enableBootstrapSourceClean) .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS).cleanerNumHoursRetained(2) - .withMaxIntervalToCreateEmptyCleanHours(1) + .withIntervalToCreateEmptyCleanHours(1) .build()) .build(); @@ -931,15 +931,18 @@ void testEmptyCleansAddedAfterThreshold(boolean secondCommitAfterThreshold) thro @Test void testEmptyCleanDoesNotGoBackwardsOnConfigChange() throws Exception { - // Test that earliestCommitToRetain never goes backwards when user changes cleaner config + // Test that earliestCommitToRetain never goes backwards when user increases retention. + // Scenario: user starts with short retention (12h), then increases to long retention (72h). + // The longer retention would compute an older ECTR, but the code should adjust it to + // the previous ECTR and still create the empty clean. HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()) .withCleanConfig(HoodieCleanConfig.newBuilder() .withIncrementalCleaningMode(true) .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER) .withCleanBootstrapBaseFileEnabled(false) - .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS).cleanerNumHoursRetained(24) - .withMaxIntervalToCreateEmptyCleanHours(1) + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS).cleanerNumHoursRetained(12) + .withIntervalToCreateEmptyCleanHours(1) .build()) .build(); @@ -949,27 +952,27 @@ void testEmptyCleanDoesNotGoBackwardsOnConfigChange() throws Exception { Instant instant = Instant.now(); ZonedDateTime commitDateTime = ZonedDateTime.ofInstant(instant, metaClient.getTableConfig().getTimelineTimezone().getZoneId()); - // Create first commit 48 hours ago + // Create first commit 70 hours ago String file1P0C0 = UUID.randomUUID().toString(); - String firstCommitTs = HoodieInstantTimeGenerator.formatDate(Date.from(commitDateTime.minusHours(48).toInstant())); + String firstCommitTs = HoodieInstantTimeGenerator.formatDate(Date.from(commitDateTime.minusHours(70).toInstant())); commitToTestTable(testTable, firstCommitTs, p0, file1P0C0); testTable = tearDownTestTableAndReinit(testTable, config); - // Create second commit 36 hours ago + // Create second commit 48 hours ago String file2P0C1 = UUID.randomUUID().toString(); - String secondCommitTs = HoodieInstantTimeGenerator.formatDate(Date.from(commitDateTime.minusHours(36).toInstant())); + String secondCommitTs = HoodieInstantTimeGenerator.formatDate(Date.from(commitDateTime.minusHours(48).toInstant())); commitToTestTable(testTable, secondCommitTs, p0, file2P0C1); testTable = tearDownTestTableAndReinit(testTable, config); metaClient = HoodieTableMetaClient.reload(metaClient); - // Create third commit 12 hours ago + // Create third commit 6 hours ago (well within 12h retention window) String file3P0C2 = UUID.randomUUID().toString(); - String thirdCommitTs = HoodieInstantTimeGenerator.formatDate(Date.from(commitDateTime.minusHours(12).toInstant())); + String thirdCommitTs = HoodieInstantTimeGenerator.formatDate(Date.from(commitDateTime.minusHours(6).toInstant())); commitToTestTable(testTable, thirdCommitTs, p0, file3P0C2); testTable = tearDownTestTableAndReinit(testTable, config); metaClient = HoodieTableMetaClient.reload(metaClient); - // Run first empty clean 2 hours ago - should retain commits from 26 hours ago (24h retention + 2h safety) + // Run first empty clean with 12h retention - ECTR should be thirdCommitTs (6h ago) String firstCleanInstant = HoodieInstantTimeGenerator.formatDate(Date.from(commitDateTime.minusHours(2).toInstant())); SparkRDDWriteClient writeClient = getHoodieWriteClient(config); List hoodieCleanStatsOne = runCleaner(config, false, false, writeClient, firstCleanInstant); @@ -982,34 +985,41 @@ void testEmptyCleanDoesNotGoBackwardsOnConfigChange() throws Exception { String firstEarliestCommitToRetain = firstCleanMetadata.getEarliestCommitToRetain(); writeClient.close(); - // Now change config to retain only 12 hours (which would normally make earliestCommitToRetain go backwards) + // Add a new commit so that needsCleaning() passes for the second clean attempt + String file4P0C3 = UUID.randomUUID().toString(); + String fourthCommitTs = HoodieInstantTimeGenerator.formatDate(Date.from(commitDateTime.minusHours(1).toInstant())); + commitToTestTable(testTable, fourthCommitTs, p0, file4P0C3); + testTable = tearDownTestTableAndReinit(testTable, config); + metaClient = HoodieTableMetaClient.reload(metaClient); + + // Now increase retention to 72 hours, which would make ECTR go backwards to firstCommitTs (70h ago) HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withPath(basePath) .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()) .withCleanConfig(HoodieCleanConfig.newBuilder() .withIncrementalCleaningMode(true) .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER) .withCleanBootstrapBaseFileEnabled(false) - .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS).cleanerNumHoursRetained(12) - .withMaxIntervalToCreateEmptyCleanHours(1) + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS).cleanerNumHoursRetained(72) + .withIntervalToCreateEmptyCleanHours(1) .build()) .build(); - // Try to create another empty clean with the new config 61 minutes after first clean + // Create second empty clean 61 minutes after first clean String secondCleanInstant = HoodieInstantTimeGenerator.formatDate(Date.from( HoodieInstantTimeGenerator.parseDateFromInstantTime(firstCleanInstant).toInstant().plus(61, ChronoUnit.MINUTES))); writeClient = getHoodieWriteClient(newConfig); List hoodieCleanStatsTwo = runCleaner(newConfig, false, false, writeClient, secondCleanInstant); - // The clean should be skipped because earliestCommitToRetain would go backwards + // The empty clean should still be created, but with ECTR adjusted to the previous value metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTimeline cleanTimeline = metaClient.getActiveTimeline().getCleanerTimeline().filterCompletedInstants(); - assertEquals(1, cleanTimeline.countInstants(), "Second clean should be skipped to prevent earliestCommitToRetain from going backwards"); + assertEquals(2, cleanTimeline.countInstants(), "Second empty clean should be created with adjusted ECTR"); - // Verify earliestCommitToRetain did not change - HoodieCleanMetadata latestCleanMetadata = CleanerUtils.getCleanerMetadata(metaClient, cleanTimeline.lastInstant().get()); - assertEquals(firstEarliestCommitToRetain, latestCleanMetadata.getEarliestCommitToRetain(), - "earliestCommitToRetain should not go backwards"); + // Verify earliestCommitToRetain did not go backwards + HoodieCleanMetadata secondCleanMetadata = CleanerUtils.getCleanerMetadata(metaClient, cleanTimeline.lastInstant().get()); + assertEquals(firstEarliestCommitToRetain, secondCleanMetadata.getEarliestCommitToRetain(), + "earliestCommitToRetain should be adjusted to previous value, not go backwards"); writeClient.close(); } finally { testTable.close();