From 5208c767a40c257b62c7ee820aa1bcd714be40e3 Mon Sep 17 00:00:00 2001 From: sivabalan Date: Tue, 17 Mar 2026 15:24:29 -0700 Subject: [PATCH 1/3] Adding empty clean support to hudi --- .../apache/hudi/config/HoodieCleanConfig.java | 19 ++ .../apache/hudi/config/HoodieWriteConfig.java | 4 + .../action/clean/CleanActionExecutor.java | 42 +++- .../action/clean/CleanPlanActionExecutor.java | 84 ++++++- .../hudi/table/action/TestCleanPlanner.java | 12 +- .../org/apache/hudi/table/TestCleaner.java | 63 +++++- .../functional/TestCleanPlanExecutor.java | 212 ++++++++++++++++++ .../hudi/testutils/HoodieCleanerTestBase.java | 17 +- 8 files changed, 421 insertions(+), 32 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java index 0565f8f9f44cb..77ffa7c1f91d3 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodiePreWriteCleanerPolicy; import org.apache.hudi.common.model.WriteConcurrencyMode; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.table.action.clean.CleaningTriggerStrategy; import javax.annotation.concurrent.Immutable; @@ -250,6 +251,16 @@ public class HoodieCleanConfig extends HoodieConfig { .markAdvanced() .withDocumentation("Maximum number of commits to clean in one clean commit. Applicable only when the clean policy is based on KEEP_LATEST_COMMITS or KEEP_LATEST_HOURS"); + public static final ConfigProperty MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS = ConfigProperty + .key("hoodie.write.empty.clean.create.duration.ms") + .defaultValue(-1L) + .markAdvanced() + .withDocumentation("In some cases empty clean commit needs to be created to ensure the clean planner " + + "does not look through entire dataset if there are no clean plans. This is possible for append-only " + + "dataset. Also, for these datasets we cannot ignore clean completely since in the future there could " + + "be upsert or replace operations. By creating empty clean commit, earliest_commit_to_retain value " + + "will be updated so that now clean planner can only check for partitions that are modified after the " + + "last empty clean's earliest_commit_toRetain value there by optimizing the clean planning"); /** @deprecated Use {@link #CLEANER_POLICY} and its methods instead */ @Deprecated @@ -426,6 +437,11 @@ public HoodieCleanConfig.Builder withMaxCommitsToClean(long maxCommitsToClean) { return this; } + public HoodieCleanConfig.Builder withMaxDurationToCreateEmptyClean(long duration) { + cleanConfig.setValue(MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS, String.valueOf(duration)); + return this; + } + public HoodieCleanConfig build() { cleanConfig.setDefaults(HoodieCleanConfig.class.getName()); HoodieCleaningPolicy.valueOf(cleanConfig.getString(CLEANER_POLICY)); @@ -434,6 +450,9 @@ public HoodieCleanConfig build() { if (maxCommitsToClean < 1) { throw new IllegalArgumentException(MAX_COMMITS_TO_CLEAN.key() + " must be >= 1, but was " + maxCommitsToClean); } + long maxDuration = cleanConfig.getLong(MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS); + ValidationUtils.checkArgument(maxDuration != 0, + MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS.key() + " must be either -1 (disabled) or a positive value, but got 0"); return cleanConfig; } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index c0c5be59e3c2f..9b08d26141908 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -1842,6 +1842,10 @@ public boolean isAutoClean() { return getBoolean(HoodieCleanConfig.AUTO_CLEAN); } + public long maxDurationToCreateEmptyCleanMs() { + return getLong(HoodieCleanConfig.MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS); + } + public boolean shouldArchiveBeyondSavepoint() { return getBooleanOrDefault(HoodieArchivalConfig.ARCHIVE_BEYOND_SAVEPOINT); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java index 64262ab673a29..ce6c6af2b8ee4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java @@ -45,6 +45,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -52,6 +53,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.hudi.common.util.CleanerUtils.CLEAN_METADATA_VERSION_2; import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty; @Slf4j @@ -134,9 +136,9 @@ private static Stream> deleteFilesFunc(Iterator * @throws IllegalArgumentException if unknown cleaning policy is provided */ List clean(HoodieEngineContext context, HoodieCleanerPlan cleanerPlan) { - int cleanerParallelism = Math.min( + int cleanerParallelism = Math.max(1, Math.min( cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum(), - config.getCleanerParallelism()); + config.getCleanerParallelism())); log.info("Using cleanerParallelism: {}", cleanerParallelism); context.setJobStatus(this.getClass().getSimpleName(), "Perform cleaning of table: " + config.getTableName()); @@ -155,7 +157,7 @@ List clean(HoodieEngineContext context, HoodieCleanerPlan clean List partitionsToBeDeleted = table.getMetaClient().getTableConfig().isTablePartitioned() && cleanerPlan.getPartitionsToBeDeleted() != null ? cleanerPlan.getPartitionsToBeDeleted() - : new ArrayList<>(); + : Collections.emptyList(); partitionsToBeDeleted.forEach(entry -> { if (!isNullOrEmpty(entry)) { deleteFileAndGetResult(table.getStorage(), table.getMetaClient().getBasePath() + "/" + entry); @@ -213,17 +215,18 @@ private HoodieCleanMetadata runClean(HoodieTable table, HoodieInstan } List cleanStats = clean(context, cleanerPlan); + table.getMetaClient().reloadActiveTimeline(); + HoodieCleanMetadata metadata; if (cleanStats.isEmpty()) { - return HoodieCleanMetadata.newBuilder().build(); + metadata = createEmptyCleanMetadata(cleanerPlan, inflightInstant, timer.endTimer()); + } else { + metadata = CleanerUtils.convertCleanMetadata( + inflightInstant.requestedTime(), + Option.of(timer.endTimer()), + cleanStats, + cleanerPlan.getExtraMetadata() + ); } - - table.getMetaClient().reloadActiveTimeline(); - HoodieCleanMetadata metadata = CleanerUtils.convertCleanMetadata( - inflightInstant.requestedTime(), - Option.of(timer.endTimer()), - cleanStats, - cleanerPlan.getExtraMetadata() - ); this.txnManager.beginStateChange(Option.of(inflightInstant), Option.empty()); writeTableMetadata(metadata, inflightInstant.requestedTime()); table.getActiveTimeline().transitionCleanInflightToComplete( @@ -238,6 +241,21 @@ private HoodieCleanMetadata runClean(HoodieTable table, HoodieInstan } } + private static HoodieCleanMetadata createEmptyCleanMetadata(HoodieCleanerPlan cleanerPlan, HoodieInstant inflightInstant, long timeTakenMillis) { + ValidationUtils.checkArgument(cleanerPlan.getEarliestInstantToRetain() != null, "For empty cleans, earliest instant to retain can never be null"); + return HoodieCleanMetadata.newBuilder() + .setStartCleanTime(inflightInstant.requestedTime()) + .setTimeTakenInMillis(timeTakenMillis) + .setTotalFilesDeleted(0) + .setLastCompletedCommitTimestamp(cleanerPlan.getLastCompletedCommitTimestamp()) + .setEarliestCommitToRetain(cleanerPlan.getEarliestInstantToRetain().getTimestamp()) + .setVersion(CLEAN_METADATA_VERSION_2) + .setPartitionMetadata(Collections.emptyMap()) + .setExtraMetadata(cleanerPlan.getExtraMetadata()) + .setBootstrapPartitionMetadata(Collections.emptyMap()) + .build(); + } + @Override public HoodieCleanMetadata execute() { List cleanMetadataList = new ArrayList<>(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java index 4c57cdd76b59a..06dba1f025f3e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java @@ -28,9 +28,11 @@ import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.CleanerUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; @@ -42,6 +44,8 @@ import lombok.extern.slf4j.Slf4j; import java.io.IOException; +import java.text.ParseException; +import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -49,6 +53,8 @@ import java.util.Map; import java.util.stream.Collectors; +import static org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN; +import static org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps; import static org.apache.hudi.common.util.CleanerUtils.SAVEPOINTED_TIMESTAMPS; import static org.apache.hudi.common.util.MapUtils.nonEmpty; @@ -94,6 +100,23 @@ private boolean needsCleaning(CleaningTriggerStrategy strategy) { } } + private HoodieCleanerPlan getEmptyCleanerPlan(Option earliestInstant, CleanPlanner planner) throws IOException { + HoodieCleanerPlan.Builder cleanBuilder = HoodieCleanerPlan.newBuilder() + .setFilePathsToBeDeletedPerPartition(Collections.emptyMap()) + .setExtraMetadata(prepareExtraMetadata(planner.getSavepointedTimestamps())); + if (earliestInstant.isPresent()) { + HoodieInstant hoodieInstant = earliestInstant.get(); + cleanBuilder.setPolicy(config.getCleanerPolicy().name()) + .setVersion(CleanPlanner.LATEST_CLEAN_PLAN_VERSION) + .setEarliestInstantToRetain(new HoodieActionInstant(hoodieInstant.requestedTime(), hoodieInstant.getAction(), hoodieInstant.getState().name())) + .setLastCompletedCommitTimestamp(planner.getLastCompletedCommitTimestamp()); + } else { + cleanBuilder.setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()) + .setVersion(CleanPlanner.LATEST_CLEAN_PLAN_VERSION); + } + return cleanBuilder.build(); + } + /** * Generates List of files to be cleaned. * @@ -109,8 +132,8 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) { context.clearJobStatus(); if (partitionsToClean.isEmpty()) { - log.info("Nothing to clean here. It is already clean"); - return HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build(); + log.info("Partitions to clean returned empty. Checking to see if empty clean needs to be created."); + return getEmptyCleanerPlan(earliestInstant, planner); } log.info( "Earliest commit to retain for clean : {}", @@ -213,14 +236,61 @@ protected Option requestClean() { cleanerEngineContext = context; } final HoodieCleanerPlan cleanerPlan = requestClean(cleanerEngineContext); - Option option = Option.empty(); - if (nonEmpty(cleanerPlan.getFilePathsToBeDeletedPerPartition()) - && cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum() > 0) { + Option cleanPlanOpt = Option.empty(); + if ((cleanerPlan.getPartitionsToBeDeleted() != null && !cleanerPlan.getPartitionsToBeDeleted().isEmpty()) + || (nonEmpty(cleanerPlan.getFilePathsToBeDeletedPerPartition()) + && cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum() > 0)) { // Only create cleaner plan which does some work - option = Option.of(cleanerPlan); + cleanPlanOpt = Option.of(cleanerPlan); } + // If cleaner plan returned an empty list, incremental clean is enabled and there was no + // completed clean created in the last X hours configured in MAX_DURATION_TO_CREATE_EMPTY_CLEAN, + // create a dummy clean to avoid full scan in the future. + // Note: For a dataset with incremental clean enabled, that does not receive any updates, cleaner plan always comes + // with an empty list of files to be cleaned. CleanActionExecutor would never be invoked for this dataset. + // To avoid fullscan on the dataset with every ingestion run, empty clean commit is created here. + if (cleanPlanOpt.isEmpty() && config.incrementalCleanerModeEnabled() && cleanerPlan.getEarliestInstantToRetain() != null && config.maxDurationToCreateEmptyCleanMs() > 0) { + // Only create an empty clean commit if earliestInstantToRetain is present in the plan + boolean eligibleForEmptyCleanCommit = true; + + // if there is no previous clean instant or the previous clean instant was before the configured max duration, schedule an empty clean commit + Option lastCleanInstant = table.getCleanTimeline().filterCompletedInstants().lastInstant(); + if (lastCleanInstant.isPresent()) { + try { + ZonedDateTime latestDateTime = ZonedDateTime.ofInstant(java.time.Instant.now(), table.getMetaClient().getTableConfig().getTimelineTimezone().getZoneId()); + long currentCleanTimeMs = latestDateTime.toInstant().toEpochMilli(); + long lastCleanTimeMs = HoodieInstantTimeGenerator.parseDateFromInstantTime(lastCleanInstant.get().requestedTime()).toInstant().toEpochMilli(); + eligibleForEmptyCleanCommit = currentCleanTimeMs - lastCleanTimeMs > config.maxDurationToCreateEmptyCleanMs(); + } catch (ParseException e) { + log.error("Unable to parse last clean commit time", e); + throw new HoodieException("Unable to parse last clean commit time", e); + } + } + if (eligibleForEmptyCleanCommit) { + // Ensure earliestCommitToRetain doesn't go backwards when user changes cleaner configuration + if (lastCleanInstant.isPresent()) { + try { + HoodieCleanMetadata lastCleanMetadata = table.getActiveTimeline().readCleanMetadata(lastCleanInstant.get()); + String previousEarliestCommitToRetain = lastCleanMetadata.getEarliestCommitToRetain(); + String currentEarliestCommitToRetain = cleanerPlan.getEarliestInstantToRetain().getTimestamp(); - return option; + if (!StringUtils.isNullOrEmpty(previousEarliestCommitToRetain) && !StringUtils.isNullOrEmpty(currentEarliestCommitToRetain) + && compareTimestamps(currentEarliestCommitToRetain, LESSER_THAN, previousEarliestCommitToRetain)) { + log.warn("Skipping empty clean creation because earliestCommitToRetain would go backwards. " + + "Previous: {}, Current: {}. This can happen when cleaner configuration is changed.", + previousEarliestCommitToRetain, currentEarliestCommitToRetain); + return cleanPlanOpt; + } + } catch (IOException e) { + log.error("Unable to read last clean metadata", e); + throw new HoodieException("Unable to read last clean metadata", e); + } + } + log.info("Creating an empty clean instant with earliestCommitToRetain of {}", cleanerPlan.getEarliestInstantToRetain().getTimestamp()); + return Option.of(cleanerPlan); + } + } + return cleanPlanOpt; } @Override diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java index cc53e67b74872..0b42837b35869 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java @@ -513,6 +513,14 @@ static Stream keepLatestByHoursOrCommitsArgsIncrCleanPartitions() { Collections.singletonMap(savepoint2, Collections.singletonList(PARTITION1)), Option.empty(), activeInstantsPartitionsMap2, Collections.emptyList(), threePartitionsInActiveTimeline, true, Collections.emptyMap())); + // Empty cleaner plan case + arguments.add(Arguments.of(true, getCleanByHoursConfig(), earliestInstant, lastCompletedInLastClean, lastCleanInstant, + earliestInstantInLastClean, Collections.emptyList(), Collections.emptyMap(), Option.empty(), + activeInstantsPartitionsMap2, Collections.emptyList(), twoPartitionsInActiveTimeline, false, Collections.emptyMap())); + arguments.add(Arguments.of(false, getCleanByHoursConfig(), earliestInstant, lastCompletedInLastClean, lastCleanInstant, + earliestInstantInLastClean, Collections.emptyList(), Collections.emptyMap(), Option.empty(), + activeInstantsUnPartitionsMap, Collections.emptyList(), unPartitionsInActiveTimeline, false, Collections.emptyMap())); + return arguments.stream(); } @@ -598,8 +606,8 @@ private static HoodieCleanMetadata getCleanCommitMetadata(List partition Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), false))); Map extraMetadata = new HashMap<>(); extraMetadata.put(SAVEPOINTED_TIMESTAMPS, savepointsToTrack.stream().collect(Collectors.joining(","))); - return new HoodieCleanMetadata(instantTime, 100L, 10, earliestCommitToRetain, lastCompletedTime, partitionMetadata, - CLEAN_METADATA_VERSION_2, Collections.EMPTY_MAP, extraMetadata.isEmpty() ? null : extraMetadata); + return new HoodieCleanMetadata(instantTime, 100L, partitionMetadata.isEmpty() ? 0 : 10, earliestCommitToRetain, lastCompletedTime, + partitionMetadata, CLEAN_METADATA_VERSION_2, Collections.emptyMap(), extraMetadata.isEmpty() ? null : extraMetadata); } private static HoodieSavepointMetadata getSavepointMetadata(List partitions) { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java index 6b4f4f7160196..fdac4a963a1b0 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -64,7 +64,6 @@ import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.testutils.HoodieMetadataTestTable; -import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestTable; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.util.CleanerUtils; @@ -114,9 +113,11 @@ import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION; import static org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN; import static org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps; +import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.NO_PARTITION_PATH; import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_FILE_NAME_GENERATOR; import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR; import static org.apache.hudi.common.testutils.HoodieTestUtils.TIMELINE_FACTORY; +import static org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorageConf; import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -374,7 +375,7 @@ public void testCleanNonPartitionedTable() throws IOException { .build()) .withEmbeddedTimelineServerEnabled(false).build(); // datagen for non-partitioned table - initTestDataGenerator(new String[] {HoodieTestDataGenerator.NO_PARTITION_PATH}); + initTestDataGenerator(new String[] {NO_PARTITION_PATH}); // init non-partitioned table HoodieTestUtils.init(storageConf, basePath, HoodieTableType.COPY_ON_WRITE, HoodieFileFormat.PARQUET, true, "org.apache.hudi.keygen.NonpartitionedKeyGenerator", true); @@ -392,17 +393,17 @@ public void testCleanNonPartitionedTable() throws IOException { instantTime = cleanPlanPair.getLeft(); HoodieCleanerPlan cleanPlan = cleanPlanPair.getRight(); assertEquals(cleanPlan.getPartitionsToBeDeleted().size(), 0); - assertEquals(cleanPlan.getFilePathsToBeDeletedPerPartition().get(HoodieTestDataGenerator.NO_PARTITION_PATH).size(), 1); - String filePathToClean = cleanPlan.getFilePathsToBeDeletedPerPartition().get(HoodieTestDataGenerator.NO_PARTITION_PATH).get(0).getFilePath(); + assertEquals(cleanPlan.getFilePathsToBeDeletedPerPartition().get(NO_PARTITION_PATH).size(), 1); + String filePathToClean = cleanPlan.getFilePathsToBeDeletedPerPartition().get(NO_PARTITION_PATH).get(0).getFilePath(); // clean HoodieTable table = HoodieSparkTable.create(writeConfig, context); HoodieCleanMetadata cleanMetadata = table.clean(context, instantTime); // check the cleaned file - assertEquals(cleanMetadata.getPartitionMetadata().get(HoodieTestDataGenerator.NO_PARTITION_PATH).getSuccessDeleteFiles().size(), 1); - assertTrue(filePathToClean.contains(cleanMetadata.getPartitionMetadata().get(HoodieTestDataGenerator.NO_PARTITION_PATH).getSuccessDeleteFiles().get(0))); + assertEquals(cleanMetadata.getPartitionMetadata().get(NO_PARTITION_PATH).getSuccessDeleteFiles().size(), 1); + assertTrue(filePathToClean.contains(cleanMetadata.getPartitionMetadata().get(NO_PARTITION_PATH).getSuccessDeleteFiles().get(0))); // ensure table is not fully cleaned and has a file group assertTrue(FSUtils.isTableExists(basePath, storage)); - assertTrue(table.getFileSystemView().getAllFileGroups(HoodieTestDataGenerator.NO_PARTITION_PATH).findAny().isPresent()); + assertTrue(table.getFileSystemView().getAllFileGroups(NO_PARTITION_PATH).findAny().isPresent()); } } @@ -1695,4 +1696,52 @@ public void testPreWriteCleanPolicyDisabledWhenTableServicesDisabled(boolean com assertEquals(7, metaClient.reloadActiveTimeline().getWriteTimeline().countInstants()); assertEquals(0, metaClient.getActiveTimeline().getCleanerTimeline().countInstants()); } + + @Test + void testEmptyClean() throws IOException { + // validate that an empty cleaner plan does not throw any errors at execution time + HoodieWriteConfig writeConfig = getConfigBuilder().withPath(basePath) + .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder() + .withEnableBackupForRemoteFileSystemView(false) + .build()) + .withCleanConfig(HoodieCleanConfig.newBuilder() + .withAutoClean(false) + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) + .retainCommits(1) + .build()) + .withEmbeddedTimelineServerEnabled(false).build(); + // datagen for non-partitioned table + initTestDataGenerator(new String[] {NO_PARTITION_PATH}); + // init non-partitioned table + HoodieTableMetaClient metaClient = HoodieTestUtils.init(getDefaultStorageConf(), basePath, HoodieTableType.COPY_ON_WRITE, HoodieFileFormat.PARQUET, + true, "org.apache.hudi.keygen.NonpartitionedKeyGenerator", true); + + try (SparkRDDWriteClient client = new SparkRDDWriteClient(context, writeConfig)) { + String instantTime = client.startCommit(); + List records = dataGen.generateInserts(instantTime, 1); + client.commit(instantTime, client.insert(jsc.parallelize(records, 1), instantTime)); + + instantTime = metaClient.createNewInstantTime(false); + HoodieTable table = HoodieSparkTable.create(writeConfig, context); + + HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline(); + HoodieInstant hoodieInstant = timeline.firstInstant().get(); + HoodieCleanerPlan cleanerPlan = HoodieCleanerPlan.newBuilder() + .setPolicy(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS.name()) + .setVersion(CleanPlanner.LATEST_CLEAN_PLAN_VERSION) + .setEarliestInstantToRetain(new HoodieActionInstant(hoodieInstant.requestedTime(), hoodieInstant.getAction(), hoodieInstant.getState().name())) + .setLastCompletedCommitTimestamp(timeline.lastInstant().get().requestedTime()) + .setFilePathsToBeDeletedPerPartition(Collections.emptyMap()) + .build(); + final HoodieInstant cleanInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, instantTime, + metaClient.getTimelineLayout().getInstantComparator().completionTimeOrderedComparator()); + table.getActiveTimeline().saveToCleanRequested(cleanInstant, Option.of(cleanerPlan)); + + table.getMetaClient().reloadActiveTimeline(); + // clean + HoodieCleanMetadata cleanMetadata = table.clean(context, instantTime); + // check the cleaned files are empty + assertTrue(cleanMetadata.getPartitionMetadata().isEmpty()); + } + } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java index 66385e0952e96..59e5460850e90 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java @@ -18,8 +18,10 @@ package org.apache.hudi.table.functional; +import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieFileStatus; import org.apache.hudi.client.WriteClientTestUtils; +import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.common.HoodieCleanStat; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.model.BootstrapFileMapping; @@ -30,11 +32,13 @@ import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineUtils; import org.apache.hudi.common.testutils.HoodieMetadataTestTable; import org.apache.hudi.common.testutils.HoodieTestTable; import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.common.util.CleanerUtils; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; @@ -48,12 +52,14 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; import java.nio.file.Files; import java.nio.file.Paths; import java.time.Instant; import java.time.ZoneId; import java.time.ZonedDateTime; +import java.time.temporal.ChronoUnit; import java.util.Arrays; import java.util.Collections; import java.util.Date; @@ -61,8 +67,10 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.stream.Stream; +import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_COMPARATOR; import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -813,4 +821,208 @@ public void testKeepXHoursWithCleaning( testTable.close(); } } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testEmptyCleansAddedAfterThreshold(boolean secondCommitAfterThreshold) throws Exception { + boolean enableIncrementalClean = true; + boolean enableBootstrapSourceClean = false; + HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()) + .withCleanConfig(HoodieCleanConfig.newBuilder() + .withIncrementalCleaningMode(enableIncrementalClean) + .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER) + .withCleanBootstrapBaseFileEnabled(enableBootstrapSourceClean) + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS).cleanerNumHoursRetained(2) + .withMaxDurationToCreateEmptyClean(TimeUnit.MINUTES.toMillis(60)) + .build()) + .build(); + + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + try { + String p0 = "2020/01/01"; + + String file1P0C0 = UUID.randomUUID().toString(); + Instant instant = Instant.now(); + ZonedDateTime commitDateTime = ZonedDateTime.ofInstant(instant, metaClient.getTableConfig().getTimelineTimezone().getZoneId()); + int minutesForFirstCommit = 180; + String firstCommitTs = HoodieInstantTimeGenerator.formatDate(Date.from(commitDateTime.minusMinutes(minutesForFirstCommit).toInstant())); + + commitToTestTable(testTable, firstCommitTs, p0, file1P0C0); + testTable = tearDownTestTableAndReinit(testTable, config); + + // make next commit, with 1 insert & 1 update per partition + String file2P0C1 = UUID.randomUUID().toString(); + int minutesForSecondCommit = 150; + String secondCommitTs = HoodieInstantTimeGenerator.formatDate(Date.from(commitDateTime.minusMinutes(minutesForSecondCommit).toInstant())); + testTable = tearDownTestTableAndReinit(testTable, config); + + commitToTestTable(testTable, secondCommitTs, p0, file2P0C1); + testTable = tearDownTestTableAndReinit(testTable, config); + metaClient = HoodieTableMetaClient.reload(metaClient); + + // make next commit, with 1 insert per partition + int minutesForThirdCommit = 90; + String thirdCommitTs = HoodieInstantTimeGenerator.formatDate(Date.from(commitDateTime.minusMinutes(minutesForThirdCommit).toInstant())); + String file3P0C2 = UUID.randomUUID().toString(); + + testTable = tearDownTestTableAndReinit(testTable, config); + + commitToTestTable(testTable, thirdCommitTs, p0, file3P0C2); + testTable = tearDownTestTableAndReinit(testTable, config); + metaClient = HoodieTableMetaClient.reload(metaClient); + + // first empty clean can be generated since earliest instant to retain will be the first commit (always keep last two instants at a minimum) + String firstCleanInstant = HoodieInstantTimeGenerator.formatDate(Date.from(commitDateTime.minus(secondCommitAfterThreshold ? 70 : 30, ChronoUnit.MINUTES).toInstant())); + + SparkRDDWriteClient writeClient = getHoodieWriteClient(config); + List hoodieCleanStatsThree = runCleaner(config, false, false, writeClient, firstCleanInstant); + assertEquals(0, hoodieCleanStatsThree.size(), "Must not scan any partitions and clean any files"); + assertEquals(1, metaClient.reloadActiveTimeline().getCleanerTimeline().filterCompletedInstants().countInstants()); + String actualFirst = metaClient.getActiveTimeline().getCleanerTimeline().lastInstant().get().requestedTime(); + writeClient.close(); + + String file4P0C1 = UUID.randomUUID().toString(); + int minutesForFourthCommit = 10; + String fourthCommitTs = HoodieInstantTimeGenerator.formatDate(Date.from(commitDateTime.minusMinutes(minutesForFourthCommit).toInstant())); + testTable = tearDownTestTableAndReinit(testTable, config); + + commitToTestTable(testTable, fourthCommitTs, p0, file4P0C1); + testTable = tearDownTestTableAndReinit(testTable, config); + + // add a savepoint + SparkRDDWriteClient writeClient1 = null; + try { + writeClient1 = getHoodieWriteClient(config); + writeClient1.savepoint(fourthCommitTs, "user", "comment"); + } finally { + writeClient1.close(); + } + + Date firstCleanDate = HoodieInstantTimeGenerator.parseDateFromInstantTime(firstCleanInstant); + int minutesBetweenCleans = secondCommitAfterThreshold ? 70 : 30; + String secondCleanInstant = HoodieInstantTimeGenerator.formatDate(Date.from(firstCleanDate.toInstant().plus(minutesBetweenCleans, ChronoUnit.MINUTES))); + + writeClient = getHoodieWriteClient(config); + List hoodieCleanStatsFour = runCleaner(config, false, false, writeClient, secondCleanInstant); + HoodieTimeline finalCompletedCleanInstants = metaClient.reloadActiveTimeline().getCleanerTimeline().filterCompletedInstants(); + if (secondCommitAfterThreshold) { + // second empty clean is added + assertEquals(0, hoodieCleanStatsFour.size(), "Must not scan any partitions and clean any files"); + assertEquals(2, finalCompletedCleanInstants.countInstants()); + // Ensure that extra metadata is properly set for empty clean commits + HoodieCleanMetadata secondCleanMetadata = CleanerUtils.getCleanerMetadata(HoodieTableMetaClient.reload(metaClient), finalCompletedCleanInstants.lastInstant().get()); + // new clean should have the savepoint created + assertEquals(fourthCommitTs, secondCleanMetadata.getExtraMetadata().get(CleanerUtils.SAVEPOINTED_TIMESTAMPS)); + // assertEquals(thirdCommitTs, secondCleanMetadata.getExtraMetadata().get(CleanPlanner.EARLIEST_COMMIT_TO_NOT_ARCHIVE)); + } else { + // no cleaner commit should be added because the time since last clean threshold has not been met + assertEquals(1, finalCompletedCleanInstants.countInstants()); + // Ensure that extra metadata is properly set for empty clean commits + HoodieCleanMetadata firstCleanMetadata = CleanerUtils.getCleanerMetadata(HoodieTableMetaClient.reload(metaClient), finalCompletedCleanInstants.lastInstant().get()); + //assertEquals(thirdCommitTs, firstCleanMetadata.getExtraMetadata().get(CleanPlanner.EARLIEST_COMMIT_TO_NOT_ARCHIVE)); + // first clean commit happened before the savepoint so this field is expected to not be present in the map + assertFalse(firstCleanMetadata.getExtraMetadata().containsKey(CleanerUtils.SAVEPOINTED_TIMESTAMPS)); + } + writeClient.close(); + } finally { + testTable.close(); + } + } + + @Test + void testEmptyCleanDoesNotGoBackwardsOnConfigChange() throws Exception { + // Test that earliestCommitToRetain never goes backwards when user changes cleaner config + HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()) + .withCleanConfig(HoodieCleanConfig.newBuilder() + .withIncrementalCleaningMode(true) + .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER) + .withCleanBootstrapBaseFileEnabled(false) + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS).cleanerNumHoursRetained(24) + .withMaxDurationToCreateEmptyClean(TimeUnit.MINUTES.toMillis(60)) + .build()) + .build(); + + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + try { + String p0 = "2020/01/01"; + Instant instant = Instant.now(); + ZonedDateTime commitDateTime = ZonedDateTime.ofInstant(instant, metaClient.getTableConfig().getTimelineTimezone().getZoneId()); + + // Create first commit 48 hours ago + String file1P0C0 = UUID.randomUUID().toString(); + String firstCommitTs = HoodieInstantTimeGenerator.formatDate(Date.from(commitDateTime.minusHours(48).toInstant())); + commitToTestTable(testTable, firstCommitTs, p0, file1P0C0); + testTable = tearDownTestTableAndReinit(testTable, config); + + // Create second commit 36 hours ago + String file2P0C1 = UUID.randomUUID().toString(); + String secondCommitTs = HoodieInstantTimeGenerator.formatDate(Date.from(commitDateTime.minusHours(36).toInstant())); + commitToTestTable(testTable, secondCommitTs, p0, file2P0C1); + testTable = tearDownTestTableAndReinit(testTable, config); + metaClient = HoodieTableMetaClient.reload(metaClient); + + // Create third commit 12 hours ago + String file3P0C2 = UUID.randomUUID().toString(); + String thirdCommitTs = HoodieInstantTimeGenerator.formatDate(Date.from(commitDateTime.minusHours(12).toInstant())); + commitToTestTable(testTable, thirdCommitTs, p0, file3P0C2); + testTable = tearDownTestTableAndReinit(testTable, config); + metaClient = HoodieTableMetaClient.reload(metaClient); + + // Run first empty clean 2 hours ago - should retain commits from 26 hours ago (24h retention + 2h safety) + String firstCleanInstant = HoodieInstantTimeGenerator.formatDate(Date.from(commitDateTime.minusHours(2).toInstant())); + SparkRDDWriteClient writeClient = getHoodieWriteClient(config); + List hoodieCleanStatsOne = runCleaner(config, false, false, writeClient, firstCleanInstant); + assertEquals(0, hoodieCleanStatsOne.size(), "Must not clean any files"); + assertEquals(1, metaClient.reloadActiveTimeline().getCleanerTimeline().filterCompletedInstants().countInstants()); + + // Get the earliestCommitToRetain from first clean + HoodieInstant firstCleanCompleted = metaClient.getActiveTimeline().getCleanerTimeline().filterCompletedInstants().lastInstant().get(); + HoodieCleanMetadata firstCleanMetadata = CleanerUtils.getCleanerMetadata(metaClient, firstCleanCompleted); + String firstEarliestCommitToRetain = firstCleanMetadata.getEarliestCommitToRetain(); + writeClient.close(); + + // Now change config to retain only 12 hours (which would normally make earliestCommitToRetain go backwards) + HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withPath(basePath) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()) + .withCleanConfig(HoodieCleanConfig.newBuilder() + .withIncrementalCleaningMode(true) + .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER) + .withCleanBootstrapBaseFileEnabled(false) + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS).cleanerNumHoursRetained(12) + .withMaxDurationToCreateEmptyClean(TimeUnit.MINUTES.toMillis(60)) + .build()) + .build(); + + // Try to create another empty clean with the new config 61 minutes after first clean + String secondCleanInstant = HoodieInstantTimeGenerator.formatDate(Date.from( + HoodieInstantTimeGenerator.parseDateFromInstantTime(firstCleanInstant).toInstant().plus(61, ChronoUnit.MINUTES))); + + writeClient = getHoodieWriteClient(newConfig); + List hoodieCleanStatsTwo = runCleaner(newConfig, false, false, writeClient, secondCleanInstant); + + // The clean should be skipped because earliestCommitToRetain would go backwards + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieTimeline cleanTimeline = metaClient.getActiveTimeline().getCleanerTimeline().filterCompletedInstants(); + assertEquals(1, cleanTimeline.countInstants(), "Second clean should be skipped to prevent earliestCommitToRetain from going backwards"); + + // Verify earliestCommitToRetain did not change + HoodieCleanMetadata latestCleanMetadata = CleanerUtils.getCleanerMetadata(metaClient, cleanTimeline.lastInstant().get()); + assertEquals(firstEarliestCommitToRetain, latestCleanMetadata.getEarliestCommitToRetain(), + "earliestCommitToRetain should not go backwards"); + writeClient.close(); + } finally { + testTable.close(); + } + } + + private void commitToTestTable(HoodieTestTable testTable, String commitTimeTs, String partition, String fileId) throws Exception { + testTable.addInflightCommit(commitTimeTs); + testTable.withBaseFilesInPartition(partition, fileId); + HoodieCommitMetadata commitMeta = generateCommitMetadata(commitTimeTs, Collections.singletonMap(partition, Collections.singletonList(fileId))); + metaClient.getActiveTimeline().saveAsComplete( + new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, commitTimeTs, INSTANT_COMPARATOR.completionTimeOrderedComparator()), + Option.of(commitMeta)); + } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java index aa1a5ee6a9697..66f2f595ef4da 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java @@ -100,6 +100,18 @@ protected List runCleaner( return runCleaner(config, simulateRetryFailure, simulateMetadataFailure, 1, false); } + protected List runCleaner( + HoodieWriteConfig config, boolean simulateRetryFailure, boolean simulateMetadataFailure, + Integer firstCommitSequence, boolean needInstantInHudiFormat) throws IOException { + SparkRDDWriteClient writeClient = getHoodieWriteClient(config); + try { + String cleanInstantTs = needInstantInHudiFormat ? makeNewCommitTime(firstCommitSequence, "%014d") : makeNewCommitTime(firstCommitSequence, "%09d"); + return runCleaner(config, simulateRetryFailure, simulateMetadataFailure, writeClient, cleanInstantTs); + } finally { + writeClient.close(); + } + } + /** * Helper to run cleaner and collect Clean Stats. * @@ -107,11 +119,8 @@ protected List runCleaner( */ protected List runCleaner( HoodieWriteConfig config, boolean simulateRetryFailure, boolean simulateMetadataFailure, - Integer firstCommitSequence, boolean needInstantInHudiFormat) throws IOException { - SparkRDDWriteClient writeClient = getHoodieWriteClient(config); - String cleanInstantTs = needInstantInHudiFormat ? makeNewCommitTime(firstCommitSequence, "%014d") : makeNewCommitTime(firstCommitSequence, "%09d"); + SparkRDDWriteClient writeClient, String cleanInstantTs) throws IOException { HoodieCleanMetadata cleanMetadata1 = writeClient.clean(cleanInstantTs); - if (null == cleanMetadata1) { return new ArrayList<>(); } From 70384ea1d2bf52407027ffe26a5feea4a85f2dd5 Mon Sep 17 00:00:00 2001 From: sivabalan Date: Mon, 20 Apr 2026 08:02:37 -0700 Subject: [PATCH 2/3] Addressing last few feedback --- .../org/apache/hudi/config/HoodieCleanConfig.java | 14 +++++++------- .../action/clean/CleanPlanActionExecutor.java | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java index 77ffa7c1f91d3..120042fc43cf8 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java @@ -27,7 +27,6 @@ import org.apache.hudi.common.model.HoodiePreWriteCleanerPolicy; import org.apache.hudi.common.model.WriteConcurrencyMode; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.table.action.clean.CleaningTriggerStrategy; import javax.annotation.concurrent.Immutable; @@ -260,7 +259,7 @@ public class HoodieCleanConfig extends HoodieConfig { + "dataset. Also, for these datasets we cannot ignore clean completely since in the future there could " + "be upsert or replace operations. By creating empty clean commit, earliest_commit_to_retain value " + "will be updated so that now clean planner can only check for partitions that are modified after the " - + "last empty clean's earliest_commit_toRetain value there by optimizing the clean planning"); + + "last empty clean's earliest_commit_toRetain value thereby optimizing the clean planning"); /** @deprecated Use {@link #CLEANER_POLICY} and its methods instead */ @Deprecated @@ -437,8 +436,8 @@ public HoodieCleanConfig.Builder withMaxCommitsToClean(long maxCommitsToClean) { return this; } - public HoodieCleanConfig.Builder withMaxDurationToCreateEmptyClean(long duration) { - cleanConfig.setValue(MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS, String.valueOf(duration)); + public HoodieCleanConfig.Builder withMaxDurationToCreateEmptyClean(long durationMs) { + cleanConfig.setValue(MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS, String.valueOf(durationMs)); return this; } @@ -450,9 +449,10 @@ public HoodieCleanConfig build() { if (maxCommitsToClean < 1) { throw new IllegalArgumentException(MAX_COMMITS_TO_CLEAN.key() + " must be >= 1, but was " + maxCommitsToClean); } - long maxDuration = cleanConfig.getLong(MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS); - ValidationUtils.checkArgument(maxDuration != 0, - MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS.key() + " must be either -1 (disabled) or a positive value, but got 0"); + long maxDurationMs = cleanConfig.getLong(MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS); + if (maxDurationMs == 0 || maxDurationMs <= -1) { + throw new IllegalArgumentException(MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS.key() + " must be >= 1, but was " + maxDurationMs); + } return cleanConfig; } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java index 06dba1f025f3e..0bf9265ecd8df 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java @@ -111,7 +111,7 @@ private HoodieCleanerPlan getEmptyCleanerPlan(Option earliestInst .setEarliestInstantToRetain(new HoodieActionInstant(hoodieInstant.requestedTime(), hoodieInstant.getAction(), hoodieInstant.getState().name())) .setLastCompletedCommitTimestamp(planner.getLastCompletedCommitTimestamp()); } else { - cleanBuilder.setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()) + cleanBuilder.setPolicy(config.getCleanerPolicy().name()) .setVersion(CleanPlanner.LATEST_CLEAN_PLAN_VERSION); } return cleanBuilder.build(); From 4a7ae53a9244c7a99b3beebdd336ed294f9ec415 Mon Sep 17 00:00:00 2001 From: sivabalan Date: Mon, 20 Apr 2026 15:08:49 -0700 Subject: [PATCH 3/3] Addressing feedback --- .../org/apache/hudi/config/HoodieCleanConfig.java | 14 +++++++------- .../org/apache/hudi/config/HoodieWriteConfig.java | 4 ++-- .../table/action/clean/CleanActionExecutor.java | 10 ++++++---- .../action/clean/CleanPlanActionExecutor.java | 8 ++++---- .../java/org/apache/hudi/table/TestCleaner.java | 8 +++++++- .../table/functional/TestCleanPlanExecutor.java | 7 +++---- 6 files changed, 29 insertions(+), 22 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java index 120042fc43cf8..874c5445cbd5f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java @@ -250,8 +250,8 @@ public class HoodieCleanConfig extends HoodieConfig { .markAdvanced() .withDocumentation("Maximum number of commits to clean in one clean commit. Applicable only when the clean policy is based on KEEP_LATEST_COMMITS or KEEP_LATEST_HOURS"); - public static final ConfigProperty MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS = ConfigProperty - .key("hoodie.write.empty.clean.create.duration.ms") + public static final ConfigProperty MAX_INTERVAL_TO_CREATE_EMPTY_CLEAN_HOURS = ConfigProperty + .key("hoodie.write.empty.clean.internval.hours") .defaultValue(-1L) .markAdvanced() .withDocumentation("In some cases empty clean commit needs to be created to ensure the clean planner " @@ -436,8 +436,8 @@ public HoodieCleanConfig.Builder withMaxCommitsToClean(long maxCommitsToClean) { return this; } - public HoodieCleanConfig.Builder withMaxDurationToCreateEmptyClean(long durationMs) { - cleanConfig.setValue(MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS, String.valueOf(durationMs)); + public HoodieCleanConfig.Builder withMaxIntervalToCreateEmptyCleanHours(long durationHours) { + cleanConfig.setValue(MAX_INTERVAL_TO_CREATE_EMPTY_CLEAN_HOURS, String.valueOf(durationHours)); return this; } @@ -449,9 +449,9 @@ public HoodieCleanConfig build() { if (maxCommitsToClean < 1) { throw new IllegalArgumentException(MAX_COMMITS_TO_CLEAN.key() + " must be >= 1, but was " + maxCommitsToClean); } - long maxDurationMs = cleanConfig.getLong(MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS); - if (maxDurationMs == 0 || maxDurationMs <= -1) { - throw new IllegalArgumentException(MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS.key() + " must be >= 1, but was " + maxDurationMs); + long maxDurationHours = cleanConfig.getLong(MAX_INTERVAL_TO_CREATE_EMPTY_CLEAN_HOURS); + if (maxDurationHours == 0 || maxDurationHours < -1) { + throw new IllegalArgumentException(MAX_INTERVAL_TO_CREATE_EMPTY_CLEAN_HOURS.key() + " must be >= 1, but was " + maxDurationHours); } return cleanConfig; } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 9b08d26141908..9598913b75842 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -1842,8 +1842,8 @@ public boolean isAutoClean() { return getBoolean(HoodieCleanConfig.AUTO_CLEAN); } - public long maxDurationToCreateEmptyCleanMs() { - return getLong(HoodieCleanConfig.MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS); + public long maxIntervalToCreateEmptyCleanHours() { + return getLong(HoodieCleanConfig.MAX_INTERVAL_TO_CREATE_EMPTY_CLEAN_HOURS); } public boolean shouldArchiveBeyondSavepoint() { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java index ce6c6af2b8ee4..3fad30b22bdf0 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java @@ -243,17 +243,19 @@ private HoodieCleanMetadata runClean(HoodieTable table, HoodieInstan private static HoodieCleanMetadata createEmptyCleanMetadata(HoodieCleanerPlan cleanerPlan, HoodieInstant inflightInstant, long timeTakenMillis) { ValidationUtils.checkArgument(cleanerPlan.getEarliestInstantToRetain() != null, "For empty cleans, earliest instant to retain can never be null"); - return HoodieCleanMetadata.newBuilder() + HoodieCleanMetadata.Builder cleanMetadataBuilder = HoodieCleanMetadata.newBuilder() .setStartCleanTime(inflightInstant.requestedTime()) .setTimeTakenInMillis(timeTakenMillis) .setTotalFilesDeleted(0) .setLastCompletedCommitTimestamp(cleanerPlan.getLastCompletedCommitTimestamp()) - .setEarliestCommitToRetain(cleanerPlan.getEarliestInstantToRetain().getTimestamp()) .setVersion(CLEAN_METADATA_VERSION_2) .setPartitionMetadata(Collections.emptyMap()) .setExtraMetadata(cleanerPlan.getExtraMetadata()) - .setBootstrapPartitionMetadata(Collections.emptyMap()) - .build(); + .setBootstrapPartitionMetadata(Collections.emptyMap()); + if (cleanerPlan.getEarliestInstantToRetain() != null) { + cleanMetadataBuilder.setEarliestCommitToRetain(cleanerPlan.getEarliestInstantToRetain().getTimestamp()); + } + return cleanMetadataBuilder.build(); } @Override diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java index 0bf9265ecd8df..7f518822237bc 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java @@ -25,7 +25,6 @@ import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.model.CleanFileInfo; -import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator; @@ -51,6 +50,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN; @@ -249,7 +249,7 @@ protected Option requestClean() { // Note: For a dataset with incremental clean enabled, that does not receive any updates, cleaner plan always comes // with an empty list of files to be cleaned. CleanActionExecutor would never be invoked for this dataset. // To avoid fullscan on the dataset with every ingestion run, empty clean commit is created here. - if (cleanPlanOpt.isEmpty() && config.incrementalCleanerModeEnabled() && cleanerPlan.getEarliestInstantToRetain() != null && config.maxDurationToCreateEmptyCleanMs() > 0) { + if (cleanPlanOpt.isEmpty() && config.incrementalCleanerModeEnabled() && cleanerPlan.getEarliestInstantToRetain() != null && config.maxIntervalToCreateEmptyCleanHours() > 0) { // Only create an empty clean commit if earliestInstantToRetain is present in the plan boolean eligibleForEmptyCleanCommit = true; @@ -260,7 +260,7 @@ protected Option requestClean() { ZonedDateTime latestDateTime = ZonedDateTime.ofInstant(java.time.Instant.now(), table.getMetaClient().getTableConfig().getTimelineTimezone().getZoneId()); long currentCleanTimeMs = latestDateTime.toInstant().toEpochMilli(); long lastCleanTimeMs = HoodieInstantTimeGenerator.parseDateFromInstantTime(lastCleanInstant.get().requestedTime()).toInstant().toEpochMilli(); - eligibleForEmptyCleanCommit = currentCleanTimeMs - lastCleanTimeMs > config.maxDurationToCreateEmptyCleanMs(); + eligibleForEmptyCleanCommit = currentCleanTimeMs - lastCleanTimeMs > (TimeUnit.HOURS.toMillis(config.maxIntervalToCreateEmptyCleanHours())); } catch (ParseException e) { log.error("Unable to parse last clean commit time", e); throw new HoodieException("Unable to parse last clean commit time", e); @@ -279,7 +279,7 @@ && compareTimestamps(currentEarliestCommitToRetain, LESSER_THAN, previousEarlies log.warn("Skipping empty clean creation because earliestCommitToRetain would go backwards. " + "Previous: {}, Current: {}. This can happen when cleaner configuration is changed.", previousEarliestCommitToRetain, currentEarliestCommitToRetain); - return cleanPlanOpt; + return Option.empty(); } } catch (IOException e) { log.error("Unable to read last clean metadata", e); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java index fdac4a963a1b0..f57ee82385f6c 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -1740,8 +1740,14 @@ void testEmptyClean() throws IOException { table.getMetaClient().reloadActiveTimeline(); // clean HoodieCleanMetadata cleanMetadata = table.clean(context, instantTime); - // check the cleaned files are empty + // validate all fields of the empty clean metadata assertTrue(cleanMetadata.getPartitionMetadata().isEmpty()); + assertEquals(0, cleanMetadata.getTotalFilesDeleted()); + assertEquals(hoodieInstant.requestedTime(), cleanMetadata.getEarliestCommitToRetain()); + assertEquals(timeline.lastInstant().get().requestedTime(), cleanMetadata.getLastCompletedCommitTimestamp()); + assertEquals(instantTime, cleanMetadata.getStartCleanTime()); + assertTrue(cleanMetadata.getBootstrapPartitionMetadata().isEmpty()); + assertTrue(cleanMetadata.getTimeTakenInMillis() >= 0); } } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java index 59e5460850e90..ce7070c285882 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java @@ -67,7 +67,6 @@ import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_COMPARATOR; @@ -834,7 +833,7 @@ void testEmptyCleansAddedAfterThreshold(boolean secondCommitAfterThreshold) thro .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER) .withCleanBootstrapBaseFileEnabled(enableBootstrapSourceClean) .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS).cleanerNumHoursRetained(2) - .withMaxDurationToCreateEmptyClean(TimeUnit.MINUTES.toMillis(60)) + .withMaxIntervalToCreateEmptyCleanHours(1) .build()) .build(); @@ -940,7 +939,7 @@ void testEmptyCleanDoesNotGoBackwardsOnConfigChange() throws Exception { .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER) .withCleanBootstrapBaseFileEnabled(false) .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS).cleanerNumHoursRetained(24) - .withMaxDurationToCreateEmptyClean(TimeUnit.MINUTES.toMillis(60)) + .withMaxIntervalToCreateEmptyCleanHours(1) .build()) .build(); @@ -991,7 +990,7 @@ void testEmptyCleanDoesNotGoBackwardsOnConfigChange() throws Exception { .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER) .withCleanBootstrapBaseFileEnabled(false) .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS).cleanerNumHoursRetained(12) - .withMaxDurationToCreateEmptyClean(TimeUnit.MINUTES.toMillis(60)) + .withMaxIntervalToCreateEmptyCleanHours(1) .build()) .build();