Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 Line 1239: The heartbeatClient.stop() call is still only on the success path (after return true). If table.rollback() throws, the heartbeat is never stopped, which blocks other writers from attempting the rollback until it naturally expires. This was flagged in the previous review — could you wrap the rollback execution + stop in a try/finally? Something like:

try {
  // execute rollback...
} finally {
  if (config.isExclusiveRollbackEnabled()) {
    heartbeatClient.stop(rollbackInstantTimeOpt.get());
  }
}

- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Original file line number Diff line number Diff line change
Expand Up @@ -1248,46 +1248,51 @@ public boolean rollback(final String commitInstantTime, Option<HoodiePendingRoll
Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
.filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime))
.findFirst());
Option<HoodieRollbackPlan> rollbackPlanOption;
String rollbackInstantTime;
if (pendingRollbackInfo.isPresent()) {
rollbackPlanOption = Option.of(pendingRollbackInfo.get().getRollbackPlan());
rollbackInstantTime = pendingRollbackInfo.get().getRollbackInstant().requestedTime();
} else {
if (commitInstantOpt.isEmpty()) {
log.error("Cannot find instant {} in the timeline of table {} for rollback", commitInstantTime, config.getBasePath());

// ---- SCHEDULE PHASE ----
// Determines which rollback plan to use and creates a new one if necessary.
Option<Pair<HoodieInstant, Option<HoodieRollbackPlan>>> scheduleResult =
resolveOrScheduleRollback(table, commitInstantTime, commitInstantOpt, pendingRollbackInfo, suppliedRollbackInstantTime, skipLocking);
if (scheduleResult.isEmpty()) {
return false;
}
Option<HoodieInstant> rollbackInstantOpt = Option.of(scheduleResult.get().getLeft());
Option<HoodieRollbackPlan> rollbackPlanOption = scheduleResult.get().getRight();

// ---- EXECUTION PHASE ----
boolean isMultiWriter = config.getWriteConcurrencyMode().supportsMultiWriter();
if (rollbackPlanOption.isPresent()) {
if (isMultiWriter && !acquireRollbackHeartbeatIfMultiWriter(table, rollbackInstantOpt)) {
return false;
}
if (!skipLocking) {
txnManager.beginStateChange(Option.empty(), Option.empty());
}

try {
rollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() -> createNewInstantTime(false));
rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers(), false);
// Execute rollback — no lock held during this operation.

// There can be a case where the inflight rollback failed after the instant files
// are deleted for commitInstantTime, so that commitInstantOpt is empty as it is
// not present in the timeline. In such a case, the hoodie instant instance
// is reconstructed to allow the rollback to be reattempted, and the deleteInstants
// is set to false since they are already deleted.
HoodieRollbackMetadata rollbackMetadata = commitInstantOpt.isPresent()
? table.rollback(context, rollbackInstantOpt.get().requestedTime(), commitInstantOpt.get(), true, skipLocking)
: table.rollback(context, rollbackInstantOpt.get().requestedTime(), table.getMetaClient().createNewInstant(
HoodieInstant.State.INFLIGHT, rollbackPlanOption.get().getInstantToRollback().getAction(), commitInstantTime),
false, skipLocking);
if (timerContext != null) {
long durationInMs = metrics.getDurationInMs(timerContext.stop());
metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted());
}
return true;
} finally {
if (!skipLocking) {
txnManager.endStateChange(Option.empty());
if (isMultiWriter) {
try {
heartbeatClient.stop(rollbackInstantOpt.get().requestedTime());
} catch (Exception e) {
log.warn("Failed to stop heartbeat for rollback instant {}", rollbackInstantOpt.get().requestedTime(), e);
}
}
}
}

if (rollbackPlanOption.isPresent()) {
// There can be a case where the inflight rollback failed after the instant files
// are deleted for commitInstantTime, so that commitInstantOpt is empty as it is
// not present in the timeline. In such a case, the hoodie instant instance
// is reconstructed to allow the rollback to be reattempted, and the deleteInstants
// is set to false since they are already deleted.
// Execute rollback
HoodieRollbackMetadata rollbackMetadata = commitInstantOpt.isPresent()
? table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true, skipLocking)
: table.rollback(context, rollbackInstantTime, table.getMetaClient().createNewInstant(
HoodieInstant.State.INFLIGHT, rollbackPlanOption.get().getInstantToRollback().getAction(), commitInstantTime),
false, skipLocking);
if (timerContext != null) {
long durationInMs = metrics.getDurationInMs(timerContext.stop());
metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted());
}
return true;
} else {
throw new HoodieRollbackException("Failed to rollback " + config.getBasePath() + " commits " + commitInstantTime);
}
Expand All @@ -1297,6 +1302,85 @@ public boolean rollback(final String commitInstantTime, Option<HoodiePendingRoll
}
}

/**
 * Acquires a heartbeat for the rollback instant under a transaction so that only one writer
 * executes the rollback at a time.
 *
 * <p>This method does not check the write concurrency mode itself; the caller is expected to
 * invoke it only when multi-writer is enabled.
 *
 * <p>The heartbeat is not started and {@code false} is returned when either
 * <ul>
 *   <li>the heartbeat for the rollback instant is not expired (logged as an error), or</li>
 *   <li>the reloaded active timeline already contains a completed rollback with the same
 *       requested time.</li>
 * </ul>
 *
 * @param table              table whose meta client is used to reload the active timeline
 * @param rollbackInstantOpt rollback instant to guard; must be present
 * @return true if heartbeat was successfully acquired and rollback can be executed by the writer, else false
 * @throws IOException declared by the signature; presumably raised by the heartbeat client — confirm
 */
private boolean acquireRollbackHeartbeatIfMultiWriter(HoodieTable table, Option<HoodieInstant> rollbackInstantOpt) throws IOException {
  // Begin the state change before entering the try block, mirroring resolveOrScheduleRollback:
  // if beginning fails, the finally block must not end a state change that never started.
  txnManager.beginStateChange(rollbackInstantOpt, txnManager.getLastCompletedTransactionOwner());
  try {
    final String rollbackInstantTime = rollbackInstantOpt.get().requestedTime();

    if (!this.heartbeatClient.isHeartbeatExpired(rollbackInstantTime)) {
      log.error("Rollback heartbeat already exists for instant {}", rollbackInstantTime);
      return false;
    }

    // Reload the timeline while inside the state change so the completed-rollback check sees the latest view.
    boolean rollbackAlreadyCompleted = table.getMetaClient().reloadActiveTimeline().getRollbackTimeline()
        .filterCompletedInstants().getInstantsAsStream()
        .anyMatch(instant -> EQUALS.test(instant.requestedTime(), rollbackInstantTime));
    if (rollbackAlreadyCompleted) {
      log.info("Requested rollback instant {} is already completed in the active timeline", rollbackInstantTime);
      return false;
    }

    this.heartbeatClient.start(rollbackInstantTime);
    return true;
  } finally {
    txnManager.endStateChange(rollbackInstantOpt);
  }
}

/**
* Resolves an existing pending rollback or schedules a new one for the given commit instant.
*
* @return Option containing the rollback instant and plan pair, or empty if the commit instant
* is no longer present on the timeline (indicating no rollback is needed).
*/
private Option<Pair<HoodieInstant, Option<HoodieRollbackPlan>>> resolveOrScheduleRollback(
HoodieTable table, String commitInstantTime, Option<HoodieInstant> commitInstantOpt,
Option<HoodiePendingRollbackInfo> pendingRollbackInfo, Option<String> suppliedRollbackInstantTime,
boolean skipLocking) {
if (pendingRollbackInfo.isPresent()) {
// Case 1: caller already resolved a pending rollback — re-use it without taking a lock.
return Option.of(Pair.of(pendingRollbackInfo.get().getRollbackInstant(),
Option.of(pendingRollbackInfo.get().getRollbackPlan())));
}

// Case 2: no pending rollback supplied — reload the timeline under lock to get the latest view.
if (!skipLocking) {
txnManager.beginStateChange(Option.empty(), Option.empty());
}
try {
if (config.shouldAvoidDuplicateRollbackPlan()) {
// Check if another writer already scheduled a rollback for this instant to avoid duplicates.
table.getMetaClient().reloadActiveTimeline();
Option<HoodiePendingRollbackInfo> pendingRollbackOpt = getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
if (pendingRollbackOpt.isPresent()) {
// Case 2a: a concurrent writer already scheduled the rollback — re-use it.
return Option.of(Pair.of(pendingRollbackOpt.get().getRollbackInstant(),
Option.of(pendingRollbackOpt.get().getRollbackPlan())));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 The reload-timeline-and-check-pending-rollback step inside the lock is now gated on isEnforceSingleRollbackInstant(). In the old code this happened unconditionally in the else branch. Is skipping it intentional for the non-enforce path? In multi-writer mode without enforcement, two writers could both miss a concurrent pending rollback and each schedule one for the same commit.

- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure that what you describe is actually happening in master.
https://github.com/apache/hudi/blob/master/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java

I don't see the reload-timeline-and-check-pending-rollback step happening here.
So that step is a change introduced by this patch.

}
commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
.filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime))
.findFirst());
}
if (commitInstantOpt.isEmpty()) {
log.error("Cannot find instant {} in the timeline of table {} for rollback", commitInstantTime, config.getBasePath());
return Option.empty();
Comment thread
nsivabalan marked this conversation as resolved.
}
// Case 2b: no pending rollback exists — schedule one now.
// Refresh commitInstantOpt from the reloaded timeline.
String newRollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() -> createNewInstantTime(false));
HoodieInstant rollbackInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.ROLLBACK_ACTION, newRollbackInstantTime,
table.getMetaClient().getTimelineLayout().getInstantComparator().requestedTimeOrderedComparator());
Option<HoodieRollbackPlan> rollbackPlan = table.scheduleRollback(context, newRollbackInstantTime, commitInstantOpt.get(),
false, config.shouldRollbackUsingMarkers(), false);
return Option.of(Pair.of(rollbackInstant, rollbackPlan));
} finally {
if (!skipLocking) {
txnManager.endStateChange(Option.empty());
}
}
}

/**
* Main API to rollback failed bootstrap.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,14 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("Enables a more efficient mechanism for rollbacks based on the marker files generated "
+ "during the writes. Turned on by default.");

// Advanced config, keyed "hoodie.rollback.avoid.duplicate.plan" and defaulting to "false".
// It is read through shouldAvoidDuplicateRollbackPlan(), which additionally requires the
// write concurrency mode to support multi-writer before reporting true.
public static final ConfigProperty<String> ROLLBACK_AVOID_DUPLICATE_PLAN = ConfigProperty
.key("hoodie.rollback.avoid.duplicate.plan")
.defaultValue("false")
.markAdvanced()
.withDocumentation("When enabled in multi-writer mode, before scheduling a new rollback plan, the writer reloads "
+ "the timeline under lock to check if another writer already scheduled one for the same failed commit. "
+ "This avoids duplicate rollback instants and uses heartbeats to ensure only one writer executes the rollback at a time.");

public static final ConfigProperty<String> FAIL_JOB_ON_DUPLICATE_DATA_FILE_DETECTION = ConfigProperty
.key("hoodie.fail.job.on.duplicate.data.file.detection")
.defaultValue("false")
Expand Down Expand Up @@ -1578,6 +1586,10 @@ public boolean shouldRollbackUsingMarkers() {
return getBoolean(ROLLBACK_USING_MARKERS_ENABLE);
}

/**
 * Tells whether duplicate rollback plans should be avoided.
 *
 * @return {@code true} only when {@link #ROLLBACK_AVOID_DUPLICATE_PLAN} is enabled and the
 *         configured write concurrency mode supports multi-writer; {@code false} otherwise
 */
public boolean shouldAvoidDuplicateRollbackPlan() {
  // The flag is checked first; the concurrency mode is consulted only when the flag is on.
  if (!getBoolean(ROLLBACK_AVOID_DUPLICATE_PLAN)) {
    return false;
  }
  return getWriteConcurrencyMode().supportsMultiWriter();
}

/**
 * @return the boolean value of the {@code ENABLE_COMPLEX_KEYGEN_VALIDATION} config
 */
public boolean enableComplexKeygenValidation() {
return getBoolean(ENABLE_COMPLEX_KEYGEN_VALIDATION);
}
Expand Down
Loading
Loading