From 78b6745c623f8686cef945a85dcf7675fdc31852 Mon Sep 17 00:00:00 2001
From: Shuo Cheng
Date: Thu, 17 Jul 2025 09:09:08 +0800
Subject: [PATCH 1/2] [HUDI-9570] Using coordinator state to persist and
 recommit write metadata events (#13543)

---
 hudi-examples/hudi-examples-flink/pom.xml     |  6 +++
 .../sink/StreamWriteOperatorCoordinator.java  | 39 +++++++++++++++++--
 .../hudi/sink/TestWriteMergeOnRead.java       | 24 ++++++++++++
 .../utils/StreamWriteFunctionWrapper.java     | 32 ++++++++++++---
 .../java/org/apache/hudi/utils/TestData.java  | 15 +++++++
 5 files changed, 110 insertions(+), 6 deletions(-)

diff --git a/hudi-examples/hudi-examples-flink/pom.xml b/hudi-examples/hudi-examples-flink/pom.xml
index e665402017718..522c5b27c9dad 100644
--- a/hudi-examples/hudi-examples-flink/pom.xml
+++ b/hudi-examples/hudi-examples-flink/pom.xml
@@ -115,6 +115,12 @@
       <scope>compile</scope>
     </dependency>
 
+    <dependency>
+      <groupId>com.esotericsoftware</groupId>
+      <artifactId>kryo-shaded</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.flink</groupId>
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 274091c88ea3c..1c7aa47809337 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -28,6 +28,7 @@
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.SerializationUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
@@ -181,8 +182,10 @@ public void start() throws Exception {
     // setup classloader for APIs that use reflection without taking ClassLoader param
     // reference: https://stackoverflow.com/questions/1771679/difference-between-threads-context-class-loader-and-normal-classloader
     Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
-    // initialize event buffer
-    reset();
+    // initialize event buffer only if not restored from checkpoint
+    if (this.eventBuffer == null) {
+      reset();
+    }
     this.gateways = new SubtaskGateway[this.parallelism];
     // init table, create if not exists.
     this.metaClient = initTableIfNotExists(this.conf);
@@ -203,6 +206,7 @@ public void start() throws Exception {
     if (OptionsResolver.isOptimisticConcurrencyControl(conf)) {
       initClientIds(conf);
     }
+    restoreEvents();
   }
 
   @Override
@@ -233,7 +237,8 @@ public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) {
     executor.execute(
         () -> {
           try {
-            result.complete(new byte[0]);
+            byte[] eventBytes = SerializationUtils.serialize(this.eventBuffer);
+            result.complete(eventBytes);
           } catch (Throwable throwable) {
             // when a checkpoint fails, throws directly.
             result.completeExceptionally(
@@ -271,7 +276,9 @@ public void notifyCheckpointComplete(long checkpointId) {
 
   @Override
   public void resetToCheckpoint(long checkpointID, byte[] checkpointData) {
-    // no operation
+    if (checkpointData != null && checkpointData.length > 0) {
+      this.eventBuffer = SerializationUtils.deserialize(checkpointData);
+    }
   }
 
   @Override
@@ -316,6 +323,30 @@ public void subtaskReady(int i, SubtaskGateway subtaskGateway) {
   // Utilities
   // -------------------------------------------------------------------------
 
+  private void restoreEvents() {
+    if (this.eventBuffer == null || Arrays.stream(this.eventBuffer).noneMatch(Objects::nonNull)) {
+      return;
+    }
+    String restoreInstant = Arrays.stream(this.eventBuffer)
+        .filter(Objects::nonNull)
+        .filter(e -> e.getWriteStatuses().size() > 0)
+        .findFirst()
+        .map(WriteMetadataEvent::getInstantTime)
+        .orElse(null);
+    if (restoreInstant == null) {
+      return;
+    }
+    HoodieTimeline completedTimeline = this.metaClient.getActiveTimeline().filterCompletedInstants();
+    if (!completedTimeline.containsInstant(restoreInstant)) {
+      LOG.info("Recommit instant {} from restored coordinator state", restoreInstant);
+      if (writeClient.getConfig().getFailedWritesCleanPolicy().isLazy()) {
+        writeClient.getHeartbeatClient().start(restoreInstant);
+      }
+      commitInstant(restoreInstant);
+    }
+    this.metaClient.reloadActiveTimeline();
+  }
+
   private void initHiveSync() {
     this.hiveSyncExecutor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();
     this.hiveSyncContext = HiveSyncContext.create(conf, this.hiveConf);
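For context on the coordinator change above: org.apache.hudi.common.util.SerializationUtils is Hudi's Kryo-backed serializer, which is also why this patch pulls kryo-shaded into the examples module. The sketch below illustrates the serialize/deserialize round-trip the coordinator now performs on checkpoint and restore. It is only an illustration, not code from the patch: the String[] stands in for the real WriteMetadataEvent[] buffer so the snippet stays self-contained.

    import org.apache.hudi.common.util.SerializationUtils;

    import java.io.IOException;

    public class EventBufferRoundTrip {
      public static void main(String[] args) throws IOException {
        // Stand-in for the coordinator's per-subtask event buffer; null slots
        // are normal, since events arrive from one subtask at a time.
        String[] eventBuffer = new String[] {"write-metadata-for-subtask-0", null};

        // checkpointCoordinator(): snapshot the whole buffer into checkpoint bytes.
        byte[] checkpointData = SerializationUtils.serialize(eventBuffer);

        // resetToCheckpoint(): rebuild the buffer from the checkpoint bytes.
        String[] restored = SerializationUtils.deserialize(checkpointData);

        System.out.println(restored[0]); // write-metadata-for-subtask-0
        System.out.println(restored[1]); // null
      }
    }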
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
index 94214661ea878..622fb0a8d2dfb 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
@@ -201,6 +201,30 @@ public void testConsistentBucketIndex() throws Exception {
         .end();
   }
 
+  @Test
+  public void testRecommitAfterCoordinatorRestart() throws Exception {
+    Map<String, String> expected = new HashMap<>();
+    expected.put("par1", "[id1,par1,id1,Danny,23,1,par1]");
+    expected.put("par2", "[id3,par2,id3,Julian,53,3,par2, id4,par2,id4,Fabian,31,4,par2]");
+    preparePipeline(conf)
+        .consume(TestData.DATA_SET_PART1)
+        .emptyEventBuffer()
+        .checkpoint(1)
+        .assertNextEvent(1, "par1")
+        .consume(TestData.DATA_SET_PART3)
+        .checkpoint(2)
+        .assertNextEvent(1, "par2")
+        .restartCoordinator()
+        .subTaskFails(0, 0)
+        .assertNextEvent()
+        .consume(TestData.DATA_SET_PART4)
+        .checkpoint(3)
+        .assertNextEvent(1, "par2")
+        .checkpointComplete(3)
+        .checkWrittenData(expected, 2)
+        .end();
+  }
+
   @Override
   protected Map<String, String> getExpectedBeforeCheckpointComplete() {
     return EXPECTED1;
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index cf801bb0d7d0a..89cc0c3773fcb 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -56,6 +56,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -73,6 +74,7 @@ public class StreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> {
   private final MockOperatorCoordinatorContext coordinatorContext;
-  private final StreamWriteOperatorCoordinator coordinator;
+  private StreamWriteOperatorCoordinator coordinator;
   private final MockStateInitializationContext stateInitializationContext;
+  private final TreeMap<Long, byte[]> coordinatorStateStore;
 
   /**
    * Function that converts row data to HoodieRecord.
@@ -123,6 +125,7 @@ public StreamWriteFunctionWrapper(String tablePath, Configuration conf) throws Exception {
     this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext);
     this.bucketAssignFunctionContext = new MockBucketAssignFunctionContext();
     this.stateInitializationContext = new MockStateInitializationContext();
+    this.coordinatorStateStore = new TreeMap<>();
     this.asyncCompaction = OptionsResolver.needsAsyncCompaction(conf);
     this.streamConfig = new StreamConfig(conf);
     streamConfig.setOperatorID(new OperatorID());
@@ -134,6 +137,7 @@ public StreamWriteFunctionWrapper(String tablePath, Configuration conf) throws Exception {
   }
 
   public void openFunction() throws Exception {
+    resetCoordinatorToCheckpoint();
     this.coordinator.start();
     this.coordinator.setExecutor(new MockCoordinatorExecutor(coordinatorContext));
     toHoodieFunction = new RowDataToHoodieFunction<>(rowType, conf);
@@ -189,7 +193,7 @@ public Map<String, List<HoodieRecord>> getDataBuffer() {
 
   public void checkpointFunction(long checkpointId) throws Exception {
     // checkpoint the coordinator first
-    this.coordinator.checkpointCoordinator(checkpointId, new CompletableFuture<>());
+    checkpointCoordinator(checkpointId);
     if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
       bootstrapOperator.snapshotState(null);
     }
@@ -199,6 +203,21 @@ public void checkpointFunction(long checkpointId) throws Exception {
     stateInitializationContext.getOperatorStateStore().checkpointBegin(checkpointId);
   }
 
+  private void checkpointCoordinator(long checkpointId) throws Exception {
+    CompletableFuture<byte[]> completableFuture = new CompletableFuture<>();
+    // checkpoint the coordinator first
+    this.coordinator.checkpointCoordinator(checkpointId, completableFuture);
+    this.coordinatorStateStore.put(checkpointId, completableFuture.get());
+  }
+
+  private void resetCoordinatorToCheckpoint() {
+    if (coordinatorStateStore.isEmpty()) {
+      return;
+    }
+    Map.Entry<Long, byte[]> latestState = this.coordinatorStateStore.lastEntry();
+    this.coordinator.resetToCheckpoint(latestState.getKey(), latestState.getValue());
+  }
+
   public void endInput() {
     writeFunction.endInput();
   }
@@ -223,6 +242,15 @@ public void jobFailover() throws Exception {
 
   public void coordinatorFails() throws Exception {
     this.coordinator.close();
+    resetCoordinatorToCheckpoint();
+    this.coordinator.start();
+    this.coordinator.setExecutor(new MockCoordinatorExecutor(coordinatorContext));
+  }
+
+  public void restartCoordinator() throws Exception {
+    this.coordinator.close();
+    this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext);
+    resetCoordinatorToCheckpoint();
     this.coordinator.start();
     this.coordinator.setExecutor(new MockCoordinatorExecutor(coordinatorContext));
   }
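A note on the harness design above: snapshots live in a TreeMap keyed by checkpoint id, so lastEntry() always returns the newest coordinator state to replay on restart, which mimics Flink handing the latest completed checkpoint back to OperatorCoordinator#resetToCheckpoint during recovery. Below is a condensed, illustrative sketch of that bookkeeping in isolation; the class and method names are invented for the example, not taken from the patch.

    import java.util.Map;
    import java.util.TreeMap;
    import java.util.concurrent.CompletableFuture;

    /** Illustrative stand-in for the wrapper's coordinatorStateStore bookkeeping. */
    public class CoordinatorStateStore {
      private final TreeMap<Long, byte[]> snapshots = new TreeMap<>();

      /** On checkpoint: the coordinator completes the future with its serialized state. */
      public void onCheckpoint(long checkpointId, CompletableFuture<byte[]> stateFuture) throws Exception {
        snapshots.put(checkpointId, stateFuture.get());
      }

      /** On restart: replay the newest snapshot, or nothing if none was ever taken. */
      public Map.Entry<Long, byte[]> latestSnapshot() {
        return snapshots.isEmpty() ? null : snapshots.lastEntry();
      }
    }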
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 65c8e82ada166..2cfa6160baa1d 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -345,6 +345,21 @@ public class TestData {
       insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
           TimestampData.fromEpochMillis(1), StringData.fromString("par1")));
 
+  public static List<RowData> DATA_SET_PART1 = Collections.singletonList(
+      insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+          TimestampData.fromEpochMillis(1), StringData.fromString("par1")));
+
+  public static List<RowData> DATA_SET_PART2 = Collections.singletonList(
+      insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+          TimestampData.fromEpochMillis(1), StringData.fromString("par2")));
+
+  public static List<RowData> DATA_SET_PART3 = Collections.singletonList(
+      insertRow(StringData.fromString("id3"), StringData.fromString("Julian"), 53,
+          TimestampData.fromEpochMillis(3), StringData.fromString("par2")));
+
+  public static List<RowData> DATA_SET_PART4 = Collections.singletonList(
+      insertRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 31,
+          TimestampData.fromEpochMillis(4), StringData.fromString("par2")));
   public static List<RowData> DATA_SET_SINGLE_DELETE = Collections.singletonList(
       deleteRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
           TimestampData.fromEpochMillis(5), StringData.fromString("par1")));
insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, TimestampData.fromEpochMillis(1), StringData.fromString("par1"))); + public static List DATA_SET_PART1 = Collections.singletonList( + insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, + TimestampData.fromEpochMillis(1), StringData.fromString("par1"))); + + public static List DATA_SET_PART2 = Collections.singletonList( + insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, + TimestampData.fromEpochMillis(1), StringData.fromString("par2"))); + + public static List DATA_SET_PART3 = Collections.singletonList( + insertRow(StringData.fromString("id3"), StringData.fromString("Julian"), 53, + TimestampData.fromEpochMillis(3), StringData.fromString("par2"))); + + public static List DATA_SET_PART4 = Collections.singletonList( + insertRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 31, + TimestampData.fromEpochMillis(4), StringData.fromString("par2"))); public static List DATA_SET_SINGLE_DELETE = Collections.singletonList( deleteRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, TimestampData.fromEpochMillis(5), StringData.fromString("par1"))); From 40ca3352af43b3d0eb834d95610e975d3ef074c5 Mon Sep 17 00:00:00 2001 From: Vamshi Pasunuru Date: Mon, 27 Apr 2026 10:51:48 -0700 Subject: [PATCH 2/2] Fix testRecommitAfterCoordinatorRestart for single-buffer event model The original OSS test was designed for the multi-checkpoint EventBuffers model. Adapt it for this fork's single eventBuffer model: - Remove intermediate DATA_SET_PART3 batch that would be lost on restart - Add checkpoint(2) after assertNextEvent to capture coordinator state WITH the event buffer (coordinator checkpoint saves state before write fn flush, so ckp-2 captures events from ckp-1's flush) - Write new data (DATA_SET_PART4) after restart under a new instant - Verify both recommitted data (par1) and new data (par2) Made-with: Cursor --- .../org/apache/hudi/sink/TestWriteMergeOnRead.java | 13 +++++++++---- .../test/java/org/apache/hudi/utils/TestData.java | 1 + 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java index 622fb0a8d2dfb..e0863c76669b1 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java @@ -205,18 +205,23 @@ public void testConsistentBucketIndex() throws Exception { public void testRecommitAfterCoordinatorRestart() throws Exception { Map expected = new HashMap<>(); expected.put("par1", "[id1,par1,id1,Danny,23,1,par1]"); - expected.put("par2", "[id3,par2,id3,Julian,53,3,par2, id4,par2,id4,Fabian,31,4,par2]"); + expected.put("par2", "[id4,par2,id4,Fabian,31,4,par2]"); preparePipeline(conf) .consume(TestData.DATA_SET_PART1) - .emptyEventBuffer() .checkpoint(1) .assertNextEvent(1, "par1") - .consume(TestData.DATA_SET_PART3) + // checkpoint 2 captures coordinator state WITH par1 event; + // the write function has no new data so flushes an empty batch .checkpoint(2) - .assertNextEvent(1, "par2") + .assertNextEvent() + // simulate failure: no checkpointComplete was called, data is NOT committed. + // restart coordinator: restores par1 event from ckp-2 state and recommits it. 
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 2cfa6160baa1d..f490ae9599ca6 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -360,6 +360,7 @@ public class TestData {
   public static List<RowData> DATA_SET_PART4 = Collections.singletonList(
       insertRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 31,
           TimestampData.fromEpochMillis(4), StringData.fromString("par2")));
+
   public static List<RowData> DATA_SET_SINGLE_DELETE = Collections.singletonList(
       deleteRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
           TimestampData.fromEpochMillis(5), StringData.fromString("par1")));
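To make the ordering argument in the second commit message concrete: the harness checkpoints the coordinator before the write function flushes for the same checkpoint id, so the snapshot taken at checkpoint N carries the events flushed for checkpoint N-1. The self-contained simulation below uses plain Java with no Hudi or Flink types (every name is illustrative) and replays the test's timeline under that assumption.

    import java.util.ArrayList;
    import java.util.List;

    // Models the single-eventBuffer coordinator across the test's checkpoints to
    // show why ckp-2, not ckp-1, is the snapshot that carries the par1 event.
    public class SingleBufferTimeline {
      public static void main(String[] args) {
        List<String> coordinatorBuffer = new ArrayList<>();

        // checkpoint(1): the coordinator snapshot is taken BEFORE the write
        // function flushes, so the ckp-1 snapshot sees an empty buffer ...
        List<String> snapshotAtCkp1 = new ArrayList<>(coordinatorBuffer);
        // ... and the par1 event produced by ckp-1's flush lands afterwards.
        coordinatorBuffer.add("par1-event");

        // checkpoint(2): no new data was consumed, so the flush is empty, but
        // the snapshot now carries the still-uncommitted par1 event.
        List<String> snapshotAtCkp2 = new ArrayList<>(coordinatorBuffer);

        // Restart before checkpointComplete: restoring from ckp-2 recovers the
        // par1 event so restoreEvents() can recommit it; ckp-1 could not.
        System.out.println("ckp-1 snapshot: " + snapshotAtCkp1); // []
        System.out.println("ckp-2 snapshot: " + snapshotAtCkp2); // [par1-event]
      }
    }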