Flink dataloss fixes #18737
Conversation
The original OSS test was designed for the multi-checkpoint EventBuffers model. Adapt it for this fork's single eventBuffer model:
- Remove the intermediate DATA_SET_PART3 batch that would be lost on restart
- Add checkpoint(2) after assertNextEvent to capture coordinator state WITH the event buffer (the coordinator checkpoint saves state before the write function flush, so ckp-2 captures the events from ckp-1's flush)
- Write new data (DATA_SET_PART4) after restart under a new instant
- Verify both the recommitted data (par1) and the new data (par2)

Made-with: Cursor
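For illustration, the adapted flow looks roughly like this (a sketch only; the harness method names `consume`, `checkpointComplete`, `checkWrittenData`, and `end` are assumptions based on TestWriteBase's fluent style, and `EXPECTED` is a hypothetical expected-rows mapping):

```java
// Hedged sketch of the adapted test -- not the exact PR test code.
preparePipeline(conf)
    .consume(TestData.DATA_SET_PART1)   // write par1 data
    .checkpoint(1)
    .assertNextEvent()
    .checkpoint(2)                      // captures coordinator state WITH the event buffer
    // no checkpointComplete(2): simulate failure before the commit
    .restartCoordinator()               // restores the par1 event from ckp-2 state and recommits it
    .consume(TestData.DATA_SET_PART4)   // new data under a new instant
    .checkpoint(3)
    .assertNextEvent()
    .checkpointComplete(3)
    .checkWrittenData(EXPECTED)         // verify recommitted par1 and new par2 data
    .end();
```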
hudi-agent
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for working on this! The PR fixes a real data-loss scenario by persisting the coordinator's eventBuffer to checkpoint state and recommitting any pending instant on coordinator restart. The core logic in restoreEvents() looks reasonable. However, the test-side changes appear to have compile errors that would prevent the PR from building, and there are a couple of edge cases worth considering. Please take a look at the inline comments; once those are addressed, this should be ready for a Hudi committer or PMC member to take it from here. A couple of minor nits as well: one idiomatic Java cleanup and one potential dataset duplication worth clarifying.
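For orientation, a rough reconstruction of the restore-and-recommit flow, pieced together from the inline snippets below — an approximation, not the PR's exact code (`getInstantTime` is an assumed accessor on the event type):

```java
// Approximate reconstruction: find the pending instant in the restored
// eventBuffer and recommit it when the coordinator starts.
private void restoreEvents() {
  String restoreInstant = Arrays.stream(this.eventBuffer)
      .filter(Objects::nonNull)
      .filter(e -> e.getWriteStatuses().size() > 0)
      .findFirst()
      .map(WriteMetadataEvent::getInstantTime)   // assumed accessor
      .orElse(null);
  if (restoreInstant == null || containsInstant(restoreInstant)) {
    return; // nothing pending, or the instant was already committed
  }
  if (writeClient.getConfig().getFailedWritesCleanPolicy().isLazy()) {
    writeClient.getHeartbeatClient().start(restoreInstant);
  }
  commitInstant(restoreInstant);
}
```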
```java
public void restartCoordinator() throws Exception {
  this.coordinator.close();
  this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext);
```
🤖 The coordinator field is declared private final StreamWriteOperatorCoordinator coordinator; (line 75), so reassigning it here should fail to compile. Did you intend to drop the final modifier on the field declaration as part of this change?
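If so, the change would be roughly a one-liner (sketch; assumes no other code depends on the field being final):

```java
// drop `final` so restartCoordinator() can reassign the coordinator
private StreamWriteOperatorCoordinator coordinator;
```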
```java
    .assertNextEvent()
    // simulate failure: no checkpointComplete was called, data is NOT committed.
    // restart coordinator: restores par1 event from ckp-2 state and recommits it.
    .restartCoordinator()
```
🤖 .restartCoordinator() is invoked on the TestHarness returned by preparePipeline(conf), but TestHarness in TestWriteBase.java only defines coordinatorFails() — there's no restartCoordinator() wrapper. This test should fail to compile as is. Could you add a restartCoordinator() method to TestHarness that delegates to this.pipeline.restartCoordinator()?
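Something like the following (a sketch; assumes TestHarness's other fluent methods return `this` for chaining, as elsewhere in TestWriteBase):

```java
// Suggested wrapper: delegate to the pipeline and keep the fluent style.
public TestHarness restartCoordinator() throws Exception {
  this.pipeline.restartCoordinator();
  return this;
}
```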
```java
public void resetToCheckpoint(long checkpointID, byte[] checkpointData) {
  // no operation
  if (checkpointData != null && checkpointData.length > 0) {
    this.eventBuffer = SerializationUtils.deserialize(checkpointData);
```
🤖 If the coordinator's parallelism is rescaled between checkpoints (Flink supports this for operator coordinators), the restored eventBuffer will be sized to the OLD parallelism. After restore, this.parallelism reflects the NEW value, but the buffer length doesn't — could handleWriteMetaEvent then hit ArrayIndexOutOfBoundsException when event.getTaskID() >= eventBuffer.length, or silently drop slots if shrinking? Might be worth resizing the buffer here based on the current parallelism.
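A possible shape for the resize, assuming `this.parallelism` already reflects the post-restore value and the buffer's element type is WriteMetadataEvent as in upstream Hudi (untested sketch):

```java
// Restore into a buffer sized to the current parallelism; copy what fits.
WriteMetadataEvent[] restored = SerializationUtils.deserialize(checkpointData);
this.eventBuffer = new WriteMetadataEvent[this.parallelism];
System.arraycopy(restored, 0, this.eventBuffer, 0,
    Math.min(restored.length, this.eventBuffer.length));
```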
```java
    writeClient.getHeartbeatClient().start(restoreInstant);
  }
  commitInstant(restoreInstant);
}
```
🤖 When containsInstant(restoreInstant) returns true (instant was already committed before the crash), this path leaves the stale events in eventBuffer rather than calling reset(). As bootstrap events arrive after restart they will overwrite slots one by one, but until then any intervening checkpoint or commit attempt would re-serialize/re-process stale data. Would it be safer to reset() here in the already-committed branch?
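Roughly (a sketch; `containsInstant` and `reset()` are the names referenced above, and the surrounding control flow is assumed):

```java
if (containsInstant(restoreInstant)) {
  // already committed before the crash: drop the stale events so a later
  // checkpoint or commit attempt doesn't re-process them
  reset();
  return;
}
```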
```java
if (writeClient.getConfig().getFailedWritesCleanPolicy().isLazy()) {
  writeClient.getHeartbeatClient().start(restoreInstant);
}
commitInstant(restoreInstant);
```
🤖 restoreEvents() runs synchronously inside start() and may perform a full Hudi commit (writeClient.commit on potentially large WriteStatus lists). If this takes a long time, could it exceed Flink's coordinator start timeout? The pre-existing recommit on bootstrap path runs through the executor — would it be safer to dispatch this through executor.execute(...) as well, or is the synchronous behavior intentional here?
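For example, something along these lines, assuming the coordinator's NonThrownExecutor is available as `executor` (as on the bootstrap recommit path); the action-name string is illustrative:

```java
// Sketch: move the potentially slow recommit off the start() thread.
executor.execute(
    () -> commitInstant(restoreInstant),
    "recommit restored instant %s", restoreInstant);
```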
```java
}
String restoreInstant = Arrays.stream(this.eventBuffer)
    .filter(Objects::nonNull)
    .filter(e -> e.getWriteStatuses().size() > 0)
```
🤖 nit: `.size() > 0` could be flipped to `!e.getWriteStatuses().isEmpty()` — a bit more idiomatic Java and clearer about intent.
```java
    insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
        TimestampData.fromEpochMillis(1), StringData.fromString("par1")));

public static List<RowData> DATA_SET_PART1 = Collections.singletonList(
```
🤖 nit: DATA_SET_PART1 looks identical to the existing DATA_SET_SINGLE_INSERT (id1/Danny/23/ts=1/par1). Could you either reuse DATA_SET_SINGLE_INSERT in the test, or add a brief comment here explaining why a separate constant is needed? As-is, a future reader will wonder if the duplication is intentional.