From 0be2c5eb7a7cfa481e3d5928e61ae4ea6ff7a935 Mon Sep 17 00:00:00 2001 From: Lokesh Jain Date: Thu, 2 Apr 2026 18:24:05 +0530 Subject: [PATCH 1/7] fix: Add support for exclusive rollbacks with multi writer --- .../client/BaseHoodieTableServiceClient.java | 59 +++++++++++++++---- .../apache/hudi/config/HoodieWriteConfig.java | 10 ++++ 2 files changed, 57 insertions(+), 12 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java index 26282affd1771..1df2e451bdbac 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java @@ -1245,25 +1245,57 @@ public boolean rollback(final String commitInstantTime, Option commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream() .filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime)) .findFirst()); - Option rollbackPlanOption; - String rollbackInstantTime; - if (pendingRollbackInfo.isPresent()) { + Option rollbackPlanOption = Option.empty(); + Option rollbackInstantTimeOpt; + if (!config.isExclusiveRollbackEnabled() && pendingRollbackInfo.isPresent()) { + // Only case when lock can be skipped is if exclusive rollback is disabled and + // there is a pending rollback info available rollbackPlanOption = Option.of(pendingRollbackInfo.get().getRollbackPlan()); - rollbackInstantTime = pendingRollbackInfo.get().getRollbackInstant().requestedTime(); + rollbackInstantTimeOpt = Option.of(pendingRollbackInfo.get().getRollbackInstant().requestedTime()); } else { - if (commitInstantOpt.isEmpty()) { - log.error("Cannot find instant {} in the timeline of table {} for rollback", commitInstantTime, config.getBasePath()); - return false; - } if (!skipLocking) { txnManager.beginStateChange(Option.empty(), Option.empty()); } try { - rollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() -> createNewInstantTime(false)); - rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers(), false); + if (config.isExclusiveRollbackEnabled()) { + // Reload meta client within the lock so that the timeline is latest while executing pending rollback + table.getMetaClient().reloadActiveTimeline(); + Option pendingRollbackOpt = getPendingRollbackInfo(table.getMetaClient(), commitInstantTime); + rollbackInstantTimeOpt = pendingRollbackOpt.map(info -> info.getRollbackInstant().requestedTime()); + if (pendingRollbackOpt.isPresent()) { + // If pending rollback and heartbeat is expired, writer should start heartbeat and execute rollback + if (heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) { + LOG.info("Heartbeat expired for rollback instant {}, executing rollback now", rollbackInstantTimeOpt); + HeartbeatUtils.deleteHeartbeatFile(storage, basePath, rollbackInstantTimeOpt.get(), config); + heartbeatClient.start(rollbackInstantTimeOpt.get()); + rollbackPlanOption = pendingRollbackOpt.map(HoodiePendingRollbackInfo::getRollbackPlan); + } else { + // Heartbeat is still active for another writer, ignore rollback for now + // TODO: ABCDEFGHI revisit return value + return false; + } + } else if (Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream() + .filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime)) + .findFirst()).isEmpty()) { + // Assume rollback is already executed since the commit is no longer present in the timeline + return false; + } + } else { + // Case where no pending rollback is present, + if (commitInstantOpt.isEmpty()) { + log.error("Cannot find instant {} in the timeline of table {} for rollback", commitInstantTime, config.getBasePath()); + return false; + } + rollbackInstantTimeOpt = suppliedRollbackInstantTime.or(() -> Option.of(createNewInstantTime(false))); + if (config.isExclusiveRollbackEnabled()) { + heartbeatClient.start(rollbackInstantTimeOpt.get()); + } + rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTimeOpt.get(), commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers(), false); + } } finally { if (!skipLocking) { txnManager.endStateChange(Option.empty()); @@ -1279,14 +1311,17 @@ public boolean rollback(final String commitInstantTime, Option ENABLE_EXCLUSIVE_ROLLBACK = ConfigProperty + .key("hoodie.rollback.enforce.single.rollback.instant") + .defaultValue("false") + .markAdvanced() + .withDocumentation("Enables exclusive rollback so that rollback plan is generated and executed by only one writer at a time"); + public static final ConfigProperty FAIL_JOB_ON_DUPLICATE_DATA_FILE_DETECTION = ConfigProperty .key("hoodie.fail.job.on.duplicate.data.file.detection") .defaultValue("false") @@ -1578,6 +1584,10 @@ public boolean shouldRollbackUsingMarkers() { return getBoolean(ROLLBACK_USING_MARKERS_ENABLE); } + public boolean isExclusiveRollbackEnabled() { + return getBoolean(ENABLE_EXCLUSIVE_ROLLBACK) && getWriteConcurrencyMode().supportsMultiWriter(); + } + public boolean enableComplexKeygenValidation() { return getBoolean(ENABLE_COMPLEX_KEYGEN_VALIDATION); } From c573c65562daedf5a5251e0ed65e108e9955cee5 Mon Sep 17 00:00:00 2001 From: Lokesh Jain Date: Tue, 7 Apr 2026 09:35:26 +0530 Subject: [PATCH 2/7] Add tests --- .../hudi/client/TestClientRollback.java | 198 ++++++++++++++++++ 1 file changed, 198 insertions(+) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java index 82fe9591de270..71507f1a92b6f 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java @@ -22,6 +22,7 @@ import org.apache.hudi.avro.model.HoodieRestorePlan; import org.apache.hudi.avro.model.HoodieRollbackPlan; import org.apache.hudi.avro.model.HoodieRollbackRequest; +import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieBaseFile; @@ -66,6 +67,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -825,6 +827,202 @@ public void testRollbackWithRequestedRollbackPlan(boolean enableMetadataTable, b } } + /** + * Test exclusive rollback with multi-writer: when a pending rollback exists with an expired heartbeat + * (no heartbeat file present → returns 0L → always expired), the current writer should take ownership + * and execute the rollback. + */ + @Test + public void testExclusiveRollbackPendingRollbackHeartbeatExpired() throws Exception { + final String p1 = "2016/05/01"; + final String p2 = "2016/05/02"; + final String commitTime1 = "20160501010101"; + final String commitTime2 = "20160502020601"; + final String commitTime3 = "20160506030611"; + final String rollbackInstantTime = "20160506040611"; + + Map partitionAndFileId1 = new HashMap() { + { + put(p1, "id11"); + put(p2, "id12"); + } + }; + Map partitionAndFileId2 = new HashMap() { + { + put(p1, "id21"); + put(p2, "id22"); + } + }; + Map partitionAndFileId3 = new HashMap() { + { + put(p1, "id31"); + put(p2, "id32"); + } + }; + + HoodieWriteConfig config = buildExclusiveRollbackMultiWriterConfig(); + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + testTable.withPartitionMetaFiles(p1, p2) + .addCommit(commitTime1).withBaseFilesInPartitions(partitionAndFileId1).getLeft() + .addCommit(commitTime2).withBaseFilesInPartitions(partitionAndFileId2).getLeft() + .addInflightCommit(commitTime3).withBaseFilesInPartitions(partitionAndFileId3); + + // Create a valid pending rollback plan for commitTime3 + HoodieRollbackPlan rollbackPlan = new HoodieRollbackPlan(); + List rollbackRequestList = partitionAndFileId3.entrySet().stream() + .map(entry -> new HoodieRollbackRequest(entry.getKey(), EMPTY_STRING, EMPTY_STRING, + Collections.singletonList( + metaClient.getBasePath() + "/" + entry.getKey() + "/" + + FileCreateUtilsLegacy.baseFileName(commitTime3, entry.getValue())), + Collections.emptyMap())) + .collect(Collectors.toList()); + rollbackPlan.setRollbackRequests(rollbackRequestList); + rollbackPlan.setInstantToRollback(new HoodieInstantInfo(commitTime3, HoodieTimeline.COMMIT_ACTION)); + FileCreateUtilsLegacy.createRequestedRollbackFile(metaClient.getBasePath().toString(), rollbackInstantTime, rollbackPlan); + // No heartbeat file → getLastHeartbeatTime returns 0L → heartbeat is always expired + + try (SparkRDDWriteClient client = getHoodieWriteClient(config)) { + boolean result = client.rollback(commitTime3); + assertTrue(result, "Rollback should execute when pending rollback heartbeat is expired"); + + assertFalse(testTable.inflightCommitExists(commitTime3)); + assertFalse(testTable.baseFilesExist(partitionAndFileId3, commitTime3)); + assertTrue(testTable.baseFilesExist(partitionAndFileId2, commitTime2)); + + // Verify the pending rollback instant was reused and completed + metaClient.reloadActiveTimeline(); + List rollbackInstants = metaClient.getActiveTimeline().getRollbackTimeline().getInstants(); + assertEquals(1, rollbackInstants.size()); + assertTrue(rollbackInstants.get(0).isCompleted()); + assertEquals(rollbackInstantTime, rollbackInstants.get(0).requestedTime()); + + // Verify heartbeat was cleaned up after rollback completion + assertFalse(HoodieHeartbeatClient.heartbeatExists(storage, basePath, rollbackInstantTime)); + } + } + + /** + * Test exclusive rollback with multi-writer: when a pending rollback exists with an active heartbeat + * (another writer is currently executing the rollback), the current writer should skip it and return false. + */ + @Test + public void testExclusiveRollbackPendingRollbackHeartbeatActive() throws Exception { + final String p1 = "2016/05/01"; + final String p2 = "2016/05/02"; + final String commitTime1 = "20160501010101"; + final String commitTime2 = "20160502020601"; + final String commitTime3 = "20160506030611"; + final String rollbackInstantTime = "20160506040611"; + + Map partitionAndFileId1 = new HashMap() { + { + put(p1, "id11"); + put(p2, "id12"); + } + }; + Map partitionAndFileId2 = new HashMap() { + { + put(p1, "id21"); + put(p2, "id22"); + } + }; + Map partitionAndFileId3 = new HashMap() { + { + put(p1, "id31"); + put(p2, "id32"); + } + }; + + HoodieWriteConfig config = buildExclusiveRollbackMultiWriterConfig(); + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + testTable.withPartitionMetaFiles(p1, p2) + .addCommit(commitTime1).withBaseFilesInPartitions(partitionAndFileId1).getLeft() + .addCommit(commitTime2).withBaseFilesInPartitions(partitionAndFileId2).getLeft() + .addInflightCommit(commitTime3).withBaseFilesInPartitions(partitionAndFileId3); + + // Create a pending rollback plan for commitTime3 + HoodieRollbackPlan rollbackPlan = new HoodieRollbackPlan(); + rollbackPlan.setRollbackRequests(Collections.emptyList()); + rollbackPlan.setInstantToRollback(new HoodieInstantInfo(commitTime3, HoodieTimeline.COMMIT_ACTION)); + FileCreateUtilsLegacy.createRequestedRollbackFile(metaClient.getBasePath().toString(), rollbackInstantTime, rollbackPlan); + + // Simulate an active heartbeat by another writer for the rollback instant + try (HoodieHeartbeatClient otherWriterHeartbeat = new HoodieHeartbeatClient( + storage, basePath, config.getHoodieClientHeartbeatIntervalInMs(), + config.getHoodieClientHeartbeatTolerableMisses())) { + otherWriterHeartbeat.start(rollbackInstantTime); + // The heartbeat file is fresh → isHeartbeatExpired returns false + + try (SparkRDDWriteClient client = getHoodieWriteClient(config)) { + boolean result = client.rollback(commitTime3); + assertFalse(result, "Rollback should be skipped when another writer holds an active heartbeat"); + + // Verify the inflight commit and data files are still present + assertTrue(testTable.inflightCommitExists(commitTime3)); + assertTrue(testTable.baseFilesExist(partitionAndFileId3, commitTime3)); + + // Verify no completed rollback was created + metaClient.reloadActiveTimeline(); + List completedRollbacks = metaClient.getActiveTimeline() + .getRollbackTimeline().filterCompletedInstants().getInstants(); + assertEquals(0, completedRollbacks.size()); + } + } + } + + /** + * Test exclusive rollback with multi-writer: when the commit is no longer in the timeline + * (already rolled back by another writer) and no pending rollback exists, rollback should return false. + */ + @Test + public void testExclusiveRollbackWhenCommitNotInTimeline() throws Exception { + final String p1 = "2016/05/01"; + final String commitTime1 = "20160501010101"; + final String nonExistentCommitTime = "20160506030611"; + + Map partitionAndFileId1 = new HashMap() { + { + put(p1, "id11"); + } + }; + + HoodieWriteConfig config = buildExclusiveRollbackMultiWriterConfig(); + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + testTable.withPartitionMetaFiles(p1) + .addCommit(commitTime1).withBaseFilesInPartitions(partitionAndFileId1); + + // nonExistentCommitTime is not in the timeline and no pending rollback exists for it + try (SparkRDDWriteClient client = getHoodieWriteClient(config)) { + boolean result = client.rollback(nonExistentCommitTime); + assertFalse(result, "Rollback should return false when commit is not in timeline (already rolled back)"); + + // Verify no rollback instant was created + metaClient.reloadActiveTimeline(); + assertTrue(metaClient.getActiveTimeline().getRollbackTimeline().empty()); + // Existing commit should be unaffected + assertTrue(testTable.baseFilesExist(partitionAndFileId1, commitTime1)); + } + } + + private HoodieWriteConfig buildExclusiveRollbackMultiWriterConfig() { + Properties props = new Properties(); + props.setProperty(HoodieWriteConfig.ENABLE_EXCLUSIVE_ROLLBACK.key(), "true"); + return HoodieWriteConfig.newBuilder() + .withPath(basePath) + .withRollbackUsingMarkers(false) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + .withLockConfig(HoodieLockConfig.newBuilder() + .withLockProvider(InProcessLockProvider.class) + .build()) + .withCleanConfig(HoodieCleanConfig.newBuilder() + .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build()) + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .withMetadataIndexColumnStats(false).enable(false).build()) + .withProperties(props) + .build(); + } + @Test public void testFallbackToListingBasedRollbackForCompletedInstant() throws Exception { // Let's create some commit files and base files From 56023c0fc50ba38e74b0594f17e63447f1850917 Mon Sep 17 00:00:00 2001 From: Lokesh Jain Date: Tue, 21 Apr 2026 00:01:45 +0530 Subject: [PATCH 3/7] Address review comments --- .../client/BaseHoodieTableServiceClient.java | 132 +++++++++++------- .../apache/hudi/config/HoodieWriteConfig.java | 6 +- .../hudi/client/TestClientRollback.java | 2 +- 3 files changed, 82 insertions(+), 58 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java index 1df2e451bdbac..e2a20f9aff6f1 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java @@ -1245,56 +1245,44 @@ public boolean rollback(final String commitInstantTime, Option commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream() .filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime)) .findFirst()); - Option rollbackPlanOption = Option.empty(); + + // ---- SCHEDULE PHASE ---- + // Determines which rollback plan to use and creates a new one if necessary. Option rollbackInstantTimeOpt; - if (!config.isExclusiveRollbackEnabled() && pendingRollbackInfo.isPresent()) { - // Only case when lock can be skipped is if exclusive rollback is disabled and - // there is a pending rollback info available - rollbackPlanOption = Option.of(pendingRollbackInfo.get().getRollbackPlan()); + Option rollbackPlanOption; + + if (pendingRollbackInfo.isPresent()) { + // Case 1: caller already resolved a pending rollback — re-use it without taking a lock. rollbackInstantTimeOpt = Option.of(pendingRollbackInfo.get().getRollbackInstant().requestedTime()); + rollbackPlanOption = Option.of(pendingRollbackInfo.get().getRollbackPlan()); } else { + // Case 2: no pending rollback supplied — reload the timeline under lock to get the latest view. if (!skipLocking) { txnManager.beginStateChange(Option.empty(), Option.empty()); } try { - if (config.isExclusiveRollbackEnabled()) { - // Reload meta client within the lock so that the timeline is latest while executing pending rollback - table.getMetaClient().reloadActiveTimeline(); - Option pendingRollbackOpt = getPendingRollbackInfo(table.getMetaClient(), commitInstantTime); - rollbackInstantTimeOpt = pendingRollbackOpt.map(info -> info.getRollbackInstant().requestedTime()); - if (pendingRollbackOpt.isPresent()) { - // If pending rollback and heartbeat is expired, writer should start heartbeat and execute rollback - if (heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) { - LOG.info("Heartbeat expired for rollback instant {}, executing rollback now", rollbackInstantTimeOpt); - HeartbeatUtils.deleteHeartbeatFile(storage, basePath, rollbackInstantTimeOpt.get(), config); - heartbeatClient.start(rollbackInstantTimeOpt.get()); - rollbackPlanOption = pendingRollbackOpt.map(HoodiePendingRollbackInfo::getRollbackPlan); - } else { - // Heartbeat is still active for another writer, ignore rollback for now - // TODO: ABCDEFGHI revisit return value - return false; - } - } else if (Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream() - .filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime)) - .findFirst()).isEmpty()) { - // Assume rollback is already executed since the commit is no longer present in the timeline - return false; - } + table.getMetaClient().reloadActiveTimeline(); + Option pendingRollbackOpt = getPendingRollbackInfo(table.getMetaClient(), commitInstantTime); + if (pendingRollbackOpt.isPresent()) { + // Case 2a: a concurrent writer already scheduled the rollback — re-use it. + rollbackInstantTimeOpt = Option.of(pendingRollbackOpt.get().getRollbackInstant().requestedTime()); + rollbackPlanOption = Option.of(pendingRollbackOpt.get().getRollbackPlan()); } else { - // Case where no pending rollback is present, + // Case 2b: no pending rollback exists — schedule one now. + // Refresh commitInstantOpt from the reloaded timeline. + commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream() + .filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime)) + .findFirst()); if (commitInstantOpt.isEmpty()) { log.error("Cannot find instant {} in the timeline of table {} for rollback", commitInstantTime, config.getBasePath()); return false; } - rollbackInstantTimeOpt = suppliedRollbackInstantTime.or(() -> Option.of(createNewInstantTime(false))); - if (config.isExclusiveRollbackEnabled()) { - heartbeatClient.start(rollbackInstantTimeOpt.get()); - } - rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTimeOpt.get(), commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers(), false); + String newRollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() -> createNewInstantTime(false)); + rollbackInstantTimeOpt = Option.of(newRollbackInstantTime); + rollbackPlanOption = table.scheduleRollback(context, newRollbackInstantTime, commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers(), false); } } finally { if (!skipLocking) { @@ -1303,28 +1291,64 @@ public boolean rollback(final String commitInstantTime, Option EQUALS.test(instant.requestedTime(), commitInstantTime)); + if (!hasPendingRollback && !commitStillPresent) { + // Rollback was already completed by another writer. + return false; + } + // Take ownership: delete any stale heartbeat file and start emitting. + HeartbeatUtils.deleteHeartbeatFile(storage, basePath, rollbackInstantTimeOpt.get(), config); + heartbeatClient.start(rollbackInstantTimeOpt.get()); + emittingHeartbeat = true; + } finally { + if (!skipLocking) { + txnManager.endStateChange(Option.empty()); + } + } + } + + // Execute rollback — no lock held during this operation. + try { + if (rollbackPlanOption.isPresent()) { + // There can be a case where the inflight rollback failed after the instant files + // are deleted for commitInstantTime, so that commitInstantOpt is empty as it is + // not present in the timeline. In such a case, the hoodie instant instance + // is reconstructed to allow the rollback to be reattempted, and the deleteInstants + // is set to false since they are already deleted. + HoodieRollbackMetadata rollbackMetadata = commitInstantOpt.isPresent() + ? table.rollback(context, rollbackInstantTimeOpt.get(), commitInstantOpt.get(), true, skipLocking) + : table.rollback(context, rollbackInstantTimeOpt.get(), table.getMetaClient().createNewInstant( + HoodieInstant.State.INFLIGHT, rollbackPlanOption.get().getInstantToRollback().getAction(), commitInstantTime), + false, skipLocking); + if (timerContext != null) { + long durationInMs = metrics.getDurationInMs(timerContext.stop()); + metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted()); + } + return true; + } else { + throw new HoodieRollbackException("Failed to rollback " + config.getBasePath() + " commits " + commitInstantTime); + } + } finally { + if (config.isEnforceSingleRollbackInstant() && emittingHeartbeat) { heartbeatClient.stop(rollbackInstantTimeOpt.get()); } - return true; - } else { - throw new HoodieRollbackException("Failed to rollback " + config.getBasePath() + " commits " + commitInstantTime); } } catch (Exception e) { metrics.emitRollbackFailure(e.getClass().getSimpleName()); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index a4741e72c8187..a7678cd00a928 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -299,7 +299,7 @@ public class HoodieWriteConfig extends HoodieConfig { .withDocumentation("Enables a more efficient mechanism for rollbacks based on the marker files generated " + "during the writes. Turned on by default."); - public static final ConfigProperty ENABLE_EXCLUSIVE_ROLLBACK = ConfigProperty + public static final ConfigProperty ROLLBACK_ENFORCE_SINGLE_INSTANT = ConfigProperty .key("hoodie.rollback.enforce.single.rollback.instant") .defaultValue("false") .markAdvanced() @@ -1584,8 +1584,8 @@ public boolean shouldRollbackUsingMarkers() { return getBoolean(ROLLBACK_USING_MARKERS_ENABLE); } - public boolean isExclusiveRollbackEnabled() { - return getBoolean(ENABLE_EXCLUSIVE_ROLLBACK) && getWriteConcurrencyMode().supportsMultiWriter(); + public boolean isEnforceSingleRollbackInstant() { + return getBoolean(ROLLBACK_ENFORCE_SINGLE_INSTANT) && getWriteConcurrencyMode().supportsMultiWriter(); } public boolean enableComplexKeygenValidation() { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java index 71507f1a92b6f..8b0be8d7b4a15 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java @@ -1006,7 +1006,7 @@ public void testExclusiveRollbackWhenCommitNotInTimeline() throws Exception { private HoodieWriteConfig buildExclusiveRollbackMultiWriterConfig() { Properties props = new Properties(); - props.setProperty(HoodieWriteConfig.ENABLE_EXCLUSIVE_ROLLBACK.key(), "true"); + props.setProperty(HoodieWriteConfig.ROLLBACK_ENFORCE_SINGLE_INSTANT.key(), "true"); return HoodieWriteConfig.newBuilder() .withPath(basePath) .withRollbackUsingMarkers(false) From 98324bac92fa6829559a523ef8f93dfacc1e97b5 Mon Sep 17 00:00:00 2001 From: Lokesh Jain Date: Tue, 21 Apr 2026 00:06:08 +0530 Subject: [PATCH 4/7] addendum change --- .../org/apache/hudi/client/BaseHoodieTableServiceClient.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java index e2a20f9aff6f1..7f3586f079c46 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java @@ -1314,7 +1314,6 @@ public boolean rollback(final String commitInstantTime, Option Date: Mon, 20 Apr 2026 20:33:18 -0700 Subject: [PATCH 5/7] Fixing source code --- .../client/BaseHoodieTableServiceClient.java | 169 ++++++++++-------- 1 file changed, 95 insertions(+), 74 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java index 7f3586f079c46..6051c558e6751 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java @@ -88,6 +88,7 @@ import java.time.ZonedDateTime; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -1249,91 +1250,37 @@ public boolean rollback(final String commitInstantTime, Option EQUALS.test(instant.requestedTime(), commitInstantTime)) .findFirst()); + if (commitInstantOpt.isEmpty()) { + log.error("Cannot find instant {} in the timeline of table {} for rollback", commitInstantTime, config.getBasePath()); + return false; + } + // ---- SCHEDULE PHASE ---- // Determines which rollback plan to use and creates a new one if necessary. - Option rollbackInstantTimeOpt; - Option rollbackPlanOption; - - if (pendingRollbackInfo.isPresent()) { - // Case 1: caller already resolved a pending rollback — re-use it without taking a lock. - rollbackInstantTimeOpt = Option.of(pendingRollbackInfo.get().getRollbackInstant().requestedTime()); - rollbackPlanOption = Option.of(pendingRollbackInfo.get().getRollbackPlan()); - } else { - // Case 2: no pending rollback supplied — reload the timeline under lock to get the latest view. - if (!skipLocking) { - txnManager.beginStateChange(Option.empty(), Option.empty()); - } - try { - table.getMetaClient().reloadActiveTimeline(); - Option pendingRollbackOpt = getPendingRollbackInfo(table.getMetaClient(), commitInstantTime); - if (pendingRollbackOpt.isPresent()) { - // Case 2a: a concurrent writer already scheduled the rollback — re-use it. - rollbackInstantTimeOpt = Option.of(pendingRollbackOpt.get().getRollbackInstant().requestedTime()); - rollbackPlanOption = Option.of(pendingRollbackOpt.get().getRollbackPlan()); - } else { - // Case 2b: no pending rollback exists — schedule one now. - // Refresh commitInstantOpt from the reloaded timeline. - commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream() - .filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime)) - .findFirst()); - if (commitInstantOpt.isEmpty()) { - log.error("Cannot find instant {} in the timeline of table {} for rollback", commitInstantTime, config.getBasePath()); - return false; - } - String newRollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() -> createNewInstantTime(false)); - rollbackInstantTimeOpt = Option.of(newRollbackInstantTime); - rollbackPlanOption = table.scheduleRollback(context, newRollbackInstantTime, commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers(), false); - } - } finally { - if (!skipLocking) { - txnManager.endStateChange(Option.empty()); - } - } + Option>> scheduleResult = + resolveOrScheduleRollback(table, commitInstantTime, commitInstantOpt, pendingRollbackInfo, suppliedRollbackInstantTime, skipLocking); + if (scheduleResult.isEmpty()) { + return false; } + Option rollbackInstantOpt = Option.of(scheduleResult.get().getLeft()); + Option rollbackPlanOption = scheduleResult.get().getRight(); // ---- EXECUTION PHASE ---- - // For exclusive rollbacks, coordinate with concurrent writers via heartbeat before executing. - boolean emittingHeartbeat = false; - if (config.isEnforceSingleRollbackInstant()) { - if (!skipLocking) { - txnManager.beginStateChange(Option.empty(), Option.empty()); - } - try { - // Validate heartbeat: if another writer is actively executing this rollback, move on. - if (!heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) { - return false; - } - // Heartbeat absent or expired — confirm the pending rollback is still on the timeline - // (another writer may have already completed it). - table.getMetaClient().reloadActiveTimeline(); - boolean hasPendingRollback = getPendingRollbackInfo(table.getMetaClient(), commitInstantTime).isPresent(); - boolean commitStillPresent = table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream() - .anyMatch(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime)); - if (!hasPendingRollback && !commitStillPresent) { - // Rollback was already completed by another writer. - return false; - } - // Take ownership: delete any stale heartbeat file and start emitting. - heartbeatClient.start(rollbackInstantTimeOpt.get()); - emittingHeartbeat = true; - } finally { - if (!skipLocking) { - txnManager.endStateChange(Option.empty()); - } - } - } - - // Execute rollback — no lock held during this operation. + boolean isMultiWriter = config.getWriteConcurrencyMode().supportsMultiWriter(); try { if (rollbackPlanOption.isPresent()) { + acquireRollbackHeartbeatIfMultiWriter(table, rollbackInstantOpt, isMultiWriter); + + // Execute rollback — no lock held during this operation. + // There can be a case where the inflight rollback failed after the instant files // are deleted for commitInstantTime, so that commitInstantOpt is empty as it is // not present in the timeline. In such a case, the hoodie instant instance // is reconstructed to allow the rollback to be reattempted, and the deleteInstants // is set to false since they are already deleted. HoodieRollbackMetadata rollbackMetadata = commitInstantOpt.isPresent() - ? table.rollback(context, rollbackInstantTimeOpt.get(), commitInstantOpt.get(), true, skipLocking) - : table.rollback(context, rollbackInstantTimeOpt.get(), table.getMetaClient().createNewInstant( + ? table.rollback(context, rollbackInstantOpt.get().requestedTime(), commitInstantOpt.get(), true, skipLocking) + : table.rollback(context, rollbackInstantOpt.get().requestedTime(), table.getMetaClient().createNewInstant( HoodieInstant.State.INFLIGHT, rollbackPlanOption.get().getInstantToRollback().getAction(), commitInstantTime), false, skipLocking); if (timerContext != null) { @@ -1345,8 +1292,8 @@ public boolean rollback(final String commitInstantTime, Option rollbackInstantOpt, boolean isMultiWriter) { + if (!isMultiWriter) { + return; + } + try { + txnManager.beginStateChange(rollbackInstantOpt, txnManager.getLastCompletedTransactionOwner()); + validateHeartBeat(rollbackInstantOpt.get().requestedTime()); + if (!table.getMetaClient().reloadActiveTimeline().filterPendingRollbackTimeline().containsInstant(rollbackInstantOpt.get().requestedTime())) { + throw new HoodieException("Requested rollback instant " + rollbackInstantOpt.get().requestedTime() + + " is not present as pending or already completed in the active timeline."); + } + this.heartbeatClient.start(rollbackInstantOpt.get().requestedTime()); + } finally { + txnManager.endStateChange(rollbackInstantOpt); + } + } + + /** + * Resolves an existing pending rollback or schedules a new one for the given commit instant. + * + * @return Option containing the rollback instant and plan pair, or empty if the commit instant + * is no longer present on the timeline (indicating no rollback is needed). + */ + private Option>> resolveOrScheduleRollback( + HoodieTable table, String commitInstantTime, Option commitInstantOpt, + Option pendingRollbackInfo, Option suppliedRollbackInstantTime, + boolean skipLocking) { + if (pendingRollbackInfo.isPresent()) { + // Case 1: caller already resolved a pending rollback — re-use it without taking a lock. + return Option.of(Pair.of(pendingRollbackInfo.get().getRollbackInstant(), + Option.of(pendingRollbackInfo.get().getRollbackPlan()))); + } + + // Case 2: no pending rollback supplied — reload the timeline under lock to get the latest view. + if (!skipLocking) { + txnManager.beginStateChange(Option.empty(), Option.empty()); + } + try { + if (config.isEnforceSingleRollbackInstant()) { + // if enforcing single rollback instant, we need to check if there is a pending rollback for the instant. + table.getMetaClient().reloadActiveTimeline(); + Option pendingRollbackOpt = getPendingRollbackInfo(table.getMetaClient(), commitInstantTime); + if (pendingRollbackOpt.isPresent()) { + // Case 2a: a concurrent writer already scheduled the rollback — re-use it. + return Option.of(Pair.of(pendingRollbackOpt.get().getRollbackInstant(), + Option.of(pendingRollbackOpt.get().getRollbackPlan()))); + } + commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream() + .filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime)) + .findFirst()); + if (commitInstantOpt.isEmpty()) { + log.error("Cannot find instant {} in the timeline of table {} for rollback", commitInstantTime, config.getBasePath()); + return Option.empty(); + } + } + // Case 2b: no pending rollback exists — schedule one now. + // Refresh commitInstantOpt from the reloaded timeline. + String newRollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() -> createNewInstantTime(false)); + HoodieInstant rollbackInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.ROLLBACK_ACTION, newRollbackInstantTime, + (Comparator) table.getMetaClient().getTimelineLayout().getInstantComparator()); + Option rollbackPlan = table.scheduleRollback(context, newRollbackInstantTime, commitInstantOpt.get(), + false, config.shouldRollbackUsingMarkers(), false); + return Option.of(Pair.of(rollbackInstant, rollbackPlan)); + } finally { + if (!skipLocking) { + txnManager.endStateChange(Option.empty()); + } + } + } + /** * Main API to rollback failed bootstrap. */ From 4d64313f180c2fb0a576eb30c443aa911e06f8f7 Mon Sep 17 00:00:00 2001 From: Lokesh Jain Date: Tue, 21 Apr 2026 13:55:09 +0530 Subject: [PATCH 6/7] Some fixes --- .../client/BaseHoodieTableServiceClient.java | 41 ++++++++++--------- 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java index 6051c558e6751..a9b72f5e6267c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java @@ -88,7 +88,6 @@ import java.time.ZonedDateTime; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -1250,11 +1249,6 @@ public boolean rollback(final String commitInstantTime, Option EQUALS.test(instant.requestedTime(), commitInstantTime)) .findFirst()); - if (commitInstantOpt.isEmpty()) { - log.error("Cannot find instant {} in the timeline of table {} for rollback", commitInstantTime, config.getBasePath()); - return false; - } - // ---- SCHEDULE PHASE ---- // Determines which rollback plan to use and creates a new one if necessary. Option>> scheduleResult = @@ -1269,7 +1263,9 @@ public boolean rollback(final String commitInstantTime, Option rollbackInstantOpt, boolean isMultiWriter) { - if (!isMultiWriter) { - return; - } + private boolean acquireRollbackHeartbeatIfMultiWriter(HoodieTable table, Option rollbackInstantOpt) throws IOException { try { txnManager.beginStateChange(rollbackInstantOpt, txnManager.getLastCompletedTransactionOwner()); - validateHeartBeat(rollbackInstantOpt.get().requestedTime()); - if (!table.getMetaClient().reloadActiveTimeline().filterPendingRollbackTimeline().containsInstant(rollbackInstantOpt.get().requestedTime())) { - throw new HoodieException("Requested rollback instant " + rollbackInstantOpt.get().requestedTime() - + " is not present as pending or already completed in the active timeline."); + if (!this.heartbeatClient.isHeartbeatExpired(rollbackInstantOpt.get().requestedTime())) { + return false; } + if (table.getMetaClient().reloadActiveTimeline().getRollbackTimeline().filterCompletedInstants().getInstantsAsStream() + .anyMatch(instant -> EQUALS.test(instant.requestedTime(), rollbackInstantOpt.get().requestedTime()))) { + LOG.info("Requested rollback instant " + rollbackInstantOpt.get().requestedTime() + + " is already completed in the active timeline."); + return false; + } + this.heartbeatClient.start(rollbackInstantOpt.get().requestedTime()); + return true; } finally { txnManager.endStateChange(rollbackInstantOpt); } @@ -1356,16 +1357,16 @@ private Option>> resolveOrSchedul commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream() .filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime)) .findFirst()); - if (commitInstantOpt.isEmpty()) { - log.error("Cannot find instant {} in the timeline of table {} for rollback", commitInstantTime, config.getBasePath()); - return Option.empty(); - } + } + if (commitInstantOpt.isEmpty()) { + log.error("Cannot find instant {} in the timeline of table {} for rollback", commitInstantTime, config.getBasePath()); + return Option.empty(); } // Case 2b: no pending rollback exists — schedule one now. // Refresh commitInstantOpt from the reloaded timeline. String newRollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() -> createNewInstantTime(false)); HoodieInstant rollbackInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.ROLLBACK_ACTION, newRollbackInstantTime, - (Comparator) table.getMetaClient().getTimelineLayout().getInstantComparator()); + table.getMetaClient().getTimelineLayout().getInstantComparator().requestedTimeOrderedComparator()); Option rollbackPlan = table.scheduleRollback(context, newRollbackInstantTime, commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers(), false); return Option.of(Pair.of(rollbackInstant, rollbackPlan)); From 31607fca9b3b908043db272b2a238fc763dde29c Mon Sep 17 00:00:00 2001 From: sivabalan Date: Tue, 21 Apr 2026 16:29:04 -0700 Subject: [PATCH 7/7] Enhancing tests --- .../client/BaseHoodieTableServiceClient.java | 34 +-- .../apache/hudi/config/HoodieWriteConfig.java | 12 +- .../hudi/client/TestClientRollback.java | 218 +++++++++++++++++- 3 files changed, 243 insertions(+), 21 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java index a9b72f5e6267c..f935dac2757e4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java @@ -1261,12 +1261,12 @@ public boolean rollback(final String commitInstantTime, Option EQUALS.test(instant.requestedTime(), rollbackInstantOpt.get().requestedTime()))) { - LOG.info("Requested rollback instant " + rollbackInstantOpt.get().requestedTime() - + " is already completed in the active timeline."); + LOG.info("Requested rollback instant {} is already completed in the active timeline", rollbackInstantOpt.get().requestedTime()); return false; } @@ -1345,8 +1349,8 @@ private Option>> resolveOrSchedul txnManager.beginStateChange(Option.empty(), Option.empty()); } try { - if (config.isEnforceSingleRollbackInstant()) { - // if enforcing single rollback instant, we need to check if there is a pending rollback for the instant. + if (config.shouldAvoidDuplicateRollbackPlan()) { + // Check if another writer already scheduled a rollback for this instant to avoid duplicates. table.getMetaClient().reloadActiveTimeline(); Option pendingRollbackOpt = getPendingRollbackInfo(table.getMetaClient(), commitInstantTime); if (pendingRollbackOpt.isPresent()) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index a7678cd00a928..50cb9857b808c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -299,11 +299,13 @@ public class HoodieWriteConfig extends HoodieConfig { .withDocumentation("Enables a more efficient mechanism for rollbacks based on the marker files generated " + "during the writes. Turned on by default."); - public static final ConfigProperty ROLLBACK_ENFORCE_SINGLE_INSTANT = ConfigProperty - .key("hoodie.rollback.enforce.single.rollback.instant") + public static final ConfigProperty ROLLBACK_AVOID_DUPLICATE_PLAN = ConfigProperty + .key("hoodie.rollback.avoid.duplicate.plan") .defaultValue("false") .markAdvanced() - .withDocumentation("Enables exclusive rollback so that rollback plan is generated and executed by only one writer at a time"); + .withDocumentation("When enabled in multi-writer mode, before scheduling a new rollback plan, the writer reloads " + + "the timeline under lock to check if another writer already scheduled one for the same failed commit. " + + "This avoids duplicate rollback instants and uses heartbeats to ensure only one writer executes the rollback at a time."); public static final ConfigProperty FAIL_JOB_ON_DUPLICATE_DATA_FILE_DETECTION = ConfigProperty .key("hoodie.fail.job.on.duplicate.data.file.detection") @@ -1584,8 +1586,8 @@ public boolean shouldRollbackUsingMarkers() { return getBoolean(ROLLBACK_USING_MARKERS_ENABLE); } - public boolean isEnforceSingleRollbackInstant() { - return getBoolean(ROLLBACK_ENFORCE_SINGLE_INSTANT) && getWriteConcurrencyMode().supportsMultiWriter(); + public boolean shouldAvoidDuplicateRollbackPlan() { + return getBoolean(ROLLBACK_AVOID_DUPLICATE_PLAN) && getWriteConcurrencyMode().supportsMultiWriter(); } public boolean enableComplexKeygenValidation() { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java index 8b0be8d7b4a15..7c39e75f01097 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java @@ -20,6 +20,7 @@ import org.apache.hudi.avro.model.HoodieInstantInfo; import org.apache.hudi.avro.model.HoodieRestorePlan; +import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.avro.model.HoodieRollbackPlan; import org.apache.hudi.avro.model.HoodieRollbackRequest; import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient; @@ -37,6 +38,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView; +import org.apache.hudi.common.testutils.FileCreateUtils; import org.apache.hudi.common.testutils.FileCreateUtilsLegacy; import org.apache.hudi.common.testutils.HoodieMetadataTestTable; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; @@ -68,6 +70,10 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -1004,9 +1010,219 @@ public void testExclusiveRollbackWhenCommitNotInTimeline() throws Exception { } } + /** + * Test: config enabled, no pre-existing pending rollback, inflight commit exists. + * This is the "first writer to arrive" scenario — it schedules a fresh rollback plan under lock + * and then executes it (Case 2b in resolveOrScheduleRollback). + */ + @Test + public void testAvoidDuplicateRollbackFirstWriterSchedulesNewPlan() throws Exception { + final String p1 = "2016/05/01"; + final String p2 = "2016/05/02"; + final String commitTime1 = "20160501010101"; + final String commitTime2 = "20160502020601"; + final String commitTime3 = "20160506030611"; + + Map partitionAndFileId1 = new HashMap() { + { + put(p1, "id11"); + put(p2, "id12"); + } + }; + Map partitionAndFileId2 = new HashMap() { + { + put(p1, "id21"); + put(p2, "id22"); + } + }; + Map partitionAndFileId3 = new HashMap() { + { + put(p1, "id31"); + put(p2, "id32"); + } + }; + + HoodieWriteConfig config = buildExclusiveRollbackMultiWriterConfig(); + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + testTable.withPartitionMetaFiles(p1, p2) + .addCommit(commitTime1).withBaseFilesInPartitions(partitionAndFileId1).getLeft() + .addCommit(commitTime2).withBaseFilesInPartitions(partitionAndFileId2).getLeft() + .addInflightCommit(commitTime3).withBaseFilesInPartitions(partitionAndFileId3); + + // No pending rollback file exists — the writer must schedule one itself. + try (SparkRDDWriteClient client = getHoodieWriteClient(config)) { + boolean result = client.rollback(commitTime3); + assertTrue(result, "Rollback should succeed when first writer schedules a new plan"); + + // Verify the inflight commit and its data files are cleaned up + assertFalse(testTable.inflightCommitExists(commitTime3)); + assertFalse(testTable.baseFilesExist(partitionAndFileId3, commitTime3)); + + // Verify earlier commits are unaffected + assertTrue(testTable.baseFilesExist(partitionAndFileId1, commitTime1)); + assertTrue(testTable.baseFilesExist(partitionAndFileId2, commitTime2)); + + // Verify exactly one rollback instant was created and completed + metaClient.reloadActiveTimeline(); + List rollbackInstants = metaClient.getActiveTimeline().getRollbackTimeline().getInstants(); + assertEquals(1, rollbackInstants.size()); + assertTrue(rollbackInstants.get(0).isCompleted()); + } + } + + /** + * Test: another writer has already fully completed the rollback — the inflight commit is removed + * from the timeline and a completed rollback instant exists. With avoid-duplicate-plan enabled, + * resolveOrScheduleRollback reloads the timeline, finds the commit absent, and returns empty. + */ + @Test + public void testAvoidDuplicateRollbackAlreadyCompletedByAnotherWriter() throws Exception { + final String p1 = "2016/05/01"; + final String p2 = "2016/05/02"; + final String commitTime1 = "20160501010101"; + final String commitTime2 = "20160502020601"; + final String commitTime3 = "20160506030611"; + final String rollbackInstantTime = "20160506040611"; + + Map partitionAndFileId1 = new HashMap() { + { + put(p1, "id11"); + put(p2, "id12"); + } + }; + Map partitionAndFileId2 = new HashMap() { + { + put(p1, "id21"); + put(p2, "id22"); + } + }; + + HoodieWriteConfig config = buildExclusiveRollbackMultiWriterConfig(); + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + testTable.withPartitionMetaFiles(p1, p2) + .addCommit(commitTime1).withBaseFilesInPartitions(partitionAndFileId1).getLeft() + .addCommit(commitTime2).withBaseFilesInPartitions(partitionAndFileId2); + + // Simulate that another writer already completed the rollback of commitTime3: + // - The inflight commit for commitTime3 no longer exists on the timeline + // - A completed rollback instant exists for it + HoodieRollbackMetadata rollbackMetadata = new HoodieRollbackMetadata(); + rollbackMetadata.setCommitsRollback(Collections.singletonList(commitTime3)); + rollbackMetadata.setStartRollbackTime(rollbackInstantTime); + rollbackMetadata.setPartitionMetadata(new HashMap<>()); + rollbackMetadata.setInstantsRollback(Collections.singletonList( + new HoodieInstantInfo(commitTime3, HoodieTimeline.COMMIT_ACTION))); + FileCreateUtils.createRequestedRollbackFile(metaClient, rollbackInstantTime); + FileCreateUtils.createInflightRollbackFile(metaClient, rollbackInstantTime); + FileCreateUtils.createRollbackFile(metaClient, rollbackInstantTime, rollbackMetadata, false); + + try (SparkRDDWriteClient client = getHoodieWriteClient(config)) { + boolean result = client.rollback(commitTime3); + // commitTime3 is no longer on the timeline — the writer should detect this and skip + assertFalse(result, "Rollback should return false when already completed by another writer"); + + // Verify no additional rollback instants were created — only the pre-existing one + metaClient.reloadActiveTimeline(); + List completedRollbacks = metaClient.getActiveTimeline() + .getRollbackTimeline().filterCompletedInstants().getInstants(); + assertEquals(1, completedRollbacks.size()); + assertEquals(rollbackInstantTime, completedRollbacks.get(0).requestedTime()); + + // Verify earlier commits are unaffected + assertTrue(testTable.baseFilesExist(partitionAndFileId1, commitTime1)); + assertTrue(testTable.baseFilesExist(partitionAndFileId2, commitTime2)); + } + } + + /** + * Test: two writers concurrently attempt to rollback the same inflight commit. + * With avoid-duplicate-plan enabled, exactly one rollback should succeed. The other writer + * should either reuse the pending plan and skip (due to active heartbeat) or find the + * rollback already completed. + */ + @Test + public void testConcurrentWritersRollbackSameInflightCommit() throws Exception { + final String p1 = "2016/05/01"; + final String p2 = "2016/05/02"; + final String commitTime1 = "20160501010101"; + final String commitTime2 = "20160502020601"; + final String commitTime3 = "20160506030611"; + + Map partitionAndFileId1 = new HashMap() { + { + put(p1, "id11"); + put(p2, "id12"); + } + }; + Map partitionAndFileId2 = new HashMap() { + { + put(p1, "id21"); + put(p2, "id22"); + } + }; + Map partitionAndFileId3 = new HashMap() { + { + put(p1, "id31"); + put(p2, "id32"); + } + }; + + HoodieWriteConfig config = buildExclusiveRollbackMultiWriterConfig(); + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + testTable.withPartitionMetaFiles(p1, p2) + .addCommit(commitTime1).withBaseFilesInPartitions(partitionAndFileId1).getLeft() + .addCommit(commitTime2).withBaseFilesInPartitions(partitionAndFileId2).getLeft() + .addInflightCommit(commitTime3).withBaseFilesInPartitions(partitionAndFileId3); + + // No pending rollback — both writers will race to schedule/execute one. + ExecutorService executor = Executors.newFixedThreadPool(2); + CountDownLatch startLatch = new CountDownLatch(1); + + try { + Future writer1Future = executor.submit(() -> { + startLatch.await(); + try (SparkRDDWriteClient client1 = getHoodieWriteClient(config)) { + return client1.rollback(commitTime3); + } + }); + + Future writer2Future = executor.submit(() -> { + startLatch.await(); + try (SparkRDDWriteClient client2 = getHoodieWriteClient(config)) { + return client2.rollback(commitTime3); + } + }); + + // Release both writers simultaneously + startLatch.countDown(); + + boolean result1 = writer1Future.get(); + boolean result2 = writer2Future.get(); + + // At least one writer must succeed; both must not fail with an exception + assertTrue(result1 || result2, "At least one writer should successfully execute the rollback"); + + // Verify the inflight commit is rolled back + assertFalse(testTable.inflightCommitExists(commitTime3)); + assertFalse(testTable.baseFilesExist(partitionAndFileId3, commitTime3)); + + // Verify earlier commits are unaffected + assertTrue(testTable.baseFilesExist(partitionAndFileId1, commitTime1)); + assertTrue(testTable.baseFilesExist(partitionAndFileId2, commitTime2)); + + // Verify there is exactly one completed rollback (no duplicates) + metaClient.reloadActiveTimeline(); + List completedRollbacks = metaClient.getActiveTimeline() + .getRollbackTimeline().filterCompletedInstants().getInstants(); + assertEquals(1, completedRollbacks.size(), "Exactly one completed rollback should exist, not duplicates"); + } finally { + executor.shutdownNow(); + } + } + private HoodieWriteConfig buildExclusiveRollbackMultiWriterConfig() { Properties props = new Properties(); - props.setProperty(HoodieWriteConfig.ROLLBACK_ENFORCE_SINGLE_INSTANT.key(), "true"); + props.setProperty(HoodieWriteConfig.ROLLBACK_AVOID_DUPLICATE_PLAN.key(), "true"); return HoodieWriteConfig.newBuilder() .withPath(basePath) .withRollbackUsingMarkers(false)