diff --git a/hudi-examples/hudi-examples-flink/pom.xml b/hudi-examples/hudi-examples-flink/pom.xml
index e665402017718..522c5b27c9dad 100644
--- a/hudi-examples/hudi-examples-flink/pom.xml
+++ b/hudi-examples/hudi-examples-flink/pom.xml
@@ -115,6 +115,12 @@
compile
+
+    <dependency>
+      <groupId>com.esotericsoftware</groupId>
+      <artifactId>kryo-shaded</artifactId>
+    </dependency>
+
org.apache.flink
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 274091c88ea3c..1c7aa47809337 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -28,6 +28,7 @@
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.SerializationUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
@@ -181,8 +182,10 @@ public void start() throws Exception {
// setup classloader for APIs that use reflection without taking ClassLoader param
// reference: https://stackoverflow.com/questions/1771679/difference-between-threads-context-class-loader-and-normal-classloader
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
- // initialize event buffer
- reset();
+ // initialize event buffer only if not restored from checkpoint
+ if (this.eventBuffer == null) {
+ reset();
+ }
this.gateways = new SubtaskGateway[this.parallelism];
// init table, create if not exists.
this.metaClient = initTableIfNotExists(this.conf);
@@ -203,6 +206,7 @@ public void start() throws Exception {
if (OptionsResolver.isOptimisticConcurrencyControl(conf)) {
initClientIds(conf);
}
+ restoreEvents();
}
@Override
@@ -233,7 +237,8 @@ public void checkpointCoordinator(long checkpointId, CompletableFuture r
executor.execute(
() -> {
try {
- result.complete(new byte[0]);
+ byte[] eventBytes = SerializationUtils.serialize(this.eventBuffer);
+ result.complete(eventBytes);
} catch (Throwable throwable) {
// when a checkpoint fails, throws directly.
result.completeExceptionally(
@@ -271,7 +276,9 @@ public void notifyCheckpointComplete(long checkpointId) {
@Override
public void resetToCheckpoint(long checkpointID, byte[] checkpointData) {
- // no operation
+ if (checkpointData != null && checkpointData.length > 0) {
+ this.eventBuffer = SerializationUtils.deserialize(checkpointData);
+ }
}
@Override
@@ -316,6 +323,30 @@ public void subtaskReady(int i, SubtaskGateway subtaskGateway) {
// Utilities
// -------------------------------------------------------------------------
+ private void restoreEvents() {
+ if (this.eventBuffer == null || Arrays.stream(this.eventBuffer).noneMatch(Objects::nonNull)) {
+ return;
+ }
+ String restoreInstant = Arrays.stream(this.eventBuffer)
+ .filter(Objects::nonNull)
+ .filter(e -> e.getWriteStatuses().size() > 0)
+ .findFirst()
+ .map(WriteMetadataEvent::getInstantTime)
+ .orElse(null);
+ if (restoreInstant == null) {
+ return;
+ }
+ HoodieTimeline completedTimeline = this.metaClient.getActiveTimeline().filterCompletedInstants();
+ if (!completedTimeline.containsInstant(restoreInstant)) {
+ LOG.info("Recommit instant {} from restored coordinator state", restoreInstant);
+ if (writeClient.getConfig().getFailedWritesCleanPolicy().isLazy()) {
+ writeClient.getHeartbeatClient().start(restoreInstant);
+ }
+ commitInstant(restoreInstant);
+ }
+ this.metaClient.reloadActiveTimeline();
+ }
+
private void initHiveSync() {
this.hiveSyncExecutor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();
this.hiveSyncContext = HiveSyncContext.create(conf, this.hiveConf);
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
index 94214661ea878..e0863c76669b1 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
@@ -201,6 +201,35 @@ public void testConsistentBucketIndex() throws Exception {
.end();
}
+ @Test
+ public void testRecommitAfterCoordinatorRestart() throws Exception {
+ Map expected = new HashMap<>();
+ expected.put("par1", "[id1,par1,id1,Danny,23,1,par1]");
+ expected.put("par2", "[id4,par2,id4,Fabian,31,4,par2]");
+ preparePipeline(conf)
+ .consume(TestData.DATA_SET_PART1)
+ .checkpoint(1)
+ .assertNextEvent(1, "par1")
+ // checkpoint 2 captures coordinator state WITH par1 event;
+ // the write function has no new data so flushes an empty batch
+ .checkpoint(2)
+ .assertNextEvent()
+ // simulate failure: no checkpointComplete was called, data is NOT committed.
+ // restart coordinator: restores par1 event from ckp-2 state and recommits it.
+ .restartCoordinator()
+ // subtask re-initializes and sends bootstrap event
+ .subTaskFails(0, 0)
+ // bootstrap event triggers initInstant -> startInstant (new instant)
+ .assertNextEvent()
+ // write new data under the new instant
+ .consume(TestData.DATA_SET_PART4)
+ .checkpoint(3)
+ .assertNextEvent(1, "par2")
+ .checkpointComplete(3)
+ .checkWrittenData(expected, 2)
+ .end();
+ }
+
@Override
protected Map getExpectedBeforeCheckpointComplete() {
return EXPECTED1;
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index cf801bb0d7d0a..89cc0c3773fcb 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -56,6 +56,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
/**
@@ -73,6 +74,7 @@ public class StreamWriteFunctionWrapper implements TestFunctionWrapper {
private final MockOperatorCoordinatorContext coordinatorContext;
-  private final StreamWriteOperatorCoordinator coordinator;
+  private StreamWriteOperatorCoordinator coordinator;
private final MockStateInitializationContext stateInitializationContext;
+  private final TreeMap<Long, byte[]> coordinatorStateStore;
/**
* Function that converts row data to HoodieRecord.
@@ -123,6 +125,7 @@ public StreamWriteFunctionWrapper(String tablePath, Configuration conf) throws E
this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext);
this.bucketAssignFunctionContext = new MockBucketAssignFunctionContext();
this.stateInitializationContext = new MockStateInitializationContext();
+ this.coordinatorStateStore = new TreeMap<>();
this.asyncCompaction = OptionsResolver.needsAsyncCompaction(conf);
this.streamConfig = new StreamConfig(conf);
streamConfig.setOperatorID(new OperatorID());
@@ -134,6 +137,7 @@ public StreamWriteFunctionWrapper(String tablePath, Configuration conf) throws E
}
public void openFunction() throws Exception {
+ resetCoordinatorToCheckpoint();
this.coordinator.start();
this.coordinator.setExecutor(new MockCoordinatorExecutor(coordinatorContext));
toHoodieFunction = new RowDataToHoodieFunction<>(rowType, conf);
@@ -189,7 +193,7 @@ public Map> getDataBuffer() {
public void checkpointFunction(long checkpointId) throws Exception {
// checkpoint the coordinator first
- this.coordinator.checkpointCoordinator(checkpointId, new CompletableFuture<>());
+ checkpointCoordinator(checkpointId);
if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
bootstrapOperator.snapshotState(null);
}
@@ -199,6 +203,21 @@ public void checkpointFunction(long checkpointId) throws Exception {
stateInitializationContext.getOperatorStateStore().checkpointBegin(checkpointId);
}
+ private void checkpointCoordinator(long checkpointId) throws Exception {
+ CompletableFuture completableFuture = new CompletableFuture<>();
+ // checkpoint the coordinator first
+ this.coordinator.checkpointCoordinator(checkpointId, completableFuture);
+ this.coordinatorStateStore.put(checkpointId, completableFuture.get());
+ }
+
+ private void resetCoordinatorToCheckpoint() {
+ if (coordinatorStateStore.isEmpty()) {
+ return;
+ }
+ Map.Entry latestState = this.coordinatorStateStore.lastEntry();
+ this.coordinator.resetToCheckpoint(latestState.getKey(), latestState.getValue());
+ }
+
/** Signals end of the bounded input to the wrapped write function. */
public void endInput() {
writeFunction.endInput();
}
@@ -223,6 +242,15 @@ public void jobFailover() throws Exception {
public void coordinatorFails() throws Exception {
this.coordinator.close();
+ resetCoordinatorToCheckpoint();
+ this.coordinator.start();
+ this.coordinator.setExecutor(new MockCoordinatorExecutor(coordinatorContext));
+ }
+
+ public void restartCoordinator() throws Exception {
+ this.coordinator.close();
+ this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext);
+ resetCoordinatorToCheckpoint();
this.coordinator.start();
this.coordinator.setExecutor(new MockCoordinatorExecutor(coordinatorContext));
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 65c8e82ada166..f490ae9599ca6 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -345,6 +345,22 @@ public class TestData {
insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
TimestampData.fromEpochMillis(1), StringData.fromString("par1")));
+  public static List<RowData> DATA_SET_PART1 = Collections.singletonList(
+      insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+          TimestampData.fromEpochMillis(1), StringData.fromString("par1")));
+
+  public static List<RowData> DATA_SET_PART2 = Collections.singletonList(
+      insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+          TimestampData.fromEpochMillis(1), StringData.fromString("par2")));
+
+  public static List<RowData> DATA_SET_PART3 = Collections.singletonList(
+      insertRow(StringData.fromString("id3"), StringData.fromString("Julian"), 53,
+          TimestampData.fromEpochMillis(3), StringData.fromString("par2")));
+
+  public static List<RowData> DATA_SET_PART4 = Collections.singletonList(
+      insertRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 31,
+          TimestampData.fromEpochMillis(4), StringData.fromString("par2")));
+
public static List DATA_SET_SINGLE_DELETE = Collections.singletonList(
deleteRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
TimestampData.fromEpochMillis(5), StringData.fromString("par1")));