6 changes: 6 additions & 0 deletions hudi-examples/hudi-examples-flink/pom.xml
@@ -115,6 +115,12 @@
<scope>compile</scope>
</dependency>

<!-- Kryo -->
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo-shaded</artifactId>
</dependency>

<!-- Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
Expand Up @@ -28,6 +28,7 @@
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SerializationUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
@@ -181,8 +182,10 @@ public void start() throws Exception {
// setup classloader for APIs that use reflection without taking ClassLoader param
// reference: https://stackoverflow.com/questions/1771679/difference-between-threads-context-class-loader-and-normal-classloader
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
// initialize event buffer
reset();
// initialize event buffer only if not restored from checkpoint
if (this.eventBuffer == null) {
reset();
}
this.gateways = new SubtaskGateway[this.parallelism];
// init table, create if not exists.
this.metaClient = initTableIfNotExists(this.conf);
@@ -203,6 +206,7 @@ public void start() throws Exception {
if (OptionsResolver.isOptimisticConcurrencyControl(conf)) {
initClientIds(conf);
}
restoreEvents();
}
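With this change the coordinator checkpoint carries the serialized event buffer: checkpointCoordinator writes it to the checkpoint byte[], and resetToCheckpoint reads it back. A stdlib-only sketch of that round-trip pattern (Hudi's SerializationUtils is Kryo-based, hence the new kryo-shaded dependency; the java.io stand-in below is only an analogy, and all names in it are hypothetical):

```java
import java.io.*;

// Hypothetical stand-in for the checkpoint round-trip: serialize an event
// buffer (one slot per subtask, slots may be null) to byte[] and back,
// mirroring checkpointCoordinator / resetToCheckpoint.
public class CheckpointRoundTrip {
    static byte[] serialize(Serializable obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        return bos.toByteArray();
    }

    @SuppressWarnings("unchecked")
    static <T> T deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (T) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        String[] eventBuffer = new String[] {"event-0", null, "event-2"};
        byte[] checkpointData = serialize(eventBuffer);   // checkpointCoordinator side
        String[] restored = deserialize(checkpointData);  // resetToCheckpoint side
        System.out.println(restored.length + "," + restored[0] + "," + restored[1]); // 3,event-0,null
    }
}
```

Note that null slots survive the round-trip, which matters because the restore path below filters on Objects::nonNull.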

@Override
@@ -233,7 +237,8 @@ public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> r
executor.execute(
() -> {
try {
result.complete(new byte[0]);
byte[] eventBytes = SerializationUtils.serialize(this.eventBuffer);
result.complete(eventBytes);
} catch (Throwable throwable) {
// when a checkpoint fails, throws directly.
result.completeExceptionally(
@@ -271,7 +276,9 @@ public void notifyCheckpointComplete(long checkpointId) {

@Override
public void resetToCheckpoint(long checkpointID, byte[] checkpointData) {
// no operation
if (checkpointData != null && checkpointData.length > 0) {
this.eventBuffer = SerializationUtils.deserialize(checkpointData);
Contributor:
🤖 If the coordinator's parallelism is rescaled between checkpoints (Flink supports this for operator coordinators), the restored eventBuffer will be sized to the OLD parallelism. After restore, this.parallelism reflects the NEW value, but the buffer length doesn't — could handleWriteMetaEvent then hit ArrayIndexOutOfBoundsException when event.getTaskID() >= eventBuffer.length, or silently drop slots if shrinking? Might be worth resizing the buffer here based on the current parallelism.
(AI-generated; verify before applying.)
}
}
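A minimal sketch of the resize the rescaling comment above suggests (hypothetical helper, assuming the buffer is indexed by subtask id; not code from this PR):

```java
import java.util.Arrays;

// Hypothetical: grow or shrink a restored per-subtask buffer to the
// coordinator's current parallelism. New slots are null (to be refilled
// by bootstrap events); surplus slots are dropped when shrinking.
public class BufferResize {
    static <T> T[] resizeToParallelism(T[] restored, int parallelism) {
        if (restored.length == parallelism) {
            return restored;
        }
        // Arrays.copyOf pads with null when growing, truncates when shrinking
        return Arrays.copyOf(restored, parallelism);
    }

    public static void main(String[] args) {
        String[] old = {"e0", "e1"};
        System.out.println(resizeToParallelism(old, 4).length); // prints 4
    }
}
```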

@Override
@@ -316,6 +323,30 @@ public void subtaskReady(int i, SubtaskGateway subtaskGateway) {
// Utilities
// -------------------------------------------------------------------------

private void restoreEvents() {
if (this.eventBuffer == null || Arrays.stream(this.eventBuffer).noneMatch(Objects::nonNull)) {
return;
}
String restoreInstant = Arrays.stream(this.eventBuffer)
.filter(Objects::nonNull)
.filter(e -> e.getWriteStatuses().size() > 0)
Contributor:
🤖 nit: .size() > 0 could be flipped to !e.getWriteStatuses().isEmpty() — a bit more idiomatic Java and avoids the unnecessary size computation.
.findFirst()
.map(WriteMetadataEvent::getInstantTime)
.orElse(null);
if (restoreInstant == null) {
return;
}
HoodieTimeline completedTimeline = this.metaClient.getActiveTimeline().filterCompletedInstants();
if (!completedTimeline.containsInstant(restoreInstant)) {
LOG.info("Recommit instant {} from restored coordinator state", restoreInstant);
if (writeClient.getConfig().getFailedWritesCleanPolicy().isLazy()) {
writeClient.getHeartbeatClient().start(restoreInstant);
}
commitInstant(restoreInstant);
Contributor:
🤖 restoreEvents() runs synchronously inside start() and may perform a full Hudi commit (writeClient.commit on potentially large WriteStatus lists). If this takes a long time, could it exceed Flink's coordinator start timeout? The pre-existing recommit on bootstrap path runs through the executor — would it be safer to dispatch this through executor.execute(...) as well, or is the synchronous behavior intentional here?
}
Contributor:
🤖 When containsInstant(restoreInstant) returns true (instant was already committed before the crash), this path leaves the stale events in eventBuffer rather than calling reset(). As bootstrap events arrive after restart they will overwrite slots one by one, but until then any intervening checkpoint or commit attempt would re-serialize/re-process stale data. Would it be safer to reset() here in the already-committed branch?
this.metaClient.reloadActiveTimeline();
}
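If the recommit were moved off the start() path as the comment above asks, the shape would roughly be the following (a hypothetical sketch using a plain single-thread executor in place of Hudi's NonThrownExecutor; not code from this PR):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical: dispatch a potentially slow recommit off the caller's
// thread so the coordinator's start() can return promptly.
public class AsyncRecommit {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicBoolean committed = new AtomicBoolean(false);
        executor.execute(() -> {
            // stand-in for commitInstant(restoreInstant)
            committed.set(true);
        });
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println(committed.get()); // prints true
    }
}
```

The trade-off is that any failure in the async recommit must then be routed back to the coordinator context instead of failing start() directly.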

private void initHiveSync() {
this.hiveSyncExecutor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();
this.hiveSyncContext = HiveSyncContext.create(conf, this.hiveConf);
@@ -201,6 +201,35 @@ public void testConsistentBucketIndex() throws Exception {
.end();
}

@Test
public void testRecommitAfterCoordinatorRestart() throws Exception {
Map<String, String> expected = new HashMap<>();
expected.put("par1", "[id1,par1,id1,Danny,23,1,par1]");
expected.put("par2", "[id4,par2,id4,Fabian,31,4,par2]");
preparePipeline(conf)
.consume(TestData.DATA_SET_PART1)
.checkpoint(1)
.assertNextEvent(1, "par1")
// checkpoint 2 captures coordinator state WITH par1 event;
// the write function has no new data so flushes an empty batch
.checkpoint(2)
.assertNextEvent()
// simulate failure: no checkpointComplete was called, data is NOT committed.
// restart coordinator: restores par1 event from ckp-2 state and recommits it.
.restartCoordinator()
Contributor:
🤖 .restartCoordinator() is invoked on the TestHarness returned by preparePipeline(conf), but TestHarness in TestWriteBase.java only defines coordinatorFails() — there's no restartCoordinator() wrapper. This test should fail to compile as is. Could you add a restartCoordinator() method to TestHarness that delegates to this.pipeline.restartCoordinator()?
// subtask re-initializes and sends bootstrap event
.subTaskFails(0, 0)
// bootstrap event triggers initInstant -> startInstant (new instant)
.assertNextEvent()
// write new data under the new instant
.consume(TestData.DATA_SET_PART4)
.checkpoint(3)
.assertNextEvent(1, "par2")
.checkpointComplete(3)
.checkWrittenData(expected, 2)
.end();
}

@Override
protected Map<String, String> getExpectedBeforeCheckpointComplete() {
return EXPECTED1;
@@ -56,6 +56,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

/**
@@ -73,6 +74,7 @@ public class StreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> {
private final MockOperatorCoordinatorContext coordinatorContext;
private final StreamWriteOperatorCoordinator coordinator;
private final MockStateInitializationContext stateInitializationContext;
private final TreeMap<Long, byte[]> coordinatorStateStore;

/**
* Function that converts row data to HoodieRecord.
@@ -123,6 +125,7 @@ public StreamWriteFunctionWrapper(String tablePath, Configuration conf) throws E
this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext);
this.bucketAssignFunctionContext = new MockBucketAssignFunctionContext();
this.stateInitializationContext = new MockStateInitializationContext();
this.coordinatorStateStore = new TreeMap<>();
this.asyncCompaction = OptionsResolver.needsAsyncCompaction(conf);
this.streamConfig = new StreamConfig(conf);
streamConfig.setOperatorID(new OperatorID());
@@ -134,6 +137,7 @@ public StreamWriteFunctionWrapper(String tablePath, Configuration conf) throws E
}

public void openFunction() throws Exception {
resetCoordinatorToCheckpoint();
this.coordinator.start();
this.coordinator.setExecutor(new MockCoordinatorExecutor(coordinatorContext));
toHoodieFunction = new RowDataToHoodieFunction<>(rowType, conf);
@@ -189,7 +193,7 @@ public Map<String, List<HoodieRecord>> getDataBuffer() {

public void checkpointFunction(long checkpointId) throws Exception {
// checkpoint the coordinator first
this.coordinator.checkpointCoordinator(checkpointId, new CompletableFuture<>());
checkpointCoordinator(checkpointId);
if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
bootstrapOperator.snapshotState(null);
}
@@ -199,6 +203,21 @@ public void checkpointFunction(long checkpointId) throws Exception {
stateInitializationContext.getOperatorStateStore().checkpointBegin(checkpointId);
}

private void checkpointCoordinator(long checkpointId) throws Exception {
CompletableFuture<byte[]> completableFuture = new CompletableFuture<>();
// checkpoint the coordinator first
this.coordinator.checkpointCoordinator(checkpointId, completableFuture);
this.coordinatorStateStore.put(checkpointId, completableFuture.get());
}

private void resetCoordinatorToCheckpoint() {
if (coordinatorStateStore.isEmpty()) {
return;
}
Map.Entry<Long, byte[]> latestState = this.coordinatorStateStore.lastEntry();
this.coordinator.resetToCheckpoint(latestState.getKey(), latestState.getValue());
}
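The wrapper's coordinatorStateStore is keyed by checkpoint id in a TreeMap, so "latest successful checkpoint" falls out of lastEntry() regardless of insertion order. A standalone sketch of that lookup (hypothetical mirror of the test wrapper, not code from this PR):

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical mirror of coordinatorStateStore: TreeMap keeps checkpoint
// ids sorted ascending, so lastEntry() is always the most recent snapshot.
public class LatestCheckpointLookup {
    public static void main(String[] args) {
        TreeMap<Long, byte[]> stateStore = new TreeMap<>();
        stateStore.put(1L, new byte[] {1});
        stateStore.put(3L, new byte[] {3});
        stateStore.put(2L, new byte[] {2}); // out-of-order insert still sorts
        Map.Entry<Long, byte[]> latest = stateStore.lastEntry();
        System.out.println(latest.getKey()); // prints 3
    }
}
```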

public void endInput() {
writeFunction.endInput();
}
@@ -223,6 +242,15 @@ public void jobFailover() throws Exception {

public void coordinatorFails() throws Exception {
this.coordinator.close();
resetCoordinatorToCheckpoint();
this.coordinator.start();
this.coordinator.setExecutor(new MockCoordinatorExecutor(coordinatorContext));
}

public void restartCoordinator() throws Exception {
this.coordinator.close();
this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext);
Contributor:
🤖 The coordinator field is declared private final StreamWriteOperatorCoordinator coordinator; (line 75), so reassigning it here should fail to compile. Did you intend to drop the final modifier on the field declaration as part of this change?
resetCoordinatorToCheckpoint();
this.coordinator.start();
this.coordinator.setExecutor(new MockCoordinatorExecutor(coordinatorContext));
}
@@ -345,6 +345,22 @@ public class TestData {
insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
TimestampData.fromEpochMillis(1), StringData.fromString("par1")));

public static List<RowData> DATA_SET_PART1 = Collections.singletonList(
Contributor:
🤖 nit: DATA_SET_PART1 looks identical to the existing DATA_SET_SINGLE_INSERT (id1/Danny/23/ts=1/par1). Could you either reuse DATA_SET_SINGLE_INSERT in the test, or add a brief comment here explaining why a separate constant is needed? As-is, a future reader will wonder if the duplication is intentional.
insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
TimestampData.fromEpochMillis(1), StringData.fromString("par1")));

public static List<RowData> DATA_SET_PART2 = Collections.singletonList(
insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
TimestampData.fromEpochMillis(1), StringData.fromString("par2")));

public static List<RowData> DATA_SET_PART3 = Collections.singletonList(
insertRow(StringData.fromString("id3"), StringData.fromString("Julian"), 53,
TimestampData.fromEpochMillis(3), StringData.fromString("par2")));

public static List<RowData> DATA_SET_PART4 = Collections.singletonList(
insertRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 31,
TimestampData.fromEpochMillis(4), StringData.fromString("par2")));

public static List<RowData> DATA_SET_SINGLE_DELETE = Collections.singletonList(
deleteRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
TimestampData.fromEpochMillis(5), StringData.fromString("par1")));