Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,8 @@ public class HoodieCleanConfig extends HoodieConfig {
.markAdvanced()
.withDocumentation("Maximum number of commits to clean in one clean commit. Applicable only when the clean policy is based on KEEP_LATEST_COMMITS or KEEP_LATEST_HOURS");

public static final ConfigProperty<Long> MAX_INTERVAL_TO_CREATE_EMPTY_CLEAN_HOURS = ConfigProperty
.key("hoodie.write.empty.clean.internval.hours")
public static final ConfigProperty<Long> INTERVAL_TO_CREATE_EMPTY_CLEAN_HOURS = ConfigProperty
.key("hoodie.write.empty.clean.interval.hours")
.defaultValue(-1L)
.markAdvanced()
.withDocumentation("In some cases empty clean commit needs to be created to ensure the clean planner "
Expand Down Expand Up @@ -436,8 +436,8 @@ public HoodieCleanConfig.Builder withMaxCommitsToClean(long maxCommitsToClean) {
return this;
}

public HoodieCleanConfig.Builder withMaxIntervalToCreateEmptyCleanHours(long durationHours) {
cleanConfig.setValue(MAX_INTERVAL_TO_CREATE_EMPTY_CLEAN_HOURS, String.valueOf(durationHours));
public HoodieCleanConfig.Builder withIntervalToCreateEmptyCleanHours(long emptyCleanIntervalHours) {
cleanConfig.setValue(INTERVAL_TO_CREATE_EMPTY_CLEAN_HOURS, String.valueOf(emptyCleanIntervalHours));
return this;
}

Expand All @@ -449,9 +449,9 @@ public HoodieCleanConfig build() {
if (maxCommitsToClean < 1) {
throw new IllegalArgumentException(MAX_COMMITS_TO_CLEAN.key() + " must be >= 1, but was " + maxCommitsToClean);
}
long maxDurationHours = cleanConfig.getLong(MAX_INTERVAL_TO_CREATE_EMPTY_CLEAN_HOURS);
if (maxDurationHours == 0 || maxDurationHours < -1) {
throw new IllegalArgumentException(MAX_INTERVAL_TO_CREATE_EMPTY_CLEAN_HOURS.key() + " must be >= 1, but was " + maxDurationHours);
long emptyCleanIntervalHours = cleanConfig.getLong(INTERVAL_TO_CREATE_EMPTY_CLEAN_HOURS);
if (emptyCleanIntervalHours == 0 || emptyCleanIntervalHours < -1) {
throw new IllegalArgumentException(INTERVAL_TO_CREATE_EMPTY_CLEAN_HOURS.key() + " must be -1 (disabled) or >= 1, but was " + emptyCleanIntervalHours);
}
return cleanConfig;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1877,8 +1877,8 @@ public boolean isAutoClean() {
return getBoolean(HoodieCleanConfig.AUTO_CLEAN);
}

public long maxIntervalToCreateEmptyCleanHours() {
return getLong(HoodieCleanConfig.MAX_INTERVAL_TO_CREATE_EMPTY_CLEAN_HOURS);
public long getIntervalToCreateEmptyCleanHours() {
return getLong(HoodieCleanConfig.INTERVAL_TO_CREATE_EMPTY_CLEAN_HOURS);
}

public boolean shouldArchiveBeyondSavepoint() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,12 +244,12 @@ protected Option<HoodieCleanerPlan> requestClean() {
cleanPlanOpt = Option.of(cleanerPlan);
}
// If cleaner plan returned an empty list, incremental clean is enabled and there was no
// completed clean created in the last X hours configured in MAX_DURATION_TO_CREATE_EMPTY_CLEAN,
// completed clean created in the last X hours configured in INTERVAL_TO_CREATE_EMPTY_CLEAN_HOURS,
// create a dummy clean to avoid full scan in the future.
// Note: For a dataset with incremental clean enabled, that does not receive any updates, cleaner plan always comes
// with an empty list of files to be cleaned. CleanActionExecutor would never be invoked for this dataset.
// To avoid fullscan on the dataset with every ingestion run, empty clean commit is created here.
if (cleanPlanOpt.isEmpty() && config.incrementalCleanerModeEnabled() && cleanerPlan.getEarliestInstantToRetain() != null && config.maxIntervalToCreateEmptyCleanHours() > 0) {
if (cleanPlanOpt.isEmpty() && config.incrementalCleanerModeEnabled() && cleanerPlan.getEarliestInstantToRetain() != null && config.getIntervalToCreateEmptyCleanHours() > 0) {
// Only create an empty clean commit if earliestInstantToRetain is present in the plan
boolean eligibleForEmptyCleanCommit = true;

Expand All @@ -260,7 +260,7 @@ protected Option<HoodieCleanerPlan> requestClean() {
ZonedDateTime latestDateTime = ZonedDateTime.ofInstant(java.time.Instant.now(), table.getMetaClient().getTableConfig().getTimelineTimezone().getZoneId());
long currentCleanTimeMs = latestDateTime.toInstant().toEpochMilli();
long lastCleanTimeMs = HoodieInstantTimeGenerator.parseDateFromInstantTime(lastCleanInstant.get().requestedTime()).toInstant().toEpochMilli();
eligibleForEmptyCleanCommit = currentCleanTimeMs - lastCleanTimeMs > (TimeUnit.HOURS.toMillis(config.maxIntervalToCreateEmptyCleanHours()));
eligibleForEmptyCleanCommit = currentCleanTimeMs - lastCleanTimeMs > (TimeUnit.HOURS.toMillis(config.getIntervalToCreateEmptyCleanHours()));
} catch (ParseException e) {
log.error("Unable to parse last clean commit time", e);
throw new HoodieException("Unable to parse last clean commit time", e);
Expand All @@ -276,10 +276,10 @@ protected Option<HoodieCleanerPlan> requestClean() {

if (!StringUtils.isNullOrEmpty(previousEarliestCommitToRetain) && !StringUtils.isNullOrEmpty(currentEarliestCommitToRetain)
&& compareTimestamps(currentEarliestCommitToRetain, LESSER_THAN, previousEarliestCommitToRetain)) {
log.warn("Skipping empty clean creation because earliestCommitToRetain would go backwards. "
log.warn("Adjusting empty clean earliestCommitToRetain to previous value to avoid going backwards. "
+ "Previous: {}, Current: {}. This can happen when cleaner configuration is changed.",
previousEarliestCommitToRetain, currentEarliestCommitToRetain);
return Option.empty();
cleanerPlan.getEarliestInstantToRetain().setTimestamp(previousEarliestCommitToRetain);
}
} catch (IOException e) {
log.error("Unable to read last clean metadata", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,7 @@ void testEmptyCleansAddedAfterThreshold(boolean secondCommitAfterThreshold) thro
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
.withCleanBootstrapBaseFileEnabled(enableBootstrapSourceClean)
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS).cleanerNumHoursRetained(2)
.withMaxIntervalToCreateEmptyCleanHours(1)
.withIntervalToCreateEmptyCleanHours(1)
.build())
.build();

Expand Down Expand Up @@ -931,15 +931,18 @@ void testEmptyCleansAddedAfterThreshold(boolean secondCommitAfterThreshold) thro

@Test
void testEmptyCleanDoesNotGoBackwardsOnConfigChange() throws Exception {
// Test that earliestCommitToRetain never goes backwards when user changes cleaner config
// Test that earliestCommitToRetain never goes backwards when user increases retention.
// Scenario: user starts with short retention (12h), then increases to long retention (72h).
// The longer retention would compute an older ECTR, but the code should adjust it to
// the previous ECTR and still create the empty clean.
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withIncrementalCleaningMode(true)
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
.withCleanBootstrapBaseFileEnabled(false)
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS).cleanerNumHoursRetained(24)
.withMaxIntervalToCreateEmptyCleanHours(1)
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS).cleanerNumHoursRetained(12)
.withIntervalToCreateEmptyCleanHours(1)
.build())
.build();

Expand All @@ -949,27 +952,27 @@ void testEmptyCleanDoesNotGoBackwardsOnConfigChange() throws Exception {
Instant instant = Instant.now();
ZonedDateTime commitDateTime = ZonedDateTime.ofInstant(instant, metaClient.getTableConfig().getTimelineTimezone().getZoneId());

// Create first commit 48 hours ago
// Create first commit 70 hours ago
String file1P0C0 = UUID.randomUUID().toString();
String firstCommitTs = HoodieInstantTimeGenerator.formatDate(Date.from(commitDateTime.minusHours(48).toInstant()));
String firstCommitTs = HoodieInstantTimeGenerator.formatDate(Date.from(commitDateTime.minusHours(70).toInstant()));
commitToTestTable(testTable, firstCommitTs, p0, file1P0C0);
testTable = tearDownTestTableAndReinit(testTable, config);

// Create second commit 36 hours ago
// Create second commit 48 hours ago
String file2P0C1 = UUID.randomUUID().toString();
String secondCommitTs = HoodieInstantTimeGenerator.formatDate(Date.from(commitDateTime.minusHours(36).toInstant()));
String secondCommitTs = HoodieInstantTimeGenerator.formatDate(Date.from(commitDateTime.minusHours(48).toInstant()));
commitToTestTable(testTable, secondCommitTs, p0, file2P0C1);
testTable = tearDownTestTableAndReinit(testTable, config);
metaClient = HoodieTableMetaClient.reload(metaClient);

// Create third commit 12 hours ago
// Create third commit 6 hours ago (well within 12h retention window)
String file3P0C2 = UUID.randomUUID().toString();
String thirdCommitTs = HoodieInstantTimeGenerator.formatDate(Date.from(commitDateTime.minusHours(12).toInstant()));
String thirdCommitTs = HoodieInstantTimeGenerator.formatDate(Date.from(commitDateTime.minusHours(6).toInstant()));
commitToTestTable(testTable, thirdCommitTs, p0, file3P0C2);
testTable = tearDownTestTableAndReinit(testTable, config);
metaClient = HoodieTableMetaClient.reload(metaClient);

// Run first empty clean 2 hours ago - should retain commits from 26 hours ago (24h retention + 2h safety)
// Run first empty clean with 12h retention - ECTR should be thirdCommitTs (6h ago)
String firstCleanInstant = HoodieInstantTimeGenerator.formatDate(Date.from(commitDateTime.minusHours(2).toInstant()));
SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(config);
List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config, false, false, writeClient, firstCleanInstant);
Expand All @@ -982,34 +985,41 @@ void testEmptyCleanDoesNotGoBackwardsOnConfigChange() throws Exception {
String firstEarliestCommitToRetain = firstCleanMetadata.getEarliestCommitToRetain();
writeClient.close();

// Now change config to retain only 12 hours (which would normally make earliestCommitToRetain go backwards)
// Add a new commit so that needsCleaning() passes for the second clean attempt
String file4P0C3 = UUID.randomUUID().toString();
String fourthCommitTs = HoodieInstantTimeGenerator.formatDate(Date.from(commitDateTime.minusHours(1).toInstant()));
commitToTestTable(testTable, fourthCommitTs, p0, file4P0C3);
testTable = tearDownTestTableAndReinit(testTable, config);
metaClient = HoodieTableMetaClient.reload(metaClient);

// Now increase retention to 72 hours, which would make ECTR go backwards to firstCommitTs (70h ago)
HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withPath(basePath)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withIncrementalCleaningMode(true)
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
.withCleanBootstrapBaseFileEnabled(false)
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS).cleanerNumHoursRetained(12)
.withMaxIntervalToCreateEmptyCleanHours(1)
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS).cleanerNumHoursRetained(72)
.withIntervalToCreateEmptyCleanHours(1)
.build())
.build();

// Try to create another empty clean with the new config 61 minutes after first clean
// Create second empty clean 61 minutes after first clean
String secondCleanInstant = HoodieInstantTimeGenerator.formatDate(Date.from(
HoodieInstantTimeGenerator.parseDateFromInstantTime(firstCleanInstant).toInstant().plus(61, ChronoUnit.MINUTES)));

writeClient = getHoodieWriteClient(newConfig);
List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(newConfig, false, false, writeClient, secondCleanInstant);

// The clean should be skipped because earliestCommitToRetain would go backwards
// The empty clean should still be created, but with ECTR adjusted to the previous value
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTimeline cleanTimeline = metaClient.getActiveTimeline().getCleanerTimeline().filterCompletedInstants();
assertEquals(1, cleanTimeline.countInstants(), "Second clean should be skipped to prevent earliestCommitToRetain from going backwards");
assertEquals(2, cleanTimeline.countInstants(), "Second empty clean should be created with adjusted ECTR");

// Verify earliestCommitToRetain did not change
HoodieCleanMetadata latestCleanMetadata = CleanerUtils.getCleanerMetadata(metaClient, cleanTimeline.lastInstant().get());
assertEquals(firstEarliestCommitToRetain, latestCleanMetadata.getEarliestCommitToRetain(),
"earliestCommitToRetain should not go backwards");
// Verify earliestCommitToRetain did not go backwards
HoodieCleanMetadata secondCleanMetadata = CleanerUtils.getCleanerMetadata(metaClient, cleanTimeline.lastInstant().get());
assertEquals(firstEarliestCommitToRetain, secondCleanMetadata.getEarliestCommitToRetain(),
"earliestCommitToRetain should be adjusted to previous value, not go backwards");
writeClient.close();
} finally {
testTable.close();
Expand Down
Loading