Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,16 @@ public class HoodieCleanConfig extends HoodieConfig {
.markAdvanced()
.withDocumentation("Maximum number of commits to clean in one clean commit. Applicable only when the clean policy is based on KEEP_LATEST_COMMITS or KEEP_LATEST_HOURS");

public static final ConfigProperty<Long> MAX_INTERVAL_TO_CREATE_EMPTY_CLEAN_HOURS = ConfigProperty
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`MAX_INTERVAL_TO_CREATE_EMPTY_CLEAN_HOURS` -> `INTERVAL_TO_CREATE_EMPTY_CLEAN_HOURS`

.key("hoodie.write.empty.clean.internval.hours")
.defaultValue(-1L)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Don't reject the property's own default.

Line 255 sets -1L as the disabled default, but Lines 452-455 reject every value <= -1. That makes HoodieCleanConfig.newBuilder().build() fail unless every caller explicitly overrides this new option, which breaks existing configs and tests.

Suggested fix
-      if (maxDurationMs == 0 || maxDurationMs <= -1) {
-        throw new IllegalArgumentException(MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS.key() + " must be >= 1, but was " + maxDurationMs);
+      if (maxDurationMs == 0 || maxDurationMs < -1) {
+        throw new IllegalArgumentException(
+            MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS.key() + " must be -1 (disabled) or >= 1, but was " + maxDurationMs);
       }

Also applies to: 452-455

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java`
around lines 253 - 255, The validation currently rejects values <= -1 for
MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS, which invalidates the property's intended
disabled default (-1L) and breaks HoodieCleanConfig.newBuilder().build(); update
the check that references MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS to allow -1 by
changing the condition to reject only values < -1 (or explicitly permit the
property's default sentinel), so the default -1L is accepted while still
rejecting invalid negatives.

CodeRabbit (original) (source:comment#3111681444)

.markAdvanced()
.withDocumentation("In some cases empty clean commit needs to be created to ensure the clean planner "
Comment thread
nsivabalan marked this conversation as resolved.
+ "does not look through entire dataset if there are no clean plans. This is possible for append-only "
+ "dataset. Also, for these datasets we cannot ignore clean completely since in the future there could "
+ "be upsert or replace operations. By creating empty clean commit, earliest_commit_to_retain value "
+ "will be updated so that now clean planner can only check for partitions that are modified after the "
Comment thread
nsivabalan marked this conversation as resolved.
+ "last empty clean's earliest_commit_toRetain value thereby optimizing the clean planning");

Comment thread
nsivabalan marked this conversation as resolved.
/** @deprecated Use {@link #CLEANER_POLICY} and its methods instead */
@Deprecated
Expand Down Expand Up @@ -426,6 +436,11 @@ public HoodieCleanConfig.Builder withMaxCommitsToClean(long maxCommitsToClean) {
return this;
}

public HoodieCleanConfig.Builder withMaxIntervalToCreateEmptyCleanHours(long durationHours) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

withIntervalToCreateEmptyCleanHours

cleanConfig.setValue(MAX_INTERVAL_TO_CREATE_EMPTY_CLEAN_HOURS, String.valueOf(durationHours));
return this;
}

public HoodieCleanConfig build() {
cleanConfig.setDefaults(HoodieCleanConfig.class.getName());
HoodieCleaningPolicy.valueOf(cleanConfig.getString(CLEANER_POLICY));
Expand All @@ -434,6 +449,10 @@ public HoodieCleanConfig build() {
if (maxCommitsToClean < 1) {
throw new IllegalArgumentException(MAX_COMMITS_TO_CLEAN.key() + " must be >= 1, but was " + maxCommitsToClean);
}
long maxDurationHours = cleanConfig.getLong(MAX_INTERVAL_TO_CREATE_EMPTY_CLEAN_HOURS);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maxDurationHours -> emptyCleanIntervalHours

if (maxDurationHours == 0 || maxDurationHours < -1) {
throw new IllegalArgumentException(MAX_INTERVAL_TO_CREATE_EMPTY_CLEAN_HOURS.key() + " must be >= 1, but was " + maxDurationHours);
}
return cleanConfig;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1842,6 +1842,10 @@ public boolean isAutoClean() {
return getBoolean(HoodieCleanConfig.AUTO_CLEAN);
}

public long maxIntervalToCreateEmptyCleanHours() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getIntervalHoursToCreateEmptyClean

return getLong(HoodieCleanConfig.MAX_INTERVAL_TO_CREATE_EMPTY_CLEAN_HOURS);
}

/**
 * Looks up the {@code HoodieArchivalConfig.ARCHIVE_BEYOND_SAVEPOINT} property.
 *
 * @return the boolean that {@code getBooleanOrDefault} yields for that property
 */
public boolean shouldArchiveBeyondSavepoint() {
  final boolean archiveBeyondSavepoint = getBooleanOrDefault(HoodieArchivalConfig.ARCHIVE_BEYOND_SAVEPOINT);
  return archiveBeyondSavepoint;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,15 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.common.util.CleanerUtils.CLEAN_METADATA_VERSION_2;
import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;

@Slf4j
Expand Down Expand Up @@ -134,9 +136,9 @@ private static Stream<Pair<String, PartitionCleanStat>> deleteFilesFunc(Iterator
* @throws IllegalArgumentException if unknown cleaning policy is provided
*/
List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan cleanerPlan) {
int cleanerParallelism = Math.min(
int cleanerParallelism = Math.max(1, Math.min(
cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum(),
config.getCleanerParallelism());
config.getCleanerParallelism()));
log.info("Using cleanerParallelism: {}", cleanerParallelism);

context.setJobStatus(this.getClass().getSimpleName(), "Perform cleaning of table: " + config.getTableName());
Expand All @@ -155,7 +157,7 @@ List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan clean

List<String> partitionsToBeDeleted = table.getMetaClient().getTableConfig().isTablePartitioned() && cleanerPlan.getPartitionsToBeDeleted() != null
Comment thread
nsivabalan marked this conversation as resolved.
? cleanerPlan.getPartitionsToBeDeleted()
: new ArrayList<>();
: Collections.emptyList();
partitionsToBeDeleted.forEach(entry -> {
if (!isNullOrEmpty(entry)) {
deleteFileAndGetResult(table.getStorage(), table.getMetaClient().getBasePath() + "/" + entry);
Expand Down Expand Up @@ -213,17 +215,18 @@ private HoodieCleanMetadata runClean(HoodieTable<T, I, K, O> table, HoodieInstan
}

List<HoodieCleanStat> cleanStats = clean(context, cleanerPlan);
table.getMetaClient().reloadActiveTimeline();
HoodieCleanMetadata metadata;
if (cleanStats.isEmpty()) {
return HoodieCleanMetadata.newBuilder().build();
metadata = createEmptyCleanMetadata(cleanerPlan, inflightInstant, timer.endTimer());
} else {
metadata = CleanerUtils.convertCleanMetadata(
inflightInstant.requestedTime(),
Option.of(timer.endTimer()),
cleanStats,
cleanerPlan.getExtraMetadata()
);
Comment thread
nsivabalan marked this conversation as resolved.
}

table.getMetaClient().reloadActiveTimeline();
HoodieCleanMetadata metadata = CleanerUtils.convertCleanMetadata(
inflightInstant.requestedTime(),
Option.of(timer.endTimer()),
cleanStats,
cleanerPlan.getExtraMetadata()
);
this.txnManager.beginStateChange(Option.of(inflightInstant), Option.empty());
writeTableMetadata(metadata, inflightInstant.requestedTime());
table.getActiveTimeline().transitionCleanInflightToComplete(
Expand All @@ -238,6 +241,23 @@ private HoodieCleanMetadata runClean(HoodieTable<T, I, K, O> table, HoodieInstan
}
}

private static HoodieCleanMetadata createEmptyCleanMetadata(HoodieCleanerPlan cleanerPlan, HoodieInstant inflightInstant, long timeTakenMillis) {
ValidationUtils.checkArgument(cleanerPlan.getEarliestInstantToRetain() != null, "For empty cleans, earliest instant to retain can never be null");
HoodieCleanMetadata.Builder cleanMetadataBuilder = HoodieCleanMetadata.newBuilder()
.setStartCleanTime(inflightInstant.requestedTime())
.setTimeTakenInMillis(timeTakenMillis)
.setTotalFilesDeleted(0)
.setLastCompletedCommitTimestamp(cleanerPlan.getLastCompletedCommitTimestamp())
.setVersion(CLEAN_METADATA_VERSION_2)
.setPartitionMetadata(Collections.emptyMap())
.setExtraMetadata(cleanerPlan.getExtraMetadata())
Comment thread
nsivabalan marked this conversation as resolved.
.setBootstrapPartitionMetadata(Collections.emptyMap());
if (cleanerPlan.getEarliestInstantToRetain() != null) {
cleanMetadataBuilder.setEarliestCommitToRetain(cleanerPlan.getEarliestInstantToRetain().getTimestamp());
}
return cleanMetadataBuilder.build();
}

@Override
public HoodieCleanMetadata execute() {
List<HoodieCleanMetadata> cleanMetadataList = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.model.CleanFileInfo;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
Expand All @@ -42,13 +43,18 @@
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.text.ParseException;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN;
import static org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
import static org.apache.hudi.common.util.CleanerUtils.SAVEPOINTED_TIMESTAMPS;
import static org.apache.hudi.common.util.MapUtils.nonEmpty;

Expand Down Expand Up @@ -94,6 +100,23 @@ private boolean needsCleaning(CleaningTriggerStrategy strategy) {
}
}

private HoodieCleanerPlan getEmptyCleanerPlan(Option<HoodieInstant> earliestInstant, CleanPlanner<T, I, K, O> planner) throws IOException {
HoodieCleanerPlan.Builder cleanBuilder = HoodieCleanerPlan.newBuilder()
.setFilePathsToBeDeletedPerPartition(Collections.emptyMap())
.setExtraMetadata(prepareExtraMetadata(planner.getSavepointedTimestamps()));
if (earliestInstant.isPresent()) {
HoodieInstant hoodieInstant = earliestInstant.get();
cleanBuilder.setPolicy(config.getCleanerPolicy().name())
.setVersion(CleanPlanner.LATEST_CLEAN_PLAN_VERSION)
.setEarliestInstantToRetain(new HoodieActionInstant(hoodieInstant.requestedTime(), hoodieInstant.getAction(), hoodieInstant.getState().name()))
.setLastCompletedCommitTimestamp(planner.getLastCompletedCommitTimestamp());
Comment thread
nsivabalan marked this conversation as resolved.
} else {
cleanBuilder.setPolicy(config.getCleanerPolicy().name())
.setVersion(CleanPlanner.LATEST_CLEAN_PLAN_VERSION);
}
return cleanBuilder.build();
Comment thread
nsivabalan marked this conversation as resolved.
}

/**
* Generates List of files to be cleaned.
*
Expand All @@ -109,8 +132,8 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {
context.clearJobStatus();

if (partitionsToClean.isEmpty()) {
log.info("Nothing to clean here. It is already clean");
return HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build();
log.info("Partitions to clean returned empty. Checking to see if empty clean needs to be created.");
return getEmptyCleanerPlan(earliestInstant, planner);
}
log.info(
"Earliest commit to retain for clean : {}",
Expand Down Expand Up @@ -213,14 +236,61 @@ protected Option<HoodieCleanerPlan> requestClean() {
cleanerEngineContext = context;
}
final HoodieCleanerPlan cleanerPlan = requestClean(cleanerEngineContext);
Option<HoodieCleanerPlan> option = Option.empty();
if (nonEmpty(cleanerPlan.getFilePathsToBeDeletedPerPartition())
&& cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum() > 0) {
Option<HoodieCleanerPlan> cleanPlanOpt = Option.empty();
if ((cleanerPlan.getPartitionsToBeDeleted() != null && !cleanerPlan.getPartitionsToBeDeleted().isEmpty())
|| (nonEmpty(cleanerPlan.getFilePathsToBeDeletedPerPartition())
&& cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum() > 0)) {
// Only create cleaner plan which does some work
option = Option.of(cleanerPlan);
cleanPlanOpt = Option.of(cleanerPlan);
}
// If the cleaner plan has nothing to delete, incremental clean is enabled, and no completed clean was
// created within the last X hours configured by MAX_INTERVAL_TO_CREATE_EMPTY_CLEAN_HOURS,
// create an empty clean to avoid a full scan in the future.
// Note: For a dataset with incremental clean enabled, that does not receive any updates, cleaner plan always comes
// with an empty list of files to be cleaned. CleanActionExecutor would never be invoked for this dataset.
Comment thread
nsivabalan marked this conversation as resolved.
// To avoid fullscan on the dataset with every ingestion run, empty clean commit is created here.
if (cleanPlanOpt.isEmpty() && config.incrementalCleanerModeEnabled() && cleanerPlan.getEarliestInstantToRetain() != null && config.maxIntervalToCreateEmptyCleanHours() > 0) {
// Only create an empty clean commit if earliestInstantToRetain is present in the plan
boolean eligibleForEmptyCleanCommit = true;

// if there is no previous clean instant or the previous clean instant was before the configured max duration, schedule an empty clean commit
Option<HoodieInstant> lastCleanInstant = table.getCleanTimeline().filterCompletedInstants().lastInstant();
if (lastCleanInstant.isPresent()) {
try {
ZonedDateTime latestDateTime = ZonedDateTime.ofInstant(java.time.Instant.now(), table.getMetaClient().getTableConfig().getTimelineTimezone().getZoneId());
Comment thread
nsivabalan marked this conversation as resolved.
long currentCleanTimeMs = latestDateTime.toInstant().toEpochMilli();
long lastCleanTimeMs = HoodieInstantTimeGenerator.parseDateFromInstantTime(lastCleanInstant.get().requestedTime()).toInstant().toEpochMilli();
eligibleForEmptyCleanCommit = currentCleanTimeMs - lastCleanTimeMs > (TimeUnit.HOURS.toMillis(config.maxIntervalToCreateEmptyCleanHours()));
} catch (ParseException e) {
log.error("Unable to parse last clean commit time", e);
throw new HoodieException("Unable to parse last clean commit time", e);
Comment thread
nsivabalan marked this conversation as resolved.
}
}
if (eligibleForEmptyCleanCommit) {
// Ensure earliestCommitToRetain doesn't go backwards when user changes cleaner configuration
if (lastCleanInstant.isPresent()) {
try {
HoodieCleanMetadata lastCleanMetadata = table.getActiveTimeline().readCleanMetadata(lastCleanInstant.get());
String previousEarliestCommitToRetain = lastCleanMetadata.getEarliestCommitToRetain();
String currentEarliestCommitToRetain = cleanerPlan.getEarliestInstantToRetain().getTimestamp();

return option;
if (!StringUtils.isNullOrEmpty(previousEarliestCommitToRetain) && !StringUtils.isNullOrEmpty(currentEarliestCommitToRetain)
&& compareTimestamps(currentEarliestCommitToRetain, LESSER_THAN, previousEarliestCommitToRetain)) {
log.warn("Skipping empty clean creation because earliestCommitToRetain would go backwards. "
+ "Previous: {}, Current: {}. This can happen when cleaner configuration is changed.",
previousEarliestCommitToRetain, currentEarliestCommitToRetain);
Comment on lines +279 to +281
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the currentEarliestCommitToRetain in the cleaner plan be adjusted to previousEarliestCommitToRetain, or is that overkill?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can set getEarliestInstantToRetain in the current plan to previousEarliestCommitToRetain and go ahead with the empty clean.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hey @yihua :
actually, returning Option.empty from here.

the completed clean instant will only contain the requestedTime for the earliest commit to retain, but not the whole HoodieInstant.

I don't think it's worth fetching the HoodieInstant from the previous clean plan (deserializing the requested clean) for this case.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand the claim. The previous ECTR is already fetched as previousEarliestCommitToRetain. Are you saying that setting previousEarliestCommitToRetain in cleanerPlan won't work in this case?

return Option.empty();
}
} catch (IOException e) {
log.error("Unable to read last clean metadata", e);
throw new HoodieException("Unable to read last clean metadata", e);
}
}
log.info("Creating an empty clean instant with earliestCommitToRetain of {}", cleanerPlan.getEarliestInstantToRetain().getTimestamp());
return Option.of(cleanerPlan);
Comment thread
nsivabalan marked this conversation as resolved.
}
}
return cleanPlanOpt;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,14 @@ static Stream<Arguments> keepLatestByHoursOrCommitsArgsIncrCleanPartitions() {
Collections.singletonMap(savepoint2, Collections.singletonList(PARTITION1)), Option.empty(),
activeInstantsPartitionsMap2, Collections.emptyList(), threePartitionsInActiveTimeline, true, Collections.emptyMap()));

// Empty cleaner plan case
arguments.add(Arguments.of(true, getCleanByHoursConfig(), earliestInstant, lastCompletedInLastClean, lastCleanInstant,
earliestInstantInLastClean, Collections.emptyList(), Collections.emptyMap(), Option.empty(),
activeInstantsPartitionsMap2, Collections.emptyList(), twoPartitionsInActiveTimeline, false, Collections.emptyMap()));
arguments.add(Arguments.of(false, getCleanByHoursConfig(), earliestInstant, lastCompletedInLastClean, lastCleanInstant,
earliestInstantInLastClean, Collections.emptyList(), Collections.emptyMap(), Option.empty(),
activeInstantsUnPartitionsMap, Collections.emptyList(), unPartitionsInActiveTimeline, false, Collections.emptyMap()));

return arguments.stream();
}

Expand Down Expand Up @@ -598,8 +606,8 @@ private static HoodieCleanMetadata getCleanCommitMetadata(List<String> partition
Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), false)));
Map<String, String> extraMetadata = new HashMap<>();
extraMetadata.put(SAVEPOINTED_TIMESTAMPS, savepointsToTrack.stream().collect(Collectors.joining(",")));
return new HoodieCleanMetadata(instantTime, 100L, 10, earliestCommitToRetain, lastCompletedTime, partitionMetadata,
CLEAN_METADATA_VERSION_2, Collections.EMPTY_MAP, extraMetadata.isEmpty() ? null : extraMetadata);
return new HoodieCleanMetadata(instantTime, 100L, partitionMetadata.isEmpty() ? 0 : 10, earliestCommitToRetain, lastCompletedTime,
partitionMetadata, CLEAN_METADATA_VERSION_2, Collections.emptyMap(), extraMetadata.isEmpty() ? null : extraMetadata);
}

private static HoodieSavepointMetadata getSavepointMetadata(List<String> partitions) {
Expand Down
Loading
Loading