📝 Walkthrough
This PR introduces an exclusive rollback feature that coordinates multi-writer rollback execution via heartbeat state validation. When enabled, the rollback logic checks for pending rollback info and heartbeat expiry to determine whether to proceed with rollback, defer execution, or skip if the commit is already handled.
Sequence Diagram
sequenceDiagram
actor Client
participant RollbackService as Rollback Service
participant Timeline as Timeline Manager
participant Heartbeat as Heartbeat Client
participant Storage as Storage Layer
Client->>RollbackService: rollback(commitTime)
RollbackService->>Timeline: reloadActiveTimeline()
RollbackService->>Timeline: getPendingRollbackInfo()
alt Pending Rollback + Expired Heartbeat
RollbackService->>Storage: deleteHeartbeatFile()
RollbackService->>Heartbeat: start(rollbackInstantTime)
RollbackService->>Storage: performRollback()
RollbackService->>Heartbeat: stop(rollbackInstantTime)
RollbackService-->>Client: return true
else Pending Rollback + Active Heartbeat
RollbackService-->>Client: return false
else No Pending Rollback
RollbackService->>Timeline: checkCommitExists(commitTime)
alt Commit Exists
RollbackService->>Timeline: scheduleRollback(commitTime)
RollbackService->>Heartbeat: start(rollbackInstantTime)
RollbackService->>Storage: performRollback()
RollbackService->>Heartbeat: stop(rollbackInstantTime)
RollbackService-->>Client: return true
else Commit Absent
RollbackService-->>Client: return false
end
end
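The branching in the diagram above can be condensed into a small decision function. This is a minimal sketch, not the code from the PR: the enum, the class, and the method names are hypothetical, and the three inputs stand in for the results of getPendingRollbackInfo(), the heartbeat expiry check, and the commit lookup on the reloaded timeline.

```java
import java.util.Optional;

// Hypothetical outcome names; per the diagram, the real method only returns true or false.
enum RollbackDecision {
  EXECUTE_PENDING,          // pending rollback, heartbeat expired: take over and run it
  DEFER,                    // pending rollback, heartbeat active: another writer owns it
  SCHEDULE_AND_EXECUTE,     // no pending rollback, commit still in the timeline
  SKIP_ALREADY_ROLLED_BACK  // no pending rollback, commit absent
}

final class ExclusiveRollbackDecider {
  private ExclusiveRollbackDecider() {}

  // Inputs are assumed to have been read after the timeline reload.
  static RollbackDecision decide(Optional<String> pendingRollbackInstant,
                                 boolean heartbeatExpired,
                                 boolean commitInTimeline) {
    if (pendingRollbackInstant.isPresent()) {
      return heartbeatExpired ? RollbackDecision.EXECUTE_PENDING : RollbackDecision.DEFER;
    }
    return commitInTimeline
        ? RollbackDecision.SCHEDULE_AND_EXECUTE
        : RollbackDecision.SKIP_ALREADY_ROLLED_BACK;
  }

  // Maps a decision onto the boolean result shown in the diagram.
  static boolean rollbackPerformed(RollbackDecision decision) {
    return decision == RollbackDecision.EXECUTE_PENDING
        || decision == RollbackDecision.SCHEDULE_AND_EXECUTE;
  }
}
```

Only the two executing outcomes map to a true return value; both the deferred and the already-rolled-back cases return false, so a caller cannot tell them apart from the result alone.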
Estimated Code Review Effort
🎯 3 (Moderate) | ⏱️ ~20 minutes
🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Greptile Summary
This PR adds support for exclusive rollbacks in multi-writer mode, introducing a new configuration property.
One compilation error is present.
Confidence Score: 3/5
Not safe to merge as-is due to a compilation error (LOG vs log) in the core rollback method.
The feature design is sound and tests are thorough, but LOG.info on line 1187 is undefined: @Slf4j generates log (lowercase) and no parent defines LOG. This will fail to compile and is a blocking issue.
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java: compilation error on line 1187 (LOG → log).
Sequence Diagram
sequenceDiagram
participant W1 as Writer 1
participant W2 as Writer 2
participant Lock as TxnManager (Lock)
participant HB as HeartbeatClient
participant TL as Timeline
Note over W1,TL: Exclusive Rollback Enabled (multi-writer mode)
W1->>Lock: beginStateChange()
W1->>TL: reloadActiveTimeline()
W1->>TL: getPendingRollbackInfo(commitTime)
alt No pending rollback and commit present
W1->>HB: start(rollbackInstantTime)
W1->>TL: scheduleRollback(rollbackInstantTime)
W1->>Lock: endStateChange()
W1->>TL: table.rollback(...)
W1->>HB: stop(rollbackInstantTime)
else Pending rollback exists, heartbeat EXPIRED
W1->>HB: deleteHeartbeatFile(rollbackInstantTime)
W1->>HB: start(rollbackInstantTime)
W1->>Lock: endStateChange()
W1->>TL: table.rollback(...)
W1->>HB: stop(rollbackInstantTime)
else Pending rollback exists, heartbeat ACTIVE
Note over W1: Another writer is rolling back
W1->>Lock: endStateChange()
W1-->>W1: return false (skip)
end
Note over W2: Writer 2 attempts same rollback later
W2->>Lock: beginStateChange()
W2->>TL: reloadActiveTimeline()
alt Commit no longer in timeline
W2->>Lock: endStateChange()
W2-->>W2: return false (already rolled back)
end
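The two-writer flow above hinges on where the lock is held: the decision and the heartbeat start happen inside the lock, while the rollback itself runs after the lock is released. The following is a rough sketch of that scoping using an in-memory stand-in. TableStateSketch, ExclusiveRollbackWriterSketch, and all of their fields are hypothetical and do not correspond to Hudi classes; heartbeat expiry is modeled simply as the instant being absent from activeHeartbeats.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical in-memory stand-in for the timeline, heartbeat files, and transaction lock.
final class TableStateSketch {
  final ReentrantLock txnLock = new ReentrantLock();
  final Set<String> commits = ConcurrentHashMap.newKeySet();
  // Maps a commit time to the rollback instant that was scheduled for it.
  final Map<String, String> pendingRollbacks = new ConcurrentHashMap<>();
  final Set<String> activeHeartbeats = ConcurrentHashMap.newKeySet();
}

final class ExclusiveRollbackWriterSketch {
  private ExclusiveRollbackWriterSketch() {}

  static boolean rollback(TableStateSketch table, String commitTime, String newRollbackInstant) {
    String rollbackInstant;
    table.txnLock.lock();
    try {
      String pending = table.pendingRollbacks.get(commitTime);
      if (pending != null) {
        if (table.activeHeartbeats.contains(pending)) {
          return false; // another writer is rolling back; skip for now
        }
        rollbackInstant = pending; // heartbeat expired: take over the pending rollback
      } else if (!table.commits.contains(commitTime)) {
        return false; // commit no longer in the timeline; treated as already rolled back
      } else {
        rollbackInstant = newRollbackInstant;
        table.pendingRollbacks.put(commitTime, rollbackInstant); // schedule
      }
      table.activeHeartbeats.add(rollbackInstant); // claim ownership before releasing the lock
    } finally {
      table.txnLock.unlock();
    }
    try {
      // The rollback work itself runs outside the lock.
      table.commits.remove(commitTime);
      table.pendingRollbacks.remove(commitTime);
      return true;
    } finally {
      table.activeHeartbeats.remove(rollbackInstant); // stop the heartbeat even on failure
    }
  }
}
```

In this sketch, a second writer calling rollback for the same commit after the first has finished finds neither a pending rollback nor the commit, and returns false, which matches the last branch of the diagram.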
Reviews (1): Last reviewed commit: "Add tests"
Actionable comments posted: 2
🧹 Nitpick comments (1)
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java (1)
301-305: Expose this flag through `HoodieWriteConfig.Builder`.
The new public config is only reachable via raw properties today, which is already forcing stringly-typed setup in the new tests. A dedicated `withExclusiveRollbackEnabled(boolean)` keeps the API discoverable and avoids key typos at call sites.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java` around lines 301 - 305, The new config ENABLE_EXCLUSIVE_ROLLBACK is only settable via raw properties; add a fluent setter on HoodieWriteConfig.Builder to expose it. In HoodieWriteConfig.Builder implement a method withExclusiveRollbackEnabled(boolean enabled) that sets the ENABLE_EXCLUSIVE_ROLLBACK ConfigProperty accordingly (use ENABLE_EXCLUSIVE_ROLLBACK.key() or the provided ConfigProperty API to set the boolean as a string) and return the Builder for chaining; update any relevant builder build flow to ensure the property is applied.
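As an illustration of the builder method this nitpick asks for, here is a self-contained sketch built on java.util.Properties. WriteConfigSketch is a hypothetical stand-in, not HoodieWriteConfig; the key string is taken from the CodeRabbit summary at the end of this page and is assumed to be the key behind ENABLE_EXCLUSIVE_ROLLBACK, and the default of false is an assumption as well.

```java
import java.util.Properties;

// Hypothetical stand-in for a properties-backed write config with a fluent builder.
final class WriteConfigSketch {
  // Assumed key, copied from the CodeRabbit summary; not verified against the PR diff.
  static final String ENABLE_EXCLUSIVE_ROLLBACK_KEY =
      "hoodie.rollback.enforce.single.rollback.instant";

  private final Properties props;

  private WriteConfigSketch(Properties props) {
    this.props = props;
  }

  boolean isExclusiveRollbackEnabled() {
    // Assumed default: disabled unless set explicitly.
    return Boolean.parseBoolean(props.getProperty(ENABLE_EXCLUSIVE_ROLLBACK_KEY, "false"));
  }

  static Builder newBuilder() {
    return new Builder();
  }

  static final class Builder {
    private final Properties props = new Properties();

    // Typed setter: call sites never spell out the raw key.
    Builder withExclusiveRollbackEnabled(boolean enabled) {
      props.setProperty(ENABLE_EXCLUSIVE_ROLLBACK_KEY, String.valueOf(enabled));
      return this;
    }

    WriteConfigSketch build() {
      Properties copy = new Properties();
      copy.putAll(props); // defensive copy so later builder changes do not leak in
      return new WriteConfigSketch(copy);
    }
  }
}
```

A test would then write `WriteConfigSketch.newBuilder().withExclusiveRollbackEnabled(true).build()` rather than setting the raw key through a properties map.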
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java`:
- Around line 1164-1168: The code may use a stale commitInstantOpt after
reloading the timeline; update the logic in BaseHoodieTableServiceClient so that
after calling table.reloadActiveTimeline() and re-checking commits (the
exclusive branch around the reloaded timeline check), you re-evaluate
commitInstantOpt by re-querying
table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream() (same
filter using EQUALS.test(instant.requestedTime(), commitInstantTime)) before
dereferencing it or scheduling rollback; ensure the subsequent use of
rollbackPlanOption / rollbackInstantTimeOpt uses this refreshed
commitInstantOpt.
- Around line 1184-1190: The rollback heartbeat started via
heartbeatClient.start(rollbackInstantTimeOpt.get()) in
BaseHoodieTableServiceClient may be left running if subsequent calls (e.g.,
scheduleRollback(...) or table.rollback(...)) throw; wrap the sequence that
starts the heartbeat and then performs rollback-scheduling/execution in a
try/finally (or catch and rethrow) to ensure
heartbeatClient.stop(rollbackInstantTimeOpt.get()) is always called on failure.
Apply the same pattern to the other similar blocks referenced (around the
instances that call heartbeatClient.start(...) at the locations corresponding to
lines ~1203-1205 and ~1230-1241) so any early exception will stop the rollback
heartbeat before propagating the error.
---
Nitpick comments:
In
`@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java`:
- Around line 301-305: The new config ENABLE_EXCLUSIVE_ROLLBACK is only settable
via raw properties; add a fluent setter on HoodieWriteConfig.Builder to expose
it. In HoodieWriteConfig.Builder implement a method
withExclusiveRollbackEnabled(boolean enabled) that sets the
ENABLE_EXCLUSIVE_ROLLBACK ConfigProperty accordingly (use
ENABLE_EXCLUSIVE_ROLLBACK.key() or the provided ConfigProperty API to set the
boolean as a string) and return the Builder for chaining; update any relevant
builder build flow to ensure the property is applied.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 9d434095-7cfc-4d6b-99cb-df5e7ad1b4f4
📒 Files selected for processing (3)
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
.filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime))
.findFirst());
Option<HoodieRollbackPlan> rollbackPlanOption;
String rollbackInstantTime;
if (pendingRollbackInfo.isPresent()) {
Option<HoodieRollbackPlan> rollbackPlanOption = Option.empty();
Option<String> rollbackInstantTimeOpt;
Refresh commitInstantOpt after reloading the timeline.
The exclusive branch rechecks commit presence on the reloaded timeline, but Line 1205 still uses the pre-lock commitInstantOpt. If the first snapshot missed the instant and the refreshed one finds it, this ends up dereferencing an empty option instead of scheduling the rollback.
Proposed fix
Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
.filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime))
.findFirst());
@@
if (config.isExclusiveRollbackEnabled()) {
// Reload meta client within the lock so that the timeline is latest while executing pending rollback
table.getMetaClient().reloadActiveTimeline();
+ commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
+ .filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime))
+ .findFirst());
Option<HoodiePendingRollbackInfo> pendingRollbackOpt = getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
rollbackInstantTimeOpt = pendingRollbackOpt.map(info -> info.getRollbackInstant().requestedTime());
@@
- } else if (Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
- .filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime))
- .findFirst()).isEmpty()) {
+ } else if (commitInstantOpt.isEmpty()) {
// Assume rollback is already executed since the commit is no longer present in the timeline
return false;
} else {
Also applies to: 1179-1205
if (pendingRollbackOpt.isPresent()) {
  // If pending rollback and heartbeat is expired, writer should start heartbeat and execute rollback
  if (heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) {
    LOG.info("Heartbeat expired for rollback instant {}, executing rollback now", rollbackInstantTimeOpt);
    HeartbeatUtils.deleteHeartbeatFile(storage, basePath, rollbackInstantTimeOpt.get(), config);
    heartbeatClient.start(rollbackInstantTimeOpt.get());
    rollbackPlanOption = pendingRollbackOpt.map(HoodiePendingRollbackInfo::getRollbackPlan);
Always stop the rollback heartbeat when this writer started it.
If scheduleRollback(...) or table.rollback(...) throws after heartbeatClient.start(...), the heartbeat never gets stopped. In the exclusive path that leaves a live rollback heartbeat behind, so other writers will keep deferring until it expires.
Proposed fix
Option<HoodieRollbackPlan> rollbackPlanOption = Option.empty();
Option<String> rollbackInstantTimeOpt;
+ boolean startedRollbackHeartbeat = false;
@@
if (heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) {
LOG.info("Heartbeat expired for rollback instant {}, executing rollback now", rollbackInstantTimeOpt);
HeartbeatUtils.deleteHeartbeatFile(storage, basePath, rollbackInstantTimeOpt.get(), config);
heartbeatClient.start(rollbackInstantTimeOpt.get());
+ startedRollbackHeartbeat = true;
rollbackPlanOption = pendingRollbackOpt.map(HoodiePendingRollbackInfo::getRollbackPlan);
@@
rollbackInstantTimeOpt = suppliedRollbackInstantTime.or(() -> Option.of(createNewInstantTime(false)));
heartbeatClient.start(rollbackInstantTimeOpt.get());
+ startedRollbackHeartbeat = true;
rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTimeOpt.get(), commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers(), false);
@@
- HoodieRollbackMetadata rollbackMetadata = commitInstantOpt.isPresent()
- ? table.rollback(context, rollbackInstantTimeOpt.get(), commitInstantOpt.get(), true, skipLocking)
- : table.rollback(context, rollbackInstantTimeOpt.get(), table.getMetaClient().createNewInstant(
- HoodieInstant.State.INFLIGHT, rollbackPlanOption.get().getInstantToRollback().getAction(), commitInstantTime),
- false, skipLocking);
- if (timerContext != null) {
- long durationInMs = metrics.getDurationInMs(timerContext.stop());
- metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted());
- }
- if (config.isExclusiveRollbackEnabled()) {
- heartbeatClient.stop(rollbackInstantTimeOpt.get());
- }
- return true;
+ try {
+ HoodieRollbackMetadata rollbackMetadata = commitInstantOpt.isPresent()
+ ? table.rollback(context, rollbackInstantTimeOpt.get(), commitInstantOpt.get(), true, skipLocking)
+ : table.rollback(context, rollbackInstantTimeOpt.get(), table.getMetaClient().createNewInstant(
+ HoodieInstant.State.INFLIGHT, rollbackPlanOption.get().getInstantToRollback().getAction(), commitInstantTime),
+ false, skipLocking);
+ if (timerContext != null) {
+ long durationInMs = metrics.getDurationInMs(timerContext.stop());
+ metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted());
+ }
+ return true;
+ } finally {
+ if (config.isExclusiveRollbackEnabled() && startedRollbackHeartbeat && rollbackInstantTimeOpt.isPresent()) {
+ heartbeatClient.stop(rollbackInstantTimeOpt.get());
+ }
+ }
Also applies to: 1203-1205, 1230-1241
if (pendingRollbackOpt.isPresent()) {
  // If pending rollback and heartbeat is expired, writer should start heartbeat and execute rollback
  if (heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) {
    LOG.info("Heartbeat expired for rollback instant {}, executing rollback now", rollbackInstantTimeOpt);
Compilation error:
LOG is undefined — should be log
The class is annotated with @Slf4j, which generates a field named log (lowercase). There is no LOG field defined here or in any parent class (BaseHoodieClient also uses log). This line will fail to compile.
- LOG.info("Heartbeat expired for rollback instant {}, executing rollback now", rollbackInstantTimeOpt);
+ log.info("Heartbeat expired for rollback instant {}, executing rollback now", rollbackInstantTimeOpt);
// Heartbeat is still active for another writer, ignore rollback for now
// TODO: ABCDEFGHI revisit return value
return false;
Leftover placeholder TODO comment
The comment // TODO: ABCDEFGHI revisit return value looks like a development-time placeholder that was left in accidentally. Based on the test testExclusiveRollbackDefersToActiveHeartbeat, returning false when another writer's heartbeat is active is the intended and correct behavior. If there is still an open design question here, the TODO should use a proper JIRA issue reference instead of ABCDEFGHI.
// Heartbeat is still active for another writer, ignore rollback for now
// TODO: ABCDEFGHI revisit return value
return false;
} else {
// Heartbeat is still active for another writer — skip rollback for now.
// The active writer will complete the rollback; caller should retry later.
return false;
Mirror of apache#18448 for automated bot review.
Original author: @lokeshj1703
Base branch: master
Summary by CodeRabbit
New Features
Configuration
`hoodie.rollback.enforce.single.rollback.instant` to enable exclusive rollback behavior in multi-writer environments.