feat(clean): Adding empty clean support to hudi#18337
Conversation
80a9e29 to
5cdbc5d
Compare
|
@nsivabalan can we reuse the existing PR for creating empty clean? |
|
@suryaprasanna : this is just 1 patch right. for others, I will use the original PR. |
Ok, sounds good. |
yihua
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for contributing! The approach to optimize clean planning for append-only datasets with empty clean commits is solid. I found one potential NPE in the backwards-ECTR guard when reading metadata from older cleans, plus a minor control flow suggestion — details in the inline comments.
|
@yihua : this is good to re-review. |
yihua
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Nice updates — both of my prior findings are addressed: the cleanPlanOpt.isEmpty() gate avoids unnecessary work when there's a real clean plan, and the StringUtils.isNullOrEmpty guards on both previousEarliestCommitToRetain and currentEarliestCommitToRetain prevent the NPE. The addition of filterCompletedInstants() on the clean timeline is also a good fix — it ensures we don't read metadata from an inflight/requested clean that hasn't been committed yet. @kbuci's comment about re-throwing the IOException in the catch block (line ~284) appears still open — the author acknowledged it but the fix doesn't seem to be in this diff. No new issues introduced.
yihua
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
CodeRabbit Walkthrough: This pull request introduces an empty clean feature for Hudi, allowing scheduled empty clean commits when no partitions require deletion. A new configuration property MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS controls whether empty clean operations are created based on elapsed time since the last clean. Configuration accessors, plan generation logic, and metadata handling have been added across multiple executor and test files.
Greptile Summary: This PR introduces an empty clean commit feature to Hudi's cleaning subsystem. For datasets using incremental clean that receive few or no updates, the cleaner planner previously never generated a plan (nothing to clean), meaning the earliestCommitToRetain pointer was never advanced. On the next run, the cleaner would then need to do a full-table scan. The feature adds:
- A new config
hoodie.write.empty.clean.create.duration.ms(default-1, disabled) that gates how often an empty clean commit may be written. - Logic in
CleanPlanActionExecutorto emit an "empty" plan (no files to delete) if no work is needed but the threshold has elapsed, and a safety guard to preventearliestCommitToRetainfrom regressing when the cleaner policy is changed. - A corresponding
createEmptyCleanMetadatabuilder inCleanActionExecutorfor the execution side. - New tests in
TestCleanPlanExecutorcovering the threshold, the regression guard, and extra metadata (savepoints).
Key findings:
- A potential
NullPointerExceptionincreateEmptyCleanMetadatawhen the clean plan'sEarliestInstantToRetainis null. - Missing
setVersion()call ingetEmptyCleanerPlanfor theearliestInstant-absent branch. - Unintended in-place mutation of the caller-supplied
extraMetadatamap inprepareExtraMetadata.
Greptile Confidence Score: 3/5
Not safe to merge as-is due to a latent NPE in createEmptyCleanMetadata that can be triggered by plans with only partition deletions and no earliest instant to retain.
The feature design and the threshold/regression-guard logic are sound. However, the NPE on cleanerPlan.getEarliestInstantToRetain().getTimestamp() in createEmptyCleanMetadata (line 250, CleanActionExecutor) is a real defect — it can be hit when a clean plan carries only partitionsToBeDeleted entries with no file paths and a null earliestInstantToRetain. Additionally, the missing setVersion in the absent-earliestInstant branch of getEmptyCleanerPlan risks plan-version skew. Both issues have targeted one-line fixes; once addressed, the PR is in good shape.
CleanActionExecutor.java (NPE on line 250) and CleanPlanActionExecutor.java (missing setVersion, extraMetadata mutation on lines 113–116 and 193–194) need attention before merge.
Sequence Diagram (CodeRabbit):
sequenceDiagram
participant Client as Clean Planner
participant Executor as CleanPlanActionExecutor
participant Timeline as Active Timeline
participant Metadata as Metadata Generator
participant Config as HoodieWriteConfig
Client->>Executor: requestClean()
Executor->>Config: maxDurationToCreateEmptyCleanMs()
Config-->>Executor: duration threshold
alt Plan has work
Executor->>Executor: createNormalCleanerPlan()
Executor-->>Client: return plan
else Plan is empty & incremental mode
Executor->>Timeline: getLastCleanInstantTime()
Timeline-->>Executor: last clean timestamp
Executor->>Executor: compare elapsed time vs threshold
alt Time threshold exceeded & retention valid
Executor->>Executor: getEmptyCleanerPlan()
Executor->>Timeline: reload active timeline
Executor->>Metadata: createEmptyCleanMetadata()
Metadata-->>Executor: empty clean metadata
Executor-->>Client: return empty plan & metadata
else Threshold not met or retention invalid
Executor-->>Client: return empty (skip clean)
end
end
Sequence Diagram (Greptile):
sequenceDiagram
participant CE as CleanActionExecutor
participant CPAE as CleanPlanActionExecutor
participant CP as CleanPlanner
participant AT as ActiveTimeline
CE->>CPAE: execute()
CPAE->>CP: getEarliestCommitToRetain()
CP-->>CPAE: earliestInstant (may be empty)
CPAE->>CP: getPartitionPathsToClean(earliestInstant)
CP-->>CPAE: partitionsToClean
alt partitionsToClean is empty
CPAE->>CPAE: getEmptyCleanerPlan(earliestInstant)
CPAE-->>CE: cleanerPlan (empty files, set/null earliestInstantToRetain)
else partitions exist
CPAE->>CP: getDeletePaths per partition
CPAE-->>CE: cleanerPlan (with file paths)
end
CE->>CE: Check empty plan eligibility
note over CE: cleanPlanOpt.isEmpty() &&<br/>incrementalCleanerMode &&<br/>earliestInstantToRetain != null &&<br/>maxDuration > 0 &&<br/>time since last clean > threshold
alt eligible for empty clean
CE->>AT: saveToCleanRequested (empty plan)
CE->>CE: runClean(table, cleanInstant, cleanerPlan)
CE->>CE: clean(context, cleanerPlan) → cleanStats (empty list)
CE->>CE: createEmptyCleanMetadata(cleanerPlan, ...)
note over CE: NPE if getEarliestInstantToRetain() == null
CE->>AT: transitionCleanInflightToComplete
else not eligible
CE-->>CE: return cleanPlanOpt (empty)
end
CodeRabbit: yihua#31 (review)
Greptile: yihua#31 (review)
929a233 to
5208c76
Compare
yihua
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Style & Readability Review — Minor parameter naming and documentation issues; code is otherwise clean and well-organized.
yihua
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Good progress on the empty clean feature — the overall approach of creating lightweight clean commits to avoid full table scans for append-only datasets is sound, and the backward-compatibility guard for earliestCommitToRetain is a nice addition. A few items worth addressing in the inline comments, primarily around config validation completeness and a minor staleness concern with the timeline check.
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for addressing the earlier feedback — the switch to config.getCleanerPolicy().name() in getEmptyCleanerPlan is a nice refinement, and the replies on the other prior findings are reasonable. However, the attempt to tighten the MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS validation appears to have introduced a critical regression: with the default value of -1L (disabled) and the new check maxDurationMs <= -1, every HoodieCleanConfig.build() will throw. One inline comment worth addressing before merge. Please take a look at the inline comment, and this should be ready for a Hudi committer or PMC member to take it from here.
| throw new IllegalArgumentException(MAX_COMMITS_TO_CLEAN.key() + " must be >= 1, but was " + maxCommitsToClean); | ||
| } | ||
| long maxDurationMs = cleanConfig.getLong(MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS); | ||
| if (maxDurationMs == 0 || maxDurationMs <= -1) { |
There was a problem hiding this comment.
🤖 This condition rejects -1, but -1 is both the default value (line 255: .defaultValue(-1L)) and the documented sentinel for "disabled" (the usage check is config.maxDurationToCreateEmptyCleanMs() > 0). As written, every HoodieCleanConfig.build() without an explicit override will throw IllegalArgumentException, breaking any writer using default config. Did you mean maxDurationMs < -1 here (i.e. allow -1, reject 0 and values below -1)? The error message "must be >= 1" would also need updating since -1 is a valid disabled value.
- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
There was a problem hiding this comment.
@nsivabalan Could you check this? It looks legit.
There was a problem hiding this comment.
my bad. meant to go for "< -1"
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
CodeRabbit Walkthrough: This pull request introduces support for "empty clean" commits in Hudi's clean action. A new configuration property MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS controls creation of clean commits when no files require deletion, subject to time threshold and retention point validation conditions.
Sequence Diagram (CodeRabbit):
sequenceDiagram
actor User
participant CleanPlanActionExecutor
participant CleanActionExecutor
participant Timeline
participant Metadata
User->>CleanPlanActionExecutor: requestClean()
alt partitionsToClean is empty
CleanPlanActionExecutor->>CleanPlanActionExecutor: getEmptyCleanerPlan()
Note over CleanPlanActionExecutor: Check if empty clean<br/>should be created
alt maxDurationToCreateEmptyCleanMs > 0<br/>AND time threshold met<br/>AND retention point valid
CleanPlanActionExecutor->>Timeline: Save empty cleaner plan
CleanPlanActionExecutor-->>User: Return empty plan
else
CleanPlanActionExecutor-->>User: Return empty plan
end
else partitionsToClean has partitions
CleanPlanActionExecutor->>CleanPlanActionExecutor: Compute normal clean plan
CleanPlanActionExecutor-->>User: Return normal plan
end
User->>CleanActionExecutor: execute(cleanerPlan)
alt plan is empty
CleanActionExecutor->>CleanActionExecutor: createEmptyCleanMetadata()
CleanActionExecutor->>Metadata: Build metadata with<br/>zero deleted files<br/>empty partitions
else plan has files to delete
CleanActionExecutor->>CleanActionExecutor: convertCleanMetadata()
CleanActionExecutor->>Metadata: Build normal metadata
end
CleanActionExecutor->>Timeline: Reload active timeline
CleanActionExecutor->>Timeline: Commit clean metadata
CleanActionExecutor-->>User: Return clean stats
CodeRabbit: hudi-agent#6 (review)
|
|
||
| public static final ConfigProperty<Long> MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS = ConfigProperty | ||
| .key("hoodie.write.empty.clean.create.duration.ms") | ||
| .defaultValue(-1L) |
There was a problem hiding this comment.
Don't reject the property's own default.
Line 255 sets -1L as the disabled default, but Lines 452-455 reject every value <= -1. That makes HoodieCleanConfig.newBuilder().build() fail unless every caller explicitly overrides this new option, which breaks existing configs and tests.
Suggested fix
- if (maxDurationMs == 0 || maxDurationMs <= -1) {
- throw new IllegalArgumentException(MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS.key() + " must be >= 1, but was " + maxDurationMs);
+ if (maxDurationMs == 0 || maxDurationMs < -1) {
+ throw new IllegalArgumentException(
+ MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS.key() + " must be -1 (disabled) or >= 1, but was " + maxDurationMs);
}Also applies to: 452-455
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java`
around lines 253 - 255, The validation currently rejects values <= -1 for
MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS, which invalidates the property's intended
disabled default (-1L) and breaks HoodieCleanConfig.newBuilder().build(); update
the check that references MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS to allow -1 by
changing the condition to reject only values < -1 (or explicitly permit the
property's default sentinel), so the default -1L is accepted while still
rejecting invalid negatives.
— CodeRabbit (original) (source:comment#3111681444)
| writeClient1.savepoint(fourthCommitTs, "user", "comment"); | ||
| } finally { | ||
| writeClient1.close(); | ||
| } |
There was a problem hiding this comment.
Use try-with-resources for the savepoint client.
If getHoodieWriteClient(config) fails, Line 899 masks the real error with an NPE on writeClient1.close(). A try-with-resources block avoids that and keeps the cleanup local.
Suggested fix
- SparkRDDWriteClient writeClient1 = null;
- try {
- writeClient1 = getHoodieWriteClient(config);
- writeClient1.savepoint(fourthCommitTs, "user", "comment");
- } finally {
- writeClient1.close();
- }
+ try (SparkRDDWriteClient<?> writeClient1 = getHoodieWriteClient(config)) {
+ writeClient1.savepoint(fourthCommitTs, "user", "comment");
+ }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java`
around lines 894 - 900, The current try/finally around SparkRDDWriteClient
writeClient1 masks startup failures by calling writeClient1.close() even when
getHoodieWriteClient(config) throws; replace this pattern with a
try-with-resources that declares the client in the resource header (e.g., try
(SparkRDDWriteClient writeClient1 = getHoodieWriteClient(config)) {
writeClient1.savepoint(fourthCommitTs, "user", "comment"); }) so the JVM
auto-closes the client only when it was created and avoids NPEs—update the block
that calls getHoodieWriteClient and savepoint accordingly.
— CodeRabbit (original) (source:comment#3111681453)
| throw new IllegalArgumentException(MAX_COMMITS_TO_CLEAN.key() + " must be >= 1, but was " + maxCommitsToClean); | ||
| } | ||
| long maxDurationMs = cleanConfig.getLong(MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS); | ||
| if (maxDurationMs == 0 || maxDurationMs <= -1) { |
There was a problem hiding this comment.
@nsivabalan Could you check this? It looks legit.
| log.warn("Skipping empty clean creation because earliestCommitToRetain would go backwards. " | ||
| + "Previous: {}, Current: {}. This can happen when cleaner configuration is changed.", | ||
| previousEarliestCommitToRetain, currentEarliestCommitToRetain); |
There was a problem hiding this comment.
Should the currentEarliestCommitToRetain in the cleaner plan be adjusted to previousEarliestCommitToRetain, or is that overkill?
There was a problem hiding this comment.
we can fix the getEarliestInstantToRetain in the current plan to previousEarliestCommitToRetain and go ahead w/ empty clean
There was a problem hiding this comment.
hey @yihua :
actually, returning Option.empty from here.
the completed clean instant will only contain the requestedTime for the earliest commit to retain, but not the whole HoodieInstant.
I don't think its worth fetching the HoodieInstant from previous clean plan(deser clean requested) for this case.
There was a problem hiding this comment.
I don't understand the claim. The previous ECTR is already fetched as previousEarliestCommitToRetain. In this case, set previousEarliestCommitToRetain in cleanerPlan won't work?
|
@yihua : addressed all feedback |
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #18337 +/- ##
============================================
- Coverage 68.85% 68.78% -0.08%
- Complexity 28235 28342 +107
============================================
Files 2460 2468 +8
Lines 135256 136060 +804
Branches 16391 16511 +120
============================================
+ Hits 93125 93583 +458
- Misses 34761 35077 +316
- Partials 7370 7400 +30
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
|
|
||
| public static final ConfigProperty<Long> MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS = ConfigProperty | ||
| .key("hoodie.write.empty.clean.create.duration.ms") | ||
| public static final ConfigProperty<Long> MAX_INTERVAL_TO_CREATE_EMPTY_CLEAN_HOURS = ConfigProperty |
There was a problem hiding this comment.
MAX_INTERVAL_TO_CREATE_EMPTY_CLEAN_HOURS -> `INTERVAL_TO_CREATE_EMPTY_CLEAN_HOURS
|
|
||
| public HoodieCleanConfig.Builder withMaxDurationToCreateEmptyClean(long durationMs) { | ||
| cleanConfig.setValue(MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS, String.valueOf(durationMs)); | ||
| public HoodieCleanConfig.Builder withMaxIntervalToCreateEmptyCleanHours(long durationHours) { |
There was a problem hiding this comment.
withIntervalToCreateEmptyCleanHours
| long maxDurationMs = cleanConfig.getLong(MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS); | ||
| if (maxDurationMs == 0 || maxDurationMs <= -1) { | ||
| throw new IllegalArgumentException(MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS.key() + " must be >= 1, but was " + maxDurationMs); | ||
| long maxDurationHours = cleanConfig.getLong(MAX_INTERVAL_TO_CREATE_EMPTY_CLEAN_HOURS); |
There was a problem hiding this comment.
maxDurationHours -> emptyCleanIntervalHours
|
|
||
| public long maxDurationToCreateEmptyCleanMs() { | ||
| return getLong(HoodieCleanConfig.MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS); | ||
| public long maxIntervalToCreateEmptyCleanHours() { |
There was a problem hiding this comment.
getIntervalHoursToCreateEmptyClean
| log.warn("Skipping empty clean creation because earliestCommitToRetain would go backwards. " | ||
| + "Previous: {}, Current: {}. This can happen when cleaner configuration is changed.", | ||
| previousEarliestCommitToRetain, currentEarliestCommitToRetain); |
There was a problem hiding this comment.
I don't understand the claim. The previous ECTR is already fetched as previousEarliestCommitToRetain. In this case, set previousEarliestCommitToRetain in cleanerPlan won't work?
|
Comments are addressed in #18587 |

Describe the issue this Pull Request addresses
This PR adds support for creating empty clean commits to optimize clean planning performance for append-only datasets.
Problem: In datasets with incremental cleaning enabled that receive infrequent updates or are primarily append-only, the clean planner performs a full table scan on every ingestion run because there are
no clean plans to mark progress. This leads to significant performance overhead, especially for large tables.
Solution: Introduce a new configuration hoodie.write.empty.clean.internval.hours that allows creating empty clean commits after a configurable duration. These empty clean commits update the
earliestCommitToRetain value, enabling subsequent clean planning operations to only scan partitions modified after the last empty clean, avoiding expensive full table scans.
Summary and Changelog
User-facing changes:
Detailed changes:
- Added
hoodie.write.empty.clean.internval.hoursconfig property with builder method- Marked as advanced config for power users
- Modified clean parallelism calculation to ensure minimum of 1 (was causing issues with empty plans)
- Added createEmptyCleanMetadata() method to construct metadata for empty cleans
- Updated runClean() to handle empty clean stats by creating appropriate metadata
- Added getEmptyCleanerPlan() method to construct cleaner plans with no files to delete
- Modified requestClean() to return empty plans when partitions list is empty
- Added logic in requestCleanInternal() to check if empty clean commit should be created based on:
- Added testEmptyClean() in TestCleaner.java to validate empty clean execution
- Added testEmptyCleansAddedAfterThreshold() in TestCleanPlanExecutor.java with threshold scenarios
- Updated existing test fixtures to handle empty metadata properly
Impact
Performance Impact: Positive - significantly reduces clean planning time for append-only or infrequently updated datasets by avoiding full table scans
API Changes: None - purely additive configuration
Behavior Changes:
Risk Level
low
Documentation Update
Contributor's checklist