feat(flink): Implement continuous sorting feature for append write #18083
… sorted order incrementally and avoid single-partition lag during ingestion by reducing large pause time from sort and backpressure
Summary:
- Added AppendWriteFunctionWithContinuousSort, which keeps records in a TreeMap keyed by a code-generated normalized key and an insertion sequence, drains oldest entries when a configurable threshold is reached, and writes drained records immediately; snapshot/endInput drain remaining records.
- Updated AppendWriteFunctions.create to instantiate the continuous sorter when WRITE_BUFFER_SORT_CONTINUOUS_ENABLED is true.
- Introduced three new FlinkOptions: WRITE_BUFFER_SORT_CONTINUOUS_ENABLED, WRITE_BUFFER_SORT_CONTINUOUS_DRAIN_THRESHOLD_PERCENT, and WRITE_BUFFER_SORT_CONTINUOUS_DRAIN_SIZE, and added runtime validation (buffer > 0, 0 < threshold < 100, drainSize > 0, parsed non-empty sort keys).
- Added ITTestAppendWriteFunctionWithContinuousSort integration tests covering buffer flush triggers, sorted output correctness (with and without continuous drain), drain threshold/size behaviors, and invalid-parameter error cases.
Verified that we can push higher CPU utilization and input rate with continuous sorting enabled (see before and after 11:30 AM).
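For readers skimming the thread, the buffering idea in the summary can be sketched in plain Java. This is only an illustration under stated assumptions, not the PR's code: `ContinuousSortBufferSketch`, its `Key` class, and the `written` list (standing in for the file writer) are hypothetical names, a `String` stands in for the code-generated normalized key, and a simple record-count capacity replaces the percent-based drain threshold.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative stand-in for the buffering logic; all names here are hypothetical. */
final class ContinuousSortBufferSketch {

  /** Sort key plus insertion sequence, so records with equal keys never collide. */
  static final class Key implements Comparable<Key> {
    final String sortKey;
    final long seq;

    Key(String sortKey, long seq) {
      this.sortKey = sortKey;
      this.seq = seq;
    }

    @Override
    public int compareTo(Key other) {
      int cmp = sortKey.compareTo(other.sortKey);
      return cmp != 0 ? cmp : Long.compare(seq, other.seq);
    }
  }

  private final TreeMap<Key, String> sorted = new TreeMap<>();
  private final List<String> written = new ArrayList<>(); // stands in for the file writer
  private final int capacity;
  private final int drainSize;
  private long seq = 0L;

  ContinuousSortBufferSketch(int capacity, int drainSize) {
    if (capacity <= 0 || drainSize <= 0) {
      throw new IllegalArgumentException("capacity and drainSize must be positive");
    }
    this.capacity = capacity;
    this.drainSize = drainSize;
  }

  /** Inserts in O(log n); drains a few of the smallest entries first if the buffer is full. */
  void add(String sortKey, String record) {
    if (sorted.size() >= capacity) {
      drain(drainSize);
    }
    sorted.put(new Key(sortKey, seq++), record);
  }

  /** Called at snapshot/endInput time: writes everything that is still buffered. */
  void flushAll() {
    drain(sorted.size());
    seq = 0L;
  }

  private void drain(int count) {
    for (int i = 0; i < count; i++) {
      Map.Entry<Key, String> first = sorted.pollFirstEntry();
      if (first == null) {
        return;
      }
      written.add(first.getValue());
    }
  }

  List<String> written() {
    return written;
  }
}
```

Note that in this sketch the output is sorted only relative to what the buffer holds at drain time; a record that arrives after smaller keys have already been drained is written after them.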
@hudi-bot run azure

@prashantwason thanks for the contribution. A high-level question before reviewing the details: I noticed this PR doesn't support async write like #13892, and the time cost of flushing records to file will actually be greater than that of sorting. Do you have any benchmark numbers comparing the performance of this PR and the previous version?

@hudi-bot run azure

Thanks @cshuo for the review!
Regarding async write support:
The key benefits of continuous sorting:
Regarding benchmarks: Would you like me to add some basic benchmarking to quantify the latency improvements? Or do you have specific concerns about the approach that I should address first?

@hudi-bot run azure

@prashantwason thanks for the detailed explanation.
Regarding the append write with buffer sorting, there are already two approaches. This PR adopts a continuous sorting approach, which can indeed alleviate sorting spikes. But the flush process involves not only sorting but also file-writing overhead, and continuous sort spreads the sorting overhead across every record write. Therefore, I'd like to understand how much performance gain this PR brings compared to the existing functions.

hey @prashantwason: can you respond to @cshuo on these asks?
- Move config validation before super.open() to avoid IllegalStateException when Flink runtime context is not initialized (fixes 5 validation tests)
- Add RecordComparator for full comparison fallback when normalized keys are equal, ensuring correct multi-field sort order (fixes 2 sort tests)
- Store RowData reference in SortKey for RecordComparator access

@cshuo Thanks for the thoughtful follow-up. You raise a valid point about the existing
Here's how continuous sorting differs from the existing async approaches:
Existing approaches (BIM/Disruptor):
Continuous sorting (this PR):
The key trade-off is:
These approaches are complementary: continuous sorting could potentially be combined with async write in the future. This PR adds it as an opt-in alternative via
I don't have formal benchmark numbers yet. Would it be helpful if I ran a comparison benchmark against the BIM approach to quantify the latency distribution differences?

Pushed a fix for the CI test failures: Fixed 5 validation tests (
Fixed 2 sort order tests (
The Azure CI failure was infrastructure-related (Docker process exit code 1), not a test failure.
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@            Coverage Diff             @@
## master #18083 +/- ##
============================================
+ Coverage 61.43% 68.79% +7.35%
- Complexity 23082 28221 +5139
============================================
Files 2108 2461 +353
Lines 127636 135388 +7752
Branches 14534 16410 +1876
============================================
+ Hits 78409 93135 +14726
+ Misses 42873 34875 -7998
- Partials 6354 7378 +1024
Thanks for the detailed explanation; it makes sense to me.
> Would it be helpful if I ran a comparison benchmark against the BIM approach to quantify the latency distribution differences?
Yeah, that will be helpful. A concrete result would provide a more valuable reference, e.g., how much the spikes can be reduced and how much the average throughput decreases.
Left some comments on the implementation.
| } | ||
|
|
||
| // Parse and validate sort keys | ||
| String sortKeys = config.get(FlinkOptions.WRITE_BUFFER_SORT_KEYS); |
The existing append-sort code resolves a missing write.buffer.sort.keys by falling back to the record key via AppendWriteFunctions.resolveSortKeys. Can this function follow the same behavior?
Done. Now using AppendWriteFunctions.resolveSortKeys(config) which falls back to record key when write.buffer.sort.keys is not set.
| normalizedKeyComputer.putKey(data, reusableKeySegment, 0); | ||
|
|
||
| // Create sort key (copies the normalized key from reusable segment) | ||
| SortKey key = new SortKey(reusableKeySegment, normalizedKeySize, data, insertionSequence++); |
The map stores raw RowData references and later compares them. With Flink object reuse enabled, those retained rows can be mutated after insertion, which breaks TreeMap ordering guarantees and risks incorrect output.
Use getRuntimeContext().isObjectReuseEnabled() to check whether object reuse is enabled, and copy the RowData here if necessary.
Done. Added object reuse detection via getRuntimeContext().isObjectReuseEnabled() and RowDataSerializer.copy() in processElement() when object reuse is enabled.
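The hazard discussed above can be reproduced without Flink. The following is a minimal sketch, assuming a hypothetical mutable `Row` class in place of RowData and a plain `copy()` in place of RowDataSerializer.copy(); it shows the simplest symptom (buffered values silently overwritten), while in the real function the comparator also reads the retained rows, so ordering itself can break.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical demo of why buffered rows must be copied when the runtime reuses objects. */
final class ObjectReuseSketch {

  /** Mutable row, standing in for a RowData instance that the runtime reuses. */
  static final class Row {
    int key;
    String payload;

    Row copy() {
      Row r = new Row();
      r.key = key;
      r.payload = payload;
      return r;
    }
  }

  /** Buffers three rows delivered through one reused instance and returns payloads in key order. */
  static List<String> bufferAll(boolean copyOnInsert) {
    TreeMap<Integer, Row> buffer = new TreeMap<>();
    Row reused = new Row();
    int[] keys = {3, 1, 2};
    String[] payloads = {"c", "a", "b"};
    for (int i = 0; i < keys.length; i++) {
      // The "runtime" overwrites the same object for every incoming record.
      reused.key = keys[i];
      reused.payload = payloads[i];
      buffer.put(reused.key, copyOnInsert ? reused.copy() : reused);
    }
    List<String> out = new ArrayList<>();
    for (Row r : buffer.values()) {
      out.add(r.payload);
    }
    return out;
  }
}
```

Without the copy, every map entry points at the same object and sees only the last record's contents; with the copy, each entry keeps its own snapshot.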
| // Sort key computation | ||
| private transient NormalizedKeyComputer normalizedKeyComputer; | ||
| private transient RecordComparator recordComparator; | ||
| private transient byte[] reusableKeyBuffer; |
Done. Made reusableKeyBuffer a local variable in open().
| } | ||
|
|
||
| // Clear buffer and reset sequence | ||
| sortedRecords.clear(); |
Can this line go inside the if branch above?
Done. Moved sortedRecords.clear() and insertionSequence = 0L inside the if (!sortedRecords.isEmpty()) branch.
| } | ||
|
|
||
| // Reset for next checkpoint interval | ||
| sortedRecords.clear(); |
Can this line go inside the if branch above?
Done. Moved sortedRecords.clear() and insertionSequence = 0L inside the if (!sortedRecords.isEmpty()) branch.
| /** | ||
| * Sink function to write data with continuous sorting for improved compression. | ||
| * | ||
| * <p>Unlike {@link AppendWriteFunctionWithBufferSort} which uses batch sorting, |
Class name AppendWriteFunctionWithBufferSort is stale and has been renamed.
Done. Fixed to reference AppendWriteFunctionWithBIMBufferSort.
| .defaultValue(false) // default use batch sorting | ||
| .withDescription("Whether to use continuous sorting (TreeMap-based) instead of batch sorting. " | ||
| + "Continuous sorting provides O(log n) inserts and incremental draining, " | ||
| + "but has higher per-record overhead. Requires write.buffer.sort.enabled=true."); |
write.buffer.sort.enabled is deprecated, and append writing uses WRITE_BUFFER_TYPE to select between write functions. Can we also use WRITE_BUFFER_TYPE here?
Done. Removed WRITE_BUFFER_SORT_CONTINUOUS_ENABLED config entirely. Now uses WRITE_BUFFER_TYPE=CONTINUOUS_SORT via the BufferType enum. Added CONTINUOUS_SORT to BufferType and updated AppendWriteFunctions.create() to route accordingly. Updated tests as well.
…ject reuse, cleanup
- Replace WRITE_BUFFER_SORT_CONTINUOUS_ENABLED with WRITE_BUFFER_TYPE=CONTINUOUS_SORT
- Add CONTINUOUS_SORT to BufferType enum
- Use AppendWriteFunctions.resolveSortKeys() for sort key resolution with record key fallback
- Copy RowData when Flink object reuse is enabled to prevent mutation
- Move sortedRecords.clear()/insertionSequence reset inside if branches
- Fix stale class name reference in javadoc
- Update test to use new WRITE_BUFFER_TYPE config
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

@cshuo All review comments have been addressed:
Please take another look when you get a chance. Thanks!
…tions Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…it operations" This reverts commit 847773a.
| } catch (IOException e) { | ||
| throw new HoodieIOException("Failed to drain buffer during snapshot", e); | ||
| } finally { | ||
| super.snapshotState(); |
Let's move super.snapshotState() out of the finally clause, like the other append write functions, since the write meta event should not be sent to the coordinator when there is an ingestion exception.
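To make the difference concrete, here is a small sketch of the two control flows. It is a plain-Java illustration, not the Hudi code: `drain` and `sendWriteMetaEvent` are hypothetical stand-ins for the buffer drain and for what super.snapshotState() does, and UncheckedIOException stands in for HoodieIOException.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;

/** Hypothetical sketch: where the "send event" call sits decides whether it runs on failure. */
final class SnapshotOrderingSketch {

  static void drain(List<String> events, boolean fail) throws IOException {
    if (fail) {
      throw new IOException("simulated write failure");
    }
    events.add("drained");
  }

  static void sendWriteMetaEvent(List<String> events) {
    events.add("meta-event-sent");
  }

  /** Problematic shape: the event is sent even though draining failed. */
  static void snapshotWithFinally(List<String> events, boolean fail) {
    try {
      drain(events, fail);
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to drain buffer during snapshot", e);
    } finally {
      sendWriteMetaEvent(events);
    }
  }

  /** Suggested shape: the event is only sent when draining succeeded. */
  static void snapshotAfterTry(List<String> events, boolean fail) {
    try {
      drain(events, fail);
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to drain buffer during snapshot", e);
    }
    sendWriteMetaEvent(events);
  }
}
```

In both variants the exception still propagates to the caller; the only difference is whether the event has already gone out by then.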
| this.reusableKeySegment = MemorySegmentFactory.wrap(reusableKeyBuffer); | ||
|
|
||
| // Detect object reuse mode and create serializer for copying if needed | ||
| this.objectReuseEnabled = getRuntimeContext().isObjectReuseEnabled(); |
There is a compile error in CI; it seems we need to adapt isObjectReuseEnabled() for lower Flink versions, such as Flink 1.17 and 1.18.
Besides, can you also add a test with object reuse enabled?
Done. Using RuntimeContextUtils.isObjectReuseEnabled(getRuntimeContext()) which adapts across Flink versions. For Flink 1.17-1.20, it delegates to getExecutionConfig().isObjectReuseEnabled(), and for Flink 2.0+/2.1+, it calls runtimeContext.isObjectReuseEnabled() directly.
Done. Added testObjectReuseEnabled() which uses a single reusable GenericRowData instance and verifies all 3 records are distinct after write.
| // Initialize TreeMap with comparator that uses normalized keys for fast comparison | ||
| // and falls back to RecordComparator for full comparison when normalized keys are equal | ||
| this.sortedRecords = new TreeMap<>((k1, k2) -> { | ||
| MemorySegment seg1 = MemorySegmentFactory.wrap(k1.keyBytes); |
The TreeMap comparator wraps keyBytes into new MemorySegment objects for every comparison. Since comparisons happen frequently on insert/drain, this creates avoidable allocation in the hot path, which increases GC pressure.
We can reuse/precompute comparable representations per SortKey, i.e., store wrapped segments in SortKey.
Done. SortKey now copies the normalized key bytes and wraps them as a MemorySegment once in its constructor, so the TreeMap comparator uses pre-computed k1.keySegment/k2.keySegment without any per-comparison allocation.
|
|
||
| } catch (IOException e) { | ||
| throw new HoodieIOException("Failed to drain buffer during endInput", e); | ||
| } finally { |
Let's move super.endInput() out of the finally clause, like the other append write functions.
Done. super.endInput() is already outside the try/catch block, consistent with the other append write functions.
…e test
- Fix isObjectReuseEnabled() compilation on Flink 1.17/1.18 by using getExecutionConfig().isObjectReuseEnabled() instead of RuntimeContext method
- Remove unused imports (java.util.Arrays, java.util.stream.Collectors) to fix checkstyle violations
- Move super.snapshotState() and super.endInput() out of finally clauses to match other append write functions (avoid sending write meta event on error)
- Optimize TreeMap comparator: store pre-wrapped MemorySegment in SortKey to avoid per-comparison allocation and reduce GC pressure
- Filter empty strings after trim in resolveSortKeys() to properly validate whitespace-only and comma-only sort key configs
- Add testObjectReuseEnabled test that verifies records are correctly copied when Flink object reuse is enabled (prevents TreeMap corruption)
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

@cshuo All review comments from the latest round have been addressed in commit e592166:
Please take another look when you get a chance. Thanks!
| this.reusableKeySegment = MemorySegmentFactory.wrap(reusableKeyBuffer); | ||
|
|
||
| // Detect object reuse mode and create serializer for copying if needed | ||
| this.objectReuseEnabled = getRuntimeContext().getExecutionConfig().isObjectReuseEnabled(); |
There is a compile error for higher Flink versions:
Caused by: org.apache.maven.plugin.compiler.CompilationFailureException: Compilation failure
/home/runner/work/hudi/hudi/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithContinuousSort.java:[158,50] cannot find symbol
symbol: method getExecutionConfig()
You can refer to RuntimeContextUtils to adapt object reuse check for different flink versions.
Done. Using RuntimeContextUtils.isObjectReuseEnabled() which delegates to getExecutionConfig().isObjectReuseEnabled() for Flink 1.17-1.20 and runtimeContext.isObjectReuseEnabled() directly for Flink 2.0+/2.1+.
…useEnabled
getExecutionConfig() was removed from RuntimeContext in Flink 2.0+. Add isObjectReuseEnabled() to version-specific RuntimeContextUtils adapters:
- Flink 1.17-1.20: uses getExecutionConfig().isObjectReuseEnabled()
- Flink 2.0-2.1: uses getGlobalJobParameters() with PipelineOptions.OBJECT_REUSE
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
| AUTO_WATERMARK_INTERVAL.defaultValue().toMillis() + "")); | ||
| } | ||
|
|
||
| public static boolean isObjectReuseEnabled(RuntimeContext runtimeContext) { |
Use runtimeContext.isObjectReuseEnabled() directly.
Done. Using runtimeContext.isObjectReuseEnabled() directly for Flink 2.0.x.
| } | ||
|
|
||
| public static boolean isObjectReuseEnabled(RuntimeContext runtimeContext) { | ||
| Map<String, String> jobParameters = runtimeContext.getGlobalJobParameters(); |
Use runtimeContext.isObjectReuseEnabled() directly.
Done. Using runtimeContext.isObjectReuseEnabled() directly for Flink 2.1.x.
hey @cshuo: can you re-review the patch?
@nsivabalan Except for the last two comments, which are not resolved yet (@prashantwason), the PR LGTM overall.
yihua left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Style & Readability Review — This PR introduces a new continuous sorting buffer type for Flink append writes, which is a great feature. The implementation looks solid, with good test coverage for various scenarios, including object reuse and error handling. Overall, the code is clean, well-structured, and adheres to Hudi's coding standards. I have a few minor nits below.
| * <p>Strategy: | ||
| * <ol> | ||
| * <li>Records are inserted in sorted order (TreeMap)</li> | ||
| * <li>When buffer reaches max capacity, oldest record(s) are drained synchronously</li> |
🤖 nit: Could maxCapacity and drainSize be made final as they are configured once and not changed?
Already final. maxCapacity is private final long and drainSize is private final int, set once in the constructor.
| private transient RecordComparator recordComparator; | ||
| private transient MemorySegment reusableKeySegment; | ||
| private transient int normalizedKeySize; | ||
| private transient boolean objectReuseEnabled; |
🤖 nit: It might be clearer to initialize insertionSequence to 0L when sortedRecords is cleared in snapshotState() and endInput() to consistently reflect a fresh state, similar to how it's done in the constructor. Or, is it intended to continue the sequence?
Already done. insertionSequence is reset to 0L alongside sortedRecords.clear() in both snapshotState() and endInput().
| String.format("Buffer capacity must be positive, got: %d", maxCapacity)); | ||
| } | ||
|
|
||
| if (drainSize <= 0) { |
🤖 nit: The check if (cmp != 0) is a common pattern, but it's redundant. recordComparator.compare and Long.compare also return 0 if equal, so you could directly return their results if cmp == 0 without an explicit check.
The if (cmp != 0) guards are intentional here - this is a 3-level comparator chain (normalized key -> full record -> insertion order). The early returns avoid the more expensive recordComparator.compare() when the cheaper normalizedKeyComputer.compareKey() already distinguishes the records. Keeping the pattern explicit for readability.
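The three-level chain described in this reply can be sketched as follows. This is an illustration only: `SortKeySketch` is a hypothetical class, a truncated ASCII byte prefix stands in for the code-generated normalized key (and is computed once in the constructor, as a later review comment suggests for the wrapped segment), and `String.compareTo` stands in for RecordComparator.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Comparator;

/** Hypothetical sort key: cheap fixed-size prefix first, full comparison only on prefix ties. */
final class SortKeySketch {
  final byte[] prefix;   // normalized-key stand-in, computed once at construction
  final String fullKey;  // stands in for the full record used by the fallback comparator
  final long seq;        // insertion sequence, keeps equal records distinct and stable

  SortKeySketch(String fullKey, long seq, int prefixLen) {
    byte[] bytes = fullKey.getBytes(StandardCharsets.US_ASCII);
    this.prefix = Arrays.copyOf(bytes, prefixLen); // truncates or zero-pads to a fixed size
    this.fullKey = fullKey;
    this.seq = seq;
  }

  /** Unsigned lexicographic comparison of two equal-length byte arrays. */
  private static int comparePrefix(byte[] a, byte[] b) {
    for (int i = 0; i < a.length; i++) {
      int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
      if (cmp != 0) {
        return cmp;
      }
    }
    return 0;
  }

  static final Comparator<SortKeySketch> ORDER = (k1, k2) -> {
    // Level 1: cheap prefix comparison decides most cases.
    int cmp = comparePrefix(k1.prefix, k2.prefix);
    if (cmp != 0) {
      return cmp;
    }
    // Level 2: the more expensive full comparison runs only when prefixes tie.
    cmp = k1.fullKey.compareTo(k2.fullKey);
    if (cmp != 0) {
      return cmp;
    }
    // Level 3: insertion order breaks remaining ties, so no two keys compare equal.
    return Long.compare(k1.seq, k2.seq);
  };
}
```

The sketch assumes all keys share one prefix length and contain only non-NUL ASCII characters, so the prefix order never contradicts the full order.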
Line 97: 🤖 nit: The s -> !s.isEmpty() filter is good. Could we also add a .trim() to this stream pipeline before filtering, to handle cases like "key1, ,key2" that would result in an empty string after trimming?
Already handled. The stream pipeline does .map(String::trim).filter(s -> !s.isEmpty()), so "key1, ,key2" splits to ["key1", " ", "key2"], trims to ["key1", "", "key2"], then filters out the empty string to get ["key1", "key2"].
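The pipeline quoted in this reply behaves as sketched below. `SortKeyParsingSketch.parseSortKeys` is a hypothetical helper, not the actual resolveSortKeys method, and throwing IllegalArgumentException on an empty result is an assumption made for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper mirroring the split -> trim -> filter pipeline from the reply. */
final class SortKeyParsingSketch {

  static List<String> parseSortKeys(String raw) {
    List<String> keys = Arrays.stream(raw.split(","))
        .map(String::trim)            // "key1, ,key2" -> ["key1", "", "key2"]
        .filter(s -> !s.isEmpty())    // drop entries that were empty or whitespace-only
        .collect(Collectors.toList());
    if (keys.isEmpty()) {
      // Assumption: reject whitespace-only and comma-only configs.
      throw new IllegalArgumentException("No sort keys found in: '" + raw + "'");
    }
    return keys;
  }
}
```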
yihua left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
LGTM! This is a solid addition for incremental sorting that helps mitigate the latency spikes of batch sorting. I've left a couple of architectural notes for you to consider around memory footprint tracking and minimizing short-lived allocations during draining.
| drainRecords(drainSize); | ||
|
|
||
| // Verify there's space after draining | ||
| if (sortedRecords.size() >= maxCapacity) { |
🤖 Unlike BIMBufferSort, this sorting mode does not bound its memory footprint using write.task.max.size.
Fixed. Added TotalSizeTracer to bound the buffer memory footprint using write.task.max.size. Now processElement() triggers draining when either the record count limit (write.buffer.size) or memory size limit is exceeded. Record size is estimated once via ObjectSizeCalculator and tracked via TotalSizeTracer.trace()/countDown()/reset().
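A rough sketch of the dual limit (record count or byte budget) is shown below. It does not use Hudi's TotalSizeTracer or ObjectSizeCalculator; `BoundedBufferSketch` is a hypothetical class that tracks bytes with a plain counter and uses the array length as the size estimate.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical buffer that drains when either the record count or the byte budget is hit. */
final class BoundedBufferSketch {
  private final TreeMap<Long, byte[]> sorted = new TreeMap<>();
  private final int maxRecords;
  private final long maxBytes;
  private long bufferedBytes = 0L;
  private int drained = 0;

  BoundedBufferSketch(int maxRecords, long maxBytes) {
    this.maxRecords = maxRecords;
    this.maxBytes = maxBytes;
  }

  void add(long key, byte[] record) {
    // Make room until both limits hold, or the buffer is empty.
    while (!sorted.isEmpty()
        && (sorted.size() >= maxRecords || bufferedBytes + record.length > maxBytes)) {
      Map.Entry<Long, byte[]> first = sorted.pollFirstEntry();
      bufferedBytes -= first.getValue().length; // release the drained record's bytes
      drained++;
    }
    byte[] previous = sorted.put(key, record);
    if (previous != null) {
      bufferedBytes -= previous.length; // a duplicate key replaces the old record
    }
    bufferedBytes += record.length;
  }

  int size() {
    return sorted.size();
  }

  long bufferedBytes() {
    return bufferedBytes;
  }

  int drained() {
    return drained;
  }
}
```

A single record larger than the whole byte budget is still accepted once the buffer is empty; whether the real function behaves that way is not stated in this thread.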
| totalDrainOperations++; | ||
| totalDrainedRecords += drained; | ||
| } | ||
|
|
🤖 Allocating an Iterator for every drained record can create unnecessary garbage when drainSize is small.
Fixed. Replaced the Iterator-based loop with TreeMap.pollFirstEntry() which removes and returns the first entry directly, avoiding Iterator object allocation entirely. This matters especially when drainSize=1 (the default), where each incoming record at capacity would have created a new Iterator.
…rectly for Flink 2.0+/2.1+ Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Add TotalSizeTracer to bound buffer memory footprint using write.task.max.size, triggering drain when memory limit is exceeded in addition to record count limit
- Replace Iterator-based drain loop with TreeMap.pollFirstEntry() to avoid iterator allocation per drain call (matters when drainSize=1)
- Estimate record size once via ObjectSizeCalculator and track cumulative memory via TotalSizeTracer.trace()/countDown()/reset()
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
yihua left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for addressing the feedback — both my prior findings are resolved nicely. The memory footprint is now bounded via TotalSizeTracer with write.task.max.size, and the drain loop uses pollFirstEntry() to avoid iterator allocation. The RuntimeContextUtils simplification for Flink 2.0.x/2.1.x to use runtimeContext.isObjectReuseEnabled() directly is clean. All other reviewer comments appear addressed as well. LGTM.
yihua left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
CodeRabbit Walkthrough: This PR introduces continuous sorting functionality for Hudi Flink datasource. A new TreeMap-based buffer type (CONTINUOUS_SORT) enables in-memory sorted write operations with configurable incremental draining. The implementation includes new configuration options, a complete write function with lifecycle management, comprehensive integration tests, and utility enhancements for object reuse support across Flink versions.
Sequence Diagram (CodeRabbit):
sequenceDiagram
participant Client as Client
participant Processor as AppendWriteFunctionWithContinuousSort
participant TreeMap as TreeMap Buffer
participant Writer as Underlying Writer
participant State as State Backend
Client->>Processor: processElement(RowData)
Note over Processor: Compute normalized key
Processor->>TreeMap: insert(SortKey, record)
Note over Processor: Check buffer capacity/memory
alt Buffer exceeds limit
Processor->>TreeMap: pollFirstEntry() x drain_size
TreeMap-->>Processor: oldest records
Processor->>Writer: flush records
end
Client->>Processor: snapshotState() / endInput()
Processor->>TreeMap: pollFirstEntry() until empty
TreeMap-->>Processor: all buffered records
Processor->>Writer: flush remaining
Processor->>State: save checkpoint state
Client->>Processor: close()
Processor->>Writer: delegate to super.close()
CodeRabbit: yihua#46 (review)
| private RowType rowType; | ||
|
|
||
| @TempDir | ||
| protected File tempFile; |
Shadowed @TempDir tempFile field.
This class declares @TempDir protected File tempFile (line 61-62), but TestWriteBase already declares @TempDir protected File tempFile (inherited). This shadowing may cause confusion and could lead to unexpected behavior if both are initialized differently by JUnit.
Consider removing the local declaration since the inherited field should suffice.
🔧 Proposed fix
 public class ITTestAppendWriteFunctionWithContinuousSort extends TestWriteBase {
   private Configuration conf;
   private RowType rowType;
-
-  @TempDir
-  protected File tempFile;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithContinuousSort.java`
around lines 61 - 62, The class ITTestAppendWriteFunctionWithContinuousSort
currently redeclares the JUnit TempDir field as "protected File tempFile",
shadowing the same "@TempDir protected File tempFile" declared in the superclass
TestWriteBase; remove the local declaration of tempFile from
ITTestAppendWriteFunctionWithContinuousSort so the test uses the inherited
TestWriteBase.tempFile, and update any local references in that class to rely on
the inherited field name (no other changes required).
— CodeRabbit (original) (source:comment#3095501107)
+1
Describe the issue this Pull Request addresses
This PR implements a continuous sorting feature for the Flink append write sink. The continuous sorting mode maintains sorted order incrementally using a TreeMap, which avoids large pause times from batch sorting and reduces single-partition lag during ingestion by minimizing backpressure.
Summary and Changelog
Summary: Added a continuous sorting mode (AppendWriteFunctionWithContinuousSort) that provides non-blocking O(log n) inserts and incremental draining, offering predictable latency without sort spikes.
Changelog:
- Added AppendWriteFunctionWithContinuousSort class that keeps records in a TreeMap keyed by a code-generated normalized key and an insertion sequence
- Updated AppendWriteFunctions.create to instantiate the continuous sorter when WRITE_BUFFER_SORT_CONTINUOUS_ENABLED is true
- write.buffer.sort.continuous.enabled - Whether to use continuous sorting instead of batch sorting
- write.buffer.sort.continuous.drain.size - Number of records to drain each time max capacity is reached
- Added ITTestAppendWriteFunctionWithContinuousSort integration tests covering buffer flush triggers, sorted output correctness, drain behaviors, and invalid-parameter error cases
Impact
New Configuration Options:
- write.buffer.sort.continuous.enabled (default: false) - Enables continuous sorting mode
- write.buffer.sort.continuous.drain.size (default: 1) - Controls drain batch size
No breaking changes to existing functionality. The feature is disabled by default.
Risk Level
low - This is a new optional feature that is disabled by default. Existing behavior is unchanged unless the user explicitly enables continuous sorting.
Documentation Update
The config description is included in the code. Documentation update for the Hudi website may be needed to describe the new continuous sorting feature and its configuration options.
Contributor's checklist