… sorted order incrementally and avoid single-partition lag during ingestion by reducing large pause time from sort and backpressure
Summary:
- Added AppendWriteFunctionWithContinuousSort which keeps records in a TreeMap keyed by a code-generated normalized key and an insertion sequence, drains oldest entries when a configurable threshold is reached, and writes drained records immediately; snapshot/endInput drain remaining records.
- Updated AppendWriteFunctions.create to instantiate the continuous sorter when WRITE_BUFFER_SORT_CONTINUOUS_ENABLED is true.
- Introduced three new FlinkOptions: WRITE_BUFFER_SORT_CONTINUOUS_ENABLED, WRITE_BUFFER_SORT_CONTINUOUS_DRAIN_THRESHOLD_PERCENT, and WRITE_BUFFER_SORT_CONTINUOUS_DRAIN_SIZE, and added runtime validation (buffer > 0, 0 < threshold < 100, drainSize > 0, parsed non-empty sort keys).
- Added ITTestAppendWriteFunctionWithContinuousSort integration tests covering buffer flush triggers, sorted output correctness (with and without continuous drain), drain threshold/size behaviors, and invalid-parameter error cases.
Verified that we can push higher CPU utilization and input rate with continuous sorting enabled (See before and after 11:30AM)
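A minimal sketch of the buffering strategy summarized above, not the PR's actual code. The class name ContinuousSortBufferSketch, the plain string sort key, the `written` list standing in for the file writer, and the reading of the threshold as a percentage of a record-count capacity are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: a bounded buffer that stays sorted and drains incrementally.
class ContinuousSortBufferSketch {

  // Composite key: the sort key orders entries, the insertion sequence breaks ties.
  static final class Key {
    final String sortKey;
    final long seq;

    Key(String sortKey, long seq) {
      this.sortKey = sortKey;
      this.seq = seq;
    }

    String sortKey() { return sortKey; }
    long seq() { return seq; }
  }

  private static final Comparator<Key> ORDER =
      Comparator.comparing(Key::sortKey).thenComparingLong(Key::seq);

  private final TreeMap<Key, String> sorted = new TreeMap<>(ORDER);
  private final List<String> written = new ArrayList<>(); // stands in for the writer
  private final int drainTrigger;
  private final int drainSize;
  private long nextSeq = 0L;

  ContinuousSortBufferSketch(int bufferSize, int thresholdPercent, int drainSize) {
    // Mirrors the validation rules listed in the summary.
    if (bufferSize <= 0) {
      throw new IllegalArgumentException("buffer size must be > 0");
    }
    if (thresholdPercent <= 0 || thresholdPercent >= 100) {
      throw new IllegalArgumentException("threshold must satisfy 0 < threshold < 100");
    }
    if (drainSize <= 0) {
      throw new IllegalArgumentException("drain size must be > 0");
    }
    // Assumed interpretation: drain once the buffer holds this many records.
    this.drainTrigger = (int) Math.max(1L, (long) bufferSize * thresholdPercent / 100);
    this.drainSize = drainSize;
  }

  void add(String sortKey, String record) {
    if (sorted.size() >= drainTrigger) {
      drain(drainSize); // make room before inserting
    }
    sorted.put(new Key(sortKey, nextSeq++), record);
  }

  // Equivalent of the snapshot/endInput path: write everything that is left.
  void flushAll() {
    drain(sorted.size());
  }

  private void drain(int n) {
    for (int i = 0; i < n; i++) {
      Map.Entry<Key, String> first = sorted.pollFirstEntry(); // smallest key first
      if (first == null) {
        break;
      }
      written.add(first.getValue());
    }
  }

  List<String> written() {
    return written;
  }
}
```

With a small buffer the output is sorted only within the window that was buffered at each drain, so a late record with a small key can still be written after larger keys; a larger buffer trades memory for better ordering.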
- Move config validation before super.open() to avoid IllegalStateException when Flink runtime context is not initialized (fixes 5 validation tests)
- Add RecordComparator for full comparison fallback when normalized keys are equal, ensuring correct multi-field sort order (fixes 2 sort tests)
- Store RowData reference in SortKey for RecordComparator access
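To illustrate why a full-comparison fallback matters, here is a small sketch under simplifying assumptions: the "normalized key" is modeled as covering only the first of two sort fields, and the names Row, PREFIX_ONLY, and WITH_FALLBACK are hypothetical rather than taken from the PR.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: a prefix-style normalized key needs a full-compare fallback.
class NormalizedKeyFallbackSketch {

  static final class Row {
    final int region; // first sort field
    final int id;     // second sort field
    final long seq;   // insertion sequence

    Row(int region, int id, long seq) {
      this.region = region;
      this.id = id;
      this.seq = seq;
    }

    // Assumption: the normalized key only captures the first sort field.
    int normalizedKey() {
      return region;
    }

    @Override
    public String toString() {
      return region + ":" + id;
    }
  }

  // Ties on the normalized key fall straight through to insertion order.
  static final Comparator<Row> PREFIX_ONLY =
      Comparator.comparingInt(Row::normalizedKey).thenComparingLong(r -> r.seq);

  // Ties on the normalized key are resolved by comparing the remaining field first.
  static final Comparator<Row> WITH_FALLBACK =
      Comparator.comparingInt(Row::normalizedKey)
          .thenComparingInt(r -> r.id)
          .thenComparingLong(r -> r.seq);

  static List<String> sortedLabels(List<Row> rows, Comparator<Row> order) {
    List<Row> copy = new ArrayList<>(rows);
    copy.sort(order);
    List<String> labels = new ArrayList<>();
    for (Row r : copy) {
      labels.add(r.toString());
    }
    return labels;
  }
}
```

In this sketch, rows (1, 9) and (1, 3) inserted in that order come out as 1:9 then 1:3 under PREFIX_ONLY, and as 1:3 then 1:9 under WITH_FALLBACK.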
…ject reuse, cleanup
- Replace WRITE_BUFFER_SORT_CONTINUOUS_ENABLED with WRITE_BUFFER_TYPE=CONTINUOUS_SORT
- Add CONTINUOUS_SORT to BufferType enum
- Use AppendWriteFunctions.resolveSortKeys() for sort key resolution with record key fallback
- Copy RowData when Flink object reuse is enabled to prevent mutation
- Move sortedRecords.clear()/insertionSequence reset inside if branches
- Fix stale class name reference in javadoc
- Update test to use new WRITE_BUFFER_TYPE config
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
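The object-reuse point can be shown without Flink. The sketch below uses a hypothetical MutableRow class in place of RowData and simulates an upstream operator that reuses a single instance; it is an illustration of the hazard, not the PR's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical sketch: buffering a reused mutable object without copying it.
class ObjectReuseCopySketch {

  static final class MutableRow {
    int key;
    String payload;

    MutableRow copy() {
      MutableRow c = new MutableRow();
      c.key = key;
      c.payload = payload;
      return c;
    }
  }

  // Buffers three rows and returns the payloads in key order.
  static List<String> buffer(boolean copyOnInsert) {
    TreeMap<Integer, MutableRow> sorted = new TreeMap<>();
    MutableRow reused = new MutableRow(); // the upstream reuses this one instance
    int[] keys = {3, 1, 2};
    for (int k : keys) {
      reused.key = k;
      reused.payload = "row-" + k;
      // Without a copy, every map value aliases the same mutable object.
      sorted.put(k, copyOnInsert ? reused.copy() : reused);
    }
    List<String> payloads = new ArrayList<>();
    for (MutableRow r : sorted.values()) {
      payloads.add(r.payload);
    }
    return payloads;
  }
}
```

Without the copy, all three buffered entries show the payload of the last row the upstream wrote, which is the kind of corruption the copy is meant to prevent.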
…tions Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…it operations" This reverts commit 847773a.
…e test
- Fix isObjectReuseEnabled() compilation on Flink 1.17/1.18 by using getExecutionConfig().isObjectReuseEnabled() instead of RuntimeContext method
- Remove unused imports (java.util.Arrays, java.util.stream.Collectors) to fix checkstyle violations
- Move super.snapshotState() and super.endInput() out of finally clauses to match other append write functions (avoid sending write meta event on error)
- Optimize TreeMap comparator: store pre-wrapped MemorySegment in SortKey to avoid per-comparison allocation and reduce GC pressure
- Filter empty strings after trim in resolveSortKeys() to properly validate whitespace-only and comma-only sort key configs
- Add testObjectReuseEnabled test that verifies records are correctly copied when Flink object reuse is enabled (prevents TreeMap corruption)
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
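The trim-then-filter validation described for sort keys might look roughly like this. It is a sketch only: parseSortKeys is a hypothetical helper, not the signature of resolveSortKeys(), and the record key fallback mentioned in an earlier commit is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: parse a comma-separated sort key config, rejecting empty results.
class SortKeyParsingSketch {

  static List<String> parseSortKeys(String raw) {
    List<String> keys = new ArrayList<>();
    if (raw != null) {
      for (String part : raw.split(",")) {
        String trimmed = part.trim();
        if (!trimmed.isEmpty()) { // drops whitespace-only and comma-only segments
          keys.add(trimmed);
        }
      }
    }
    if (keys.isEmpty()) {
      throw new IllegalArgumentException("sort keys must not be empty: '" + raw + "'");
    }
    return keys;
  }
}
```

Filtering after the trim is what catches inputs such as " , " or ",,", which would otherwise yield blank field names or slip past a plain emptiness check on the raw string.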
…useEnabled
getExecutionConfig() was removed from RuntimeContext in Flink 2.0+. Add isObjectReuseEnabled() to version-specific RuntimeContextUtils adapters:
- Flink 1.17-1.20: uses getExecutionConfig().isObjectReuseEnabled()
- Flink 2.0-2.1: uses getGlobalJobParameters() with PipelineOptions.OBJECT_REUSE
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…rectly for Flink 2.0+/2.1+ Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Add TotalSizeTracer to bound buffer memory footprint using write.task.max.size, triggering drain when memory limit is exceeded in addition to record count limit
- Replace Iterator-based drain loop with TreeMap.pollFirstEntry() to avoid iterator allocation per drain call (matters when drainSize=1)
- Estimate record size once via ObjectSizeCalculator and track cumulative memory via TotalSizeTracer.trace()/countDown()/reset()
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
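As a rough picture of the memory accounting described here, the following sketch tracks a running byte total against a budget. SizeTracerSketch is a hypothetical stand-in written for this note; it borrows the trace/countDown/reset method names from the commit message but is not Hudi's TotalSizeTracer.

```java
// Hypothetical sketch: running byte total checked against a fixed budget.
class SizeTracerSketch {
  private final long maxBytes;
  private long bufferedBytes = 0L;

  SizeTracerSketch(long maxBytes) {
    if (maxBytes <= 0) {
      throw new IllegalArgumentException("maxBytes must be > 0");
    }
    this.maxBytes = maxBytes;
  }

  // Adds one record's estimated size; returns true once the budget is exceeded.
  boolean trace(long recordBytes) {
    bufferedBytes += recordBytes;
    return bufferedBytes > maxBytes;
  }

  // Subtracts the bytes of drained records, never going below zero.
  void countDown(long drainedBytes) {
    bufferedBytes = Math.max(0L, bufferedBytes - drainedBytes);
  }

  // Called after a full flush, when nothing is buffered any more.
  void reset() {
    bufferedBytes = 0L;
  }

  long bufferedBytes() {
    return bufferedBytes;
  }
}
```

A caller would drain when either the record count limit is hit or trace() reports that the byte budget was exceeded, then call countDown() with the bytes it just wrote.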
📝 Walkthrough
This PR introduces continuous sorting functionality for Hudi Flink datasource. A new TreeMap-based buffer type (
Changes
Sequence Diagram
sequenceDiagram
participant Client as Client
participant Processor as AppendWriteFunctionWithContinuousSort
participant TreeMap as TreeMap Buffer
participant Writer as Underlying Writer
participant State as State Backend
Client->>Processor: processElement(RowData)
Note over Processor: Compute normalized key
Processor->>TreeMap: insert(SortKey, record)
Note over Processor: Check buffer capacity/memory
alt Buffer exceeds limit
Processor->>TreeMap: pollFirstEntry() x drain_size
TreeMap-->>Processor: oldest records
Processor->>Writer: flush records
end
Client->>Processor: snapshotState() / endInput()
Processor->>TreeMap: pollFirstEntry() until empty
TreeMap-->>Processor: all buffered records
Processor->>Writer: flush remaining
Processor->>State: save checkpoint state
Client->>Processor: close()
Processor->>Writer: delegate to super.close()
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 1
🧹 Nitpick comments (4)
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithContinuousSort.java (1)
64-81: Note: before method receives unused @TempDir parameter.
The @BeforeEach method signature includes @TempDir File tempDir but uses tempFile instead. This is likely intentional (using the class field), but the parameter is unused.
♻️ Remove unused parameter
  @BeforeEach
- public void before(@TempDir File tempDir) throws Exception {
+ public void before() throws Exception {
    this.conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithContinuousSort.java` around lines 64 - 81, The before(`@TempDir` File tempDir) method declares a tempDir parameter but never uses it (the method uses the class field tempFile instead); either remove the unused `@TempDir` parameter from the before method signature or replace usages of tempFile with the provided tempDir so the temporary directory is actually used; update the method signature and any references in the before method (method name: before, symbols: `@TempDir`, tempDir, tempFile) accordingly to eliminate the unused-parameter warning and ensure the test uses the intended temp directory.
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithContinuousSort.java (3)
246-264: Minor: Redundant sortedRecords.clear() after draining all records.
Line 251 drains all records using drainRecords(sortedRecords.size()), and since drainRecords uses pollFirstEntry(), the map will already be empty. The subsequent sortedRecords.clear() on line 252 is redundant.
This is not harmful, just unnecessary.
♻️ Optional cleanup
  if (!sortedRecords.isEmpty()) {
    LOG.info("Snapshot: draining {} remaining records", sortedRecords.size());
    drainRecords(sortedRecords.size());
-   sortedRecords.clear();
    insertionSequence = 0L;
    sizeTracer.reset();
  }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithContinuousSort.java` around lines 246 - 264, In snapshotState(), remove the redundant call to sortedRecords.clear() after drainRecords(sortedRecords.size()) since drainRecords uses pollFirstEntry() and already empties sortedRecords; keep the rest of the logic (logging, resetting insertionSequence and sizeTracer, and the IOException handling) intact and ensure snapshotState() still calls super.snapshotState() at the end.
266-282: Same redundancy in endInput() as noted for snapshotState().
Line 273 sortedRecords.clear() is redundant after drainRecords(sortedRecords.size()).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithContinuousSort.java` around lines 266 - 282, In endInput(), remove the redundant sortedRecords.clear() call after drainRecords(sortedRecords.size()) since drainRecords already empties the buffer; keep the subsequent state resets (insertionSequence = 0L and sizeTracer.reset()) and preserve the IOException handling that wraps drainRecords into a HoodieIOException; update the method to rely on drainRecords to clear sortedRecords and eliminate the unnecessary clear invocation.
209-212: Consider variable record sizes for memory tracking.
estimatedRecordSize is captured from the first record only. For schemas with variable-length fields (e.g., strings), this estimate may become increasingly inaccurate over time, potentially leading to premature or delayed draining.
Consider periodically re-sampling or using a rolling average for more accurate memory tracking. This is a minor concern if record sizes are relatively uniform.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithContinuousSort.java` around lines 209 - 212, The current memory tracking in AppendWriteFunctionWithContinuousSort uses a single fixed estimatedRecordSize (set via ObjectSizeCalculator.getObjectSize(data) on the first record) which can be inaccurate for variable-length records; modify the logic that sets and uses estimatedRecordSize (and the sizeTracer.trace call) to periodically re-sample or compute a rolling/EMA average of record sizes (e.g., every N records or using an exponential moving average that incorporates ObjectSizeCalculator.getObjectSize(data) for sampled records) and use that updated estimate in sizeTracer.trace so memory tracking adapts to changing record sizes while keeping updates lightweight and thread-safe.
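One possible shape for the re-sampling idea is sketched below, under stated assumptions: the class SampledSizeEstimatorSketch, the sampleEvery and alpha parameters, and the use of a LongSupplier in place of a call such as ObjectSizeCalculator.getObjectSize(data) are all hypothetical choices for illustration.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch: re-measure every N records and smooth with an exponential moving average.
class SampledSizeEstimatorSketch {
  private final int sampleEvery;
  private final double alpha;
  private double estimate = 0.0;
  private long seen = 0L;

  SampledSizeEstimatorSketch(int sampleEvery, double alpha) {
    if (sampleEvery <= 0) {
      throw new IllegalArgumentException("sampleEvery must be > 0");
    }
    if (alpha <= 0.0 || alpha > 1.0) {
      throw new IllegalArgumentException("alpha must be in (0, 1]");
    }
    this.sampleEvery = sampleEvery;
    this.alpha = alpha;
  }

  // Returns the size estimate to charge for the next record.
  // The (possibly expensive) measurement runs only on sampled records.
  long next(LongSupplier measure) {
    if (seen % sampleEvery == 0) {
      long measured = measure.getAsLong();
      estimate = (seen == 0) ? measured : alpha * measured + (1.0 - alpha) * estimate;
    }
    seen++;
    return Math.round(estimate);
  }
}
```

Because the estimate changes over time, the amount subtracted on drain would need to match what was added for each record; storing the charged size alongside the record is one way to keep the two in step.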
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 9528ddcc-febe-47a1-91cf-8782b99a385e
📒 Files selected for processing (15)
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithContinuousSort.java
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctions.java
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/buffer/BufferType.java
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithContinuousSort.java
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockStreamingRuntimeContext.java
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java
hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java
hudi-flink-datasource/hudi-flink1.19.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java
hudi-flink-datasource/hudi-flink1.20.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java
hudi-flink-datasource/hudi-flink2.0.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java
hudi-flink-datasource/hudi-flink2.1.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java
@TempDir
protected File tempFile;
Shadowed @TempDir tempFile field.
This class declares @TempDir protected File tempFile (line 61-62), but TestWriteBase already declares @TempDir protected File tempFile (inherited). This shadowing may cause confusion and could lead to unexpected behavior if both are initialized differently by JUnit.
Consider removing the local declaration since the inherited field should suffice.
🔧 Proposed fix
public class ITTestAppendWriteFunctionWithContinuousSort extends TestWriteBase {
private Configuration conf;
private RowType rowType;
-
- @TempDir
- protected File tempFile;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithContinuousSort.java`
around lines 61 - 62, The class ITTestAppendWriteFunctionWithContinuousSort
currently redeclares the JUnit TempDir field as "protected File tempFile",
shadowing the same "@TempDir protected File tempFile" declared in the superclass
TestWriteBase; remove the local declaration of tempFile from
ITTestAppendWriteFunctionWithContinuousSort so the test uses the inherited
TestWriteBase.tempFile, and update any local references in that class to rely on
the inherited field name (no other changes required).
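The shadowing concern can be reproduced in plain Java. The sketch below uses hypothetical Base and Child classes with string fields in place of the test classes; how JUnit initializes two @TempDir fields of the same name is not shown in this thread, so the example only demonstrates the language-level hiding.

```java
// Hypothetical sketch: a subclass field with the same name hides the superclass field.
class FieldShadowingSketch {

  static class Base {
    protected String tempFile = "base";

    // Code written in the base class always reads Base.tempFile.
    String seenByBase() {
      return tempFile;
    }
  }

  static class Child extends Base {
    protected String tempFile = "child"; // hides Base.tempFile, does not override it

    // Code written in the subclass reads Child.tempFile.
    String seenByChild() {
      return tempFile;
    }
  }
}
```

Fields are resolved statically by the declaring class, so helpers inherited from the base class and methods in the subclass can end up working with two different values even though both refer to tempFile.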
Greptile Summary
This PR introduces a new
Key changes:
Confidence Score: 3/5
Functionally correct for happy-path usage, but the fixed
The feature logic is sound: object reuse is handled, validation is thorough, and the cross-version RuntimeContextUtils additions are correct. However, computing
Important Files Changed
Sequence Diagram
sequenceDiagram
participant Source
participant AWFCS as AppendWriteFunctionWithContinuousSort
participant TM as TreeMap (SortKey → RowData)
participant ST as TotalSizeTracer
participant WH as BulkInsertWriterHelper
participant Coord as StreamWriteOperatorCoordinator
Source->>AWFCS: processElement(record)
AWFCS->>TM: size() >= maxCapacity OR bufferSize > maxBufferSize?
alt Buffer full
AWFCS->>WH: initWriterHelper() if null
loop drainSize times
AWFCS->>TM: pollFirstEntry() [smallest sort key]
AWFCS->>WH: write(rowData)
end
AWFCS->>ST: countDown(drained * estimatedRecordSize)
end
AWFCS->>TM: put(SortKey, rowData) [O(log n)]
AWFCS->>ST: trace(estimatedRecordSize)
Note over Source,Coord: On Checkpoint
Source->>AWFCS: snapshotState()
loop all remaining records
AWFCS->>TM: pollFirstEntry()
AWFCS->>WH: write(rowData)
end
AWFCS->>ST: reset()
AWFCS->>AWFCS: super.snapshotState() → flushData()
AWFCS->>Coord: WriteMetadataEvent (write statuses)
Coord->>Coord: commitInstant()
Reviews (1): Last reviewed commit: "Address review comments: add memory boun..."
if (estimatedRecordSize == 0) {
  estimatedRecordSize = ObjectSizeCalculator.getObjectSize(data);
}
sizeTracer.trace(estimatedRecordSize);
Memory estimate fixed at first-record size
estimatedRecordSize is computed once from the very first record and never updated. For workloads with variable-length fields (e.g., long VarChar strings or nested structures), subsequent records can be orders of magnitude larger than the first. Because sizeTracer.trace(estimatedRecordSize) and sizeTracer.countDown(drained * estimatedRecordSize) both rely on this single constant, the in-memory buffer can grow far beyond write.task.max.size before the memory-limit drain fires, potentially causing task-manager OOM in production.
Consider computing the size per record (or updating a running weighted average) to keep the memory estimate accurate:
// Per-record size tracking instead of fixed estimate
long recordSize = ObjectSizeCalculator.getObjectSize(data);
sortedRecords.put(key, data);
sizeTracer.trace(recordSize);
totalInserted++;
If per-record calculation is too expensive, at minimum update the estimate periodically (e.g., every 1000 records) so drift is bounded.
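If sizes are tracked per record, the drain side has to subtract the same per-record amounts rather than a fixed estimate. A minimal sketch of that bookkeeping, assuming the size is stored next to each buffered row (the class PerRecordSizeAccountingSketch and its long keys are hypothetical):

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: keep each record's charged size so drains subtract exactly that amount.
class PerRecordSizeAccountingSketch {

  static final class Sized {
    final String row;
    final long bytes;

    Sized(String row, long bytes) {
      this.row = row;
      this.bytes = bytes;
    }
  }

  private final TreeMap<Long, Sized> sorted = new TreeMap<>();
  private long bufferedBytes = 0L;

  void put(long key, String row, long bytes) {
    Sized previous = sorted.put(key, new Sized(row, bytes));
    if (previous != null) {
      bufferedBytes -= previous.bytes; // replaced entry no longer occupies the buffer
    }
    bufferedBytes += bytes;
  }

  // Drains up to n entries in key order and returns how many were removed.
  int drain(int n) {
    int drained = 0;
    while (drained < n) {
      Map.Entry<Long, Sized> first = sorted.pollFirstEntry();
      if (first == null) {
        break;
      }
      bufferedBytes -= first.getValue().bytes;
      drained++;
    }
    return drained;
  }

  long bufferedBytes() {
    return bufferedBytes;
  }
}
```

In this sketch, buffering a 500-byte and a 10-byte record and draining the smaller-keyed 10-byte one leaves 500 bytes accounted for, whereas subtracting a fixed first-record estimate would leave the total off by the difference.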
if (sortedRecords.size() >= maxCapacity || sizeTracer.bufferSize > sizeTracer.maxBufferSize) {
  drainRecords(drainSize);

  // Verify there's space after draining
  if (sortedRecords.size() >= maxCapacity) {
    throw new HoodieException(
        String.format("Buffer cannot accept record after draining. "
            + "Buffer size: %d, maxCapacity: %d, drainSize: %d",
            sortedRecords.size(), maxCapacity, drainSize));
  }
}
Post-drain exception guard is unreachable
The HoodieException thrown when sortedRecords.size() >= maxCapacity after draining is logically dead code.
- If the drain was triggered by sortedRecords.size() >= maxCapacity, then after draining drainSize ≥ 1 records the size becomes maxCapacity - Math.min(drainSize, maxCapacity) < maxCapacity, so the guard can never fire.
- If the drain was triggered solely by the memory limit (sizeTracer.bufferSize > sizeTracer.maxBufferSize) with a count below maxCapacity, draining reduces the count further; it cannot reach maxCapacity.
This guard should either be removed or replaced with a memory-limit check if the intent is to detect the case where a single drain cycle is still insufficient to bring the buffer within bounds.
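The unreachability argument can be checked by brute force over small values. The sketch below assumes the invariants implied by the comment: maxCapacity ≥ 1, drainSize ≥ 1, and a buffer that never holds more than maxCapacity records before the check (since the check runs before each insert). The class and method names are hypothetical.

```java
// Hypothetical sketch: exhaustively test whether the post-drain guard can ever be true.
class PostDrainGuardCheck {

  // Buffer size after draining up to drainSize records from a buffer of the given size.
  static int sizeAfterDrain(int size, int drainSize) {
    return size - Math.min(drainSize, size);
  }

  // Returns true if some combination up to the limit makes the guard condition hold.
  static boolean guardCanFire(int limit) {
    for (int maxCapacity = 1; maxCapacity <= limit; maxCapacity++) {
      for (int drainSize = 1; drainSize <= limit; drainSize++) {
        // size == maxCapacity is the count trigger; smaller sizes cover the memory trigger.
        for (int size = 0; size <= maxCapacity; size++) {
          if (sizeAfterDrain(size, drainSize) >= maxCapacity) {
            return true;
          }
        }
      }
    }
    return false;
  }
}
```

Under those assumptions no combination satisfies the guard condition, which matches the reasoning in the two bullets above.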
if (!sortedRecords.isEmpty()) {
  LOG.info("Snapshot: draining {} remaining records", sortedRecords.size());
  drainRecords(sortedRecords.size());
  sortedRecords.clear();
  insertionSequence = 0L;
  sizeTracer.reset();
}
sortedRecords.clear() is redundant after drainRecords(sortedRecords.size())
drainRecords uses pollFirstEntry() in a loop, which removes each entry from the map as it goes. After draining the full sortedRecords.size() count, the map is already empty. The explicit sortedRecords.clear() call immediately after is a no-op. The same redundancy exists in endInput() (lines 270-276).
  if (!sortedRecords.isEmpty()) {
    LOG.info("Snapshot: draining {} remaining records", sortedRecords.size());
    drainRecords(sortedRecords.size());
-   sortedRecords.clear();
    insertionSequence = 0L;
    sizeTracer.reset();
  }
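A tiny stand-alone check of the claim that a pollFirstEntry() loop over the full size leaves nothing to clear. The drain helper here is a hypothetical stand-in for drainRecords, written only for this illustration.

```java
import java.util.TreeMap;

// Hypothetical sketch: draining size() entries with pollFirstEntry() empties the map.
class DrainLeavesMapEmptySketch {

  // Removes up to n entries in ascending key order and returns how many were removed.
  static int drain(TreeMap<Integer, String> map, int n) {
    int drained = 0;
    while (drained < n && map.pollFirstEntry() != null) {
      drained++;
    }
    return drained;
  }
}
```

Since pollFirstEntry() both returns and removes the first entry, calling it size() times leaves the map empty, so a following clear() has no effect.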
 * <p>Strategy:
 * <ol>
 *   <li>Records are inserted in sorted order (TreeMap)</li>
 *   <li>When buffer reaches max capacity, oldest record(s) are drained synchronously</li>
 *   <li>Drain size is configurable to balance latency vs. throughput vs compression ratio</li>
Javadoc says "oldest" but implementation drains "smallest sort key"
The class-level javadoc states:
When buffer reaches max capacity, oldest record(s) are drained synchronously
However, the implementation calls sortedRecords.pollFirstEntry(), which removes the record with the smallest sort key — not the oldest by insertion time. Insertion order is only the final tiebreaker for key-equal records. Draining by smallest key is the correct behavior for incremental sorted output, but the documentation should say "smallest by sort key" to avoid confusion.
@Override
public void close() throws Exception {
  try {
    LOG.info("AppendWriteFunctionWithContinuousSort closed: totalInserted={}, totalDrained={}, operations={}",
        totalInserted, totalDrainedRecords, totalDrainOperations);

  } finally {
    super.close();
  }
}
Silent data loss if close() is called without a prior flush
If the operator is closed unexpectedly (e.g., due to task cancellation before snapshotState() or endInput() is called), any records still in sortedRecords are silently dropped. While in streaming mode this is mitigated by checkpoint replay, a warning log when sortedRecords is non-empty at close would make such situations visible during debugging:
@Override
public void close() throws Exception {
try {
if (sortedRecords != null && !sortedRecords.isEmpty()) {
LOG.warn("AppendWriteFunctionWithContinuousSort closed with {} unflushed records — "
+ "these records will be lost. totalInserted={}, totalDrained={}",
sortedRecords.size(), totalInserted, totalDrainedRecords);
} else {
LOG.info("AppendWriteFunctionWithContinuousSort closed: totalInserted={}, totalDrained={}, operations={}",
totalInserted, totalDrainedRecords, totalDrainOperations);
}
} finally {
super.close();
}
}
Mirror of apache#18083 for automated bot review.
Original author: @prashantwason
Base branch: master
Summary by CodeRabbit
Release Notes
New Features
- CONTINUOUS_SORT buffer type for write buffering using TreeMap-based continuous sorting with incremental draining.
- write.buffer.sort.continuous.drain.size configuration option (default: 1) to control record draining per step during continuous sorting.
Tests