From 8226525e5f4f9bc75d61a4d24ab249f59db9b43a Mon Sep 17 00:00:00 2001 From: dsaisharath Date: Wed, 19 Nov 2025 15:55:09 -0800 Subject: [PATCH 1/9] Implemented a continuous sorting mode for the append sink to maintain sorted order incrementally and avoid single-partition lag during ingestion by reducing large pause time from sort and backpressure Summary: - Added AppendWriteFunctionWithContinuousSort which keeps records in a TreeMap keyed by a code-generated normalized key and an insertion sequence, drains oldest entries when a configurable threshold is reached, and writes drained records immediately; snapshot/endInput drain remaining records. - Updated AppendWriteFunctions.create to instantiate the continuous sorter when WRITE_BUFFER_SORT_CONTINUOUS_ENABLED is true. - Introduced three new FlinkOptions: WRITE_BUFFER_SORT_CONTINUOUS_ENABLED, WRITE_BUFFER_SORT_CONTINUOUS_DRAIN_THRESHOLD_PERCENT, and WRITE_BUFFER_SORT_CONTINUOUS_DRAIN_SIZE, and added runtime validation (buffer > 0, 0 < threshold < 100, drainSize > 0, parsed non-empty sort keys). - Added ITTestAppendWriteFunctionWithContinuousSort integration tests covering buffer flush triggers, sorted output correctness (with and without continuous drain), drain threshold/size behaviors, and invalid-parameter error cases. 
Verified that we can push higher CPU utilization and input rate with continuous sorting enabled (See before and after 11:30AM) --- .../hudi/configuration/FlinkOptions.java | 19 + ...AppendWriteFunctionWithContinuousSort.java | 321 +++++++++++++++ .../sink/append/AppendWriteFunctions.java | 5 + ...AppendWriteFunctionWithContinuousSort.java | 371 ++++++++++++++++++ 4 files changed, 716 insertions(+) create mode 100644 hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithContinuousSort.java create mode 100644 hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithContinuousSort.java diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 2e7c017225eea..9c96316de6438 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -724,6 +724,25 @@ public class FlinkOptions extends HoodieConfig { + "Data is sorted within the buffer configured by number of records or buffer size. " + "The order of entire written file is not guaranteed."); + @AdvancedConfig + public static final ConfigOption WRITE_BUFFER_SORT_CONTINUOUS_ENABLED = ConfigOptions + .key("write.buffer.sort.continuous.enabled") + .booleanType() + .defaultValue(false) // default use batch sorting + .withDescription("Whether to use continuous sorting (TreeMap-based) instead of batch sorting. " + + "Continuous sorting provides O(log n) inserts and incremental draining, " + + "but has higher per-record overhead. 
Requires write.buffer.sort.enabled=true."); + + @AdvancedConfig + public static final ConfigOption WRITE_BUFFER_SORT_CONTINUOUS_DRAIN_SIZE = ConfigOptions + .key("write.buffer.sort.continuous.drain.size") + .intType() + .defaultValue(1) // default drain 1 record at a time + .withDescription("Number of records to drain each time the max capacity is reached when using continuous sorting. " + + "Default value of 1 provides smooth, incremental draining. " + + "Can be increased for batching if needed (e.g., 10, 100). " + + "Larger values reduce drain frequency but may cause latency spikes."); + @AdvancedConfig public static final ConfigOption WRITE_BUFFER_SIZE = ConfigOptions .key("write.buffer.size") diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithContinuousSort.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithContinuousSort.java new file mode 100644 index 0000000000000..1b3fb75eb88a3 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithContinuousSort.java @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.sink.append; + +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.sink.StreamWriteOperatorCoordinator; +import org.apache.hudi.sink.bulk.sort.SortOperatorGen; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator; +import org.apache.flink.table.runtime.generated.GeneratedNormalizedKeyComputer; +import org.apache.flink.table.runtime.generated.NormalizedKeyComputer; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.Collector; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.stream.Collectors; + +/** + * Sink function to write data with continuous sorting for improved compression. + * + *

Unlike {@link AppendWriteFunctionWithBufferSort} which uses batch sorting, + * this function maintains sorted order continuously using a TreeMap, providing: + *

    + *
  • Non-blocking inserts (O(log n) vs O(1) + periodic O(n log n))
  • + *
  • Incremental draining without re-sorting
  • + *
  • Predictable latency (no sort spikes)
  • + *
+ * + *

Strategy: + *

    + *
  1. Records arrive in arbitrary order and are inserted into a TreeMap, which keeps them in sorted order
  2. + *
  3. When buffer reaches max capacity, the record(s) with the smallest sort keys (the head of the TreeMap, not the oldest by arrival) are drained synchronously
  4. + *
  5. Drain size is configurable to balance latency vs. throughput vs. compression ratio
  6. + *
+ * + * @param Type of the input record + * @see StreamWriteOperatorCoordinator + */ +public class AppendWriteFunctionWithContinuousSort extends AppendWriteFunction { + + private static final Logger LOG = LoggerFactory.getLogger(AppendWriteFunctionWithContinuousSort.class); + + private final long maxCapacity; + private final int drainSize; + + private transient TreeMap sortedRecords; + private transient long insertionSequence; + + // Sort key computation + private transient NormalizedKeyComputer normalizedKeyComputer; + private transient byte[] reusableKeyBuffer; + private transient MemorySegment reusableKeySegment; + private transient int normalizedKeySize; + + // Metrics + private transient long totalDrainOperations; + private transient long totalDrainedRecords; + private transient long totalInserted; + + public AppendWriteFunctionWithContinuousSort(Configuration config, RowType rowType) { + super(config, rowType); + + // Configuration + this.maxCapacity = config.get(FlinkOptions.WRITE_BUFFER_SIZE); + this.drainSize = config.get(FlinkOptions.WRITE_BUFFER_SORT_CONTINUOUS_DRAIN_SIZE); + + LOG.info("AppendWriteFunctionWithContinuousSort created: maxCapacity={}, drainSize={}", + maxCapacity, drainSize); + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + // Validate configuration + if (maxCapacity <= 0) { + throw new IllegalArgumentException( + String.format("Buffer capacity must be positive, got: %d", maxCapacity)); + } + + if (drainSize <= 0) { + throw new IllegalArgumentException( + String.format("Drain size must be positive, got: %d", drainSize)); + } + + // Parse and validate sort keys + String sortKeys = config.get(FlinkOptions.WRITE_BUFFER_SORT_KEYS); + if (sortKeys == null || sortKeys.trim().isEmpty()) { + throw new IllegalArgumentException("Sort keys cannot be null or empty for continuous sort"); + } + + List sortKeyList = Arrays.stream(sortKeys.split(",")) + .map(String::trim) + .filter(s -> 
!s.isEmpty()) + .collect(Collectors.toList()); + + if (sortKeyList.isEmpty()) { + throw new IllegalArgumentException( + String.format("Sort keys list is empty after parsing: '%s'", sortKeys)); + } + + LOG.info("Initializing continuous sort with keys: {}", sortKeyList); + + // Create sort code generator for normalized key computation + SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType, sortKeyList.toArray(new String[0])); + SortCodeGenerator codeGenerator = sortOperatorGen.createSortCodeGenerator(); + GeneratedNormalizedKeyComputer generatedKeyComputer = codeGenerator.generateNormalizedKeyComputer("ContinuousSortKeyComputer"); + + // Instantiate code-generated normalizedKeyComputer + this.normalizedKeyComputer = generatedKeyComputer.newInstance(Thread.currentThread().getContextClassLoader()); + this.normalizedKeySize = normalizedKeyComputer.getNumKeyBytes(); + + // Initialize TreeMap with comparator that has access to normalizedKeyComputer + // Note: We wrap byte arrays in MemorySegments for comparison + this.sortedRecords = new TreeMap<>((k1, k2) -> { + // Wrap byte arrays in MemorySegments for comparison + MemorySegment seg1 = MemorySegmentFactory.wrap(k1.keyBytes); + MemorySegment seg2 = MemorySegmentFactory.wrap(k2.keyBytes); + + int cmp = normalizedKeyComputer.compareKey(seg1, 0, seg2, 0); + if (cmp != 0) { + return cmp; + } + return Long.compare(k1.insertionOrder, k2.insertionOrder); + }); + this.insertionSequence = 0L; + + // Allocate reusable on-heap buffer for computing keys + // Using heap memory to avoid off-heap memory leak + this.reusableKeyBuffer = new byte[normalizedKeySize]; + this.reusableKeySegment = MemorySegmentFactory.wrap(reusableKeyBuffer); + + // Initialize metrics + this.totalDrainOperations = 0; + this.totalDrainedRecords = 0; + this.totalInserted = 0; + + LOG.info("AppendWriteFunctionWithContinuousSort initialized successfully"); + } + + @Override + public void processElement(T value, Context ctx, Collector out) throws 
Exception { + RowData data = (RowData) value; + + // Check if buffer has reached max capacity + if (sortedRecords.size() >= maxCapacity) { + drainRecords(drainSize); + + // Verify there's space after draining + if (sortedRecords.size() >= maxCapacity) { + throw new HoodieException( + String.format("Buffer cannot accept record after draining. " + + "Buffer size: %d, maxCapacity: %d, drainSize: %d", + sortedRecords.size(), maxCapacity, drainSize)); + } + } + + // Write to buffer (maintains sorted order) + // Compute normalized key into reusable segment + normalizedKeyComputer.putKey(data, reusableKeySegment, 0); + + // Create sort key (copies the normalized key from reusable segment) + SortKey key = new SortKey(reusableKeySegment, normalizedKeySize, insertionSequence++); + + // Store the original RowData + sortedRecords.put(key, data); + + totalInserted++; + } + + /** + * Drain oldest records from buffer and write to storage. + */ + private void drainRecords(int count) throws IOException { + if (sortedRecords.isEmpty()) { + return; + } + + // Initialize writer if needed + if (this.writerHelper == null) { + initWriterHelper(); + } + + // Drain records from TreeMap + int actualCount = Math.min(count, sortedRecords.size()); + int drained = 0; + + Iterator> iterator = sortedRecords.entrySet().iterator(); + while (iterator.hasNext() && drained < actualCount) { + Map.Entry entry = iterator.next(); + RowData record = entry.getValue(); + + // Write record + writerHelper.write(record); + + // Remove from TreeMap - memory immediately reclaimed + iterator.remove(); + drained++; + } + + totalDrainOperations++; + totalDrainedRecords += drained; + } + + @Override + public void snapshotState() { + try { + // Drain all remaining records + if (!sortedRecords.isEmpty()) { + LOG.info("Snapshot: draining {} remaining records", sortedRecords.size()); + drainRecords(sortedRecords.size()); + } + + // Reset for next checkpoint interval + sortedRecords.clear(); + insertionSequence = 0L; + + 
LOG.info("Snapshot complete: total drained={}, operations={}", + totalDrainedRecords, totalDrainOperations); + + } catch (IOException e) { + throw new HoodieIOException("Failed to drain buffer during snapshot", e); + } finally { + super.snapshotState(); + } + } + + @Override + public void endInput() { + try { + // Drain all remaining records + if (!sortedRecords.isEmpty()) { + LOG.info("EndInput: draining {} remaining records", sortedRecords.size()); + drainRecords(sortedRecords.size()); + } + + // Clear buffer and reset sequence + sortedRecords.clear(); + insertionSequence = 0L; + + } catch (IOException e) { + throw new HoodieIOException("Failed to drain buffer during endInput", e); + } finally { + super.endInput(); + } + } + + @Override + public void close() throws Exception { + try { + LOG.info("AppendWriteFunctionWithContinuousSort closed: totalInserted={}, totalDrained={}, operations={}", + totalInserted, totalDrainedRecords, totalDrainOperations); + + } finally { + super.close(); + } + } + + /** + * Sort key with normalized key stored in byte array (on-heap). + * Comparison is done via TreeMap comparator. 
+ */ + private static class SortKey { + final byte[] keyBytes; + final long insertionOrder; + + SortKey(MemorySegment sourceSegment, int keySize, long insertionOrder) { + this.insertionOrder = insertionOrder; + + // Copy normalized key from MemorySegment to on-heap byte array + this.keyBytes = new byte[keySize]; + sourceSegment.get(0, keyBytes, 0, keySize); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof SortKey)) { + return false; + } + return this.insertionOrder == ((SortKey) obj).insertionOrder; + } + + @Override + public int hashCode() { + return Long.hashCode(insertionOrder); + } + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctions.java index 39cda29ab4226..e709f85f0cadc 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctions.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctions.java @@ -50,6 +50,11 @@ public static AppendWriteFunction create(Configuration conf, RowType rowT return new AppendWriteFunctionWithRateLimit<>(rowType, conf); } + // Check if continuous sorting is enabled (requires WRITE_BUFFER_SORT_ENABLED or appropriate buffer type) + if (conf.get(FlinkOptions.WRITE_BUFFER_SORT_CONTINUOUS_ENABLED)) { + return new AppendWriteFunctionWithContinuousSort<>(conf, rowType); + } + String bufferType = resolveBufferType(conf); if (BufferType.DISRUPTOR.name().equalsIgnoreCase(bufferType)) { return new AppendWriteFunctionWithDisruptorBufferSort<>(conf, rowType); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithContinuousSort.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithContinuousSort.java new 
file mode 100644 index 0000000000000..913987967a603 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithContinuousSort.java @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.sink.append; + +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.sink.utils.TestWriteBase; +import org.apache.hudi.utils.TestConfigurations; +import org.apache.hudi.utils.TestData; + +import org.apache.avro.generic.GenericRecord; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.io.File; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +/** + * Test cases for {@link AppendWriteFunctionWithContinuousSort}. 
+ */ +public class ITTestAppendWriteFunctionWithContinuousSort extends TestWriteBase { + private Configuration conf; + private RowType rowType; + + @TempDir + protected File tempFile; + + @BeforeEach + public void before(@TempDir File tempDir) throws Exception { + this.conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); + this.conf.set(FlinkOptions.WRITE_BUFFER_SORT_ENABLED, true); + this.conf.set(FlinkOptions.WRITE_BUFFER_SORT_CONTINUOUS_ENABLED, true); + this.conf.set(FlinkOptions.OPERATION, "insert"); + this.conf.set(FlinkOptions.WRITE_BUFFER_SORT_KEYS, "name,age"); + this.conf.set(FlinkOptions.WRITE_BUFFER_SIZE, 100L); + this.conf.set(FlinkOptions.WRITE_BUFFER_SORT_CONTINUOUS_DRAIN_SIZE, 1); + + // Define the row type with fields: name (STRING), age (INT), partition (STRING) + List fields = new ArrayList<>(); + fields.add(new RowType.RowField("uuid", VarCharType.STRING_TYPE)); + fields.add(new RowType.RowField("name", VarCharType.STRING_TYPE)); + fields.add(new RowType.RowField("age", new IntType())); + fields.add(new RowType.RowField("ts", new TimestampType())); + fields.add(new RowType.RowField("partition", VarCharType.STRING_TYPE)); + this.rowType = new RowType(fields); + } + + @Test + public void testBufferFlushOnRecordNumberLimit() throws Exception { + // Create test data that exceeds buffer size + List inputData = new ArrayList<>(); + for (int i = 0; i < 150; i++) { + inputData.add(createRowData("uuid" + i, "Name" + i, i, "1970-01-01 00:00:01.123", "p1")); + } + + // Write the data + TestWriteBase.TestHarness.instance() + .preparePipeline(tempFile, conf) + .consume(inputData) + .endInput(); + + // Verify all data was written + List actualData = TestData.readAllData(new File(conf.get(FlinkOptions.PATH)), rowType, 1); + assertEquals(150, actualData.size()); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testBufferFlush(boolean flushOnCheckpoint) throws Exception { + // Create test data + List inputData = 
Arrays.asList( + createRowData("uuid1", "Bob", 30, "1970-01-01 00:00:01.123", "p1"), + createRowData("uuid2", "Alice", 25, "1970-01-01 00:00:01.124", "p1") + ); + + // Write the data and wait for timer + TestHarness testHarness = + TestWriteBase.TestHarness.instance() + .preparePipeline(tempFile, conf) + .consume(inputData); + if (flushOnCheckpoint) { + testHarness.checkpoint(1); + } else { + testHarness.endInput(); + } + + // Verify data was written + List actualData = TestData.readAllData(new File(conf.get(FlinkOptions.PATH)), rowType, 1); + assertEquals(2, actualData.size()); + } + + @Test + public void testBufferFlushOnBufferSizeLimit() throws Exception { + // enlarge the write buffer record size + this.conf.set(FlinkOptions.WRITE_BUFFER_SIZE, 10000L); + // use a very small buffer memory size here + this.conf.set(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.1D); + + // Create test data that exceeds buffer size + List inputData = new ArrayList<>(); + for (int i = 0; i < 2000; i++) { + inputData.add(createRowData("uuid" + i, "Name" + i, i, "1970-01-01 00:00:01.123", "p1")); + } + + // Write the data + TestWriteBase.TestHarness.instance() + .preparePipeline(tempFile, conf) + .consume(inputData) + .endInput(); + + // Verify all data was written + List actualData = TestData.readAllData(new File(conf.get(FlinkOptions.PATH)), rowType, 1); + assertEquals(2000, actualData.size()); + } + + @Test + public void testSortedResult() throws Exception { + // Create test data in unsorted order + List inputData = Arrays.asList( + createRowData("uuid1", "Bob", 30, "1970-01-01 00:00:01.123", "p1"), + createRowData("uuid2", "Alice", 25, "1970-01-01 00:00:01.124", "p1"), + createRowData("uuid3", "Bob", 21, "1970-01-01 00:00:31.124", "p1") + ); + + // Expected result after sorting by name, then age + List expected = Arrays.asList( + "uuid2,Alice,25,1970-01-01 00:00:01.124,p1", + "uuid3,Bob,21,1970-01-01 00:00:31.124,p1", + "uuid1,Bob,30,1970-01-01 00:00:01.123,p1"); + + // Write the data and 
wait for timer + TestWriteBase.TestHarness.instance() + .preparePipeline(tempFile, conf) + .consume(inputData) + .checkpoint(1) + .endInput(); + + // Verify data was written + List result = TestData.readAllData(new File(conf.get(FlinkOptions.PATH)), rowType, 1); + assertEquals(3, result.size()); + + List filteredResult = + result.stream().map(TestData::filterOutVariablesWithoutHudiMetadata).collect(Collectors.toList()); + + assertArrayEquals(expected.toArray(), filteredResult.toArray()); + } + + @Test + public void testContinuousDrainBehavior() throws Exception { + // Set buffer size to 10 records + this.conf.set(FlinkOptions.WRITE_BUFFER_SIZE, 10L); + this.conf.set(FlinkOptions.WRITE_BUFFER_SORT_CONTINUOUS_DRAIN_SIZE, 2); + + // Create test data that will trigger drain + List inputData = new ArrayList<>(); + for (int i = 0; i < 12; i++) { + inputData.add(createRowData("uuid" + i, "Name" + i, i, "1970-01-01 00:00:01.123", "p1")); + } + + // Write the data - should trigger continuous draining + TestWriteBase.TestHarness.instance() + .preparePipeline(tempFile, conf) + .consume(inputData) + .endInput(); + + // Verify all data was written despite buffer size limit + List actualData = TestData.readAllData(new File(conf.get(FlinkOptions.PATH)), rowType, 1); + assertEquals(12, actualData.size()); + } + + @Test + public void testDrainSizeConfiguration() throws Exception { + // Set buffer size to 10 and drain size to 5 + this.conf.set(FlinkOptions.WRITE_BUFFER_SIZE, 10L); + this.conf.set(FlinkOptions.WRITE_BUFFER_SORT_CONTINUOUS_DRAIN_SIZE, 5); + + // Create test data that will trigger multiple drains + List inputData = new ArrayList<>(); + for (int i = 0; i < 20; i++) { + inputData.add(createRowData("uuid" + i, "Name" + i, i, "1970-01-01 00:00:01.123", "p1")); + } + + // Write the data - should trigger draining in batches of 5 + TestWriteBase.TestHarness.instance() + .preparePipeline(tempFile, conf) + .consume(inputData) + .endInput(); + + // Verify all data was written + 
List actualData = TestData.readAllData(new File(conf.get(FlinkOptions.PATH)), rowType, 1); + assertEquals(20, actualData.size()); + } + + @Test + public void testSortedResultWithContinuousDrain() throws Exception { + // Set smaller buffer to force continuous draining + this.conf.set(FlinkOptions.WRITE_BUFFER_SIZE, 5L); + this.conf.set(FlinkOptions.WRITE_BUFFER_SORT_CONTINUOUS_DRAIN_SIZE, 1); + + // Create test data with various names and ages + List inputData = Arrays.asList( + createRowData("uuid1", "Charlie", 35, "1970-01-01 00:00:01.123", "p1"), + createRowData("uuid2", "Alice", 25, "1970-01-01 00:00:01.124", "p1"), + createRowData("uuid3", "Bob", 30, "1970-01-01 00:00:01.125", "p1"), + createRowData("uuid4", "Alice", 20, "1970-01-01 00:00:01.126", "p1"), + createRowData("uuid5", "Bob", 28, "1970-01-01 00:00:01.127", "p1"), + createRowData("uuid6", "Charlie", 40, "1970-01-01 00:00:01.128", "p1") + ); + + // Expected result after sorting by name, then age + List expected = Arrays.asList( + "uuid4,Alice,20,1970-01-01 00:00:01.126,p1", + "uuid2,Alice,25,1970-01-01 00:00:01.124,p1", + "uuid5,Bob,28,1970-01-01 00:00:01.127,p1", + "uuid3,Bob,30,1970-01-01 00:00:01.125,p1", + "uuid1,Charlie,35,1970-01-01 00:00:01.123,p1", + "uuid6,Charlie,40,1970-01-01 00:00:01.128,p1" + ); + + // Write the data + TestWriteBase.TestHarness.instance() + .preparePipeline(tempFile, conf) + .consume(inputData) + .checkpoint(1) + .endInput(); + + // Verify data was written in sorted order + List result = TestData.readAllData(new File(conf.get(FlinkOptions.PATH)), rowType, 1); + assertEquals(6, result.size()); + + List filteredResult = + result.stream().map(TestData::filterOutVariablesWithoutHudiMetadata).collect(Collectors.toList()); + + assertArrayEquals(expected.toArray(), filteredResult.toArray()); + } + + @Test + public void testLargeDrainSize() throws Exception { + // Set larger drain size to test batch draining + this.conf.set(FlinkOptions.WRITE_BUFFER_SIZE, 20L); + 
this.conf.set(FlinkOptions.WRITE_BUFFER_SORT_CONTINUOUS_DRAIN_SIZE, 5); + + // Create test data + List inputData = new ArrayList<>(); + for (int i = 0; i < 30; i++) { + inputData.add(createRowData("uuid" + i, "Name" + i, i, "1970-01-01 00:00:01.123", "p1")); + } + + // Write the data + TestWriteBase.TestHarness.instance() + .preparePipeline(tempFile, conf) + .consume(inputData) + .endInput(); + + // Verify all data was written + List actualData = TestData.readAllData(new File(conf.get(FlinkOptions.PATH)), rowType, 1); + assertEquals(30, actualData.size()); + } + + @Test + public void testInvalidDrainSizeZero() { + this.conf.set(FlinkOptions.WRITE_BUFFER_SORT_CONTINUOUS_DRAIN_SIZE, 0); + + AppendWriteFunctionWithContinuousSort function = + new AppendWriteFunctionWithContinuousSort<>(conf, rowType); + + assertThrows(IllegalArgumentException.class, () -> { + function.open(conf); + }); + } + + @Test + public void testInvalidDrainSizeNegative() { + this.conf.set(FlinkOptions.WRITE_BUFFER_SORT_CONTINUOUS_DRAIN_SIZE, -5); + + AppendWriteFunctionWithContinuousSort function = + new AppendWriteFunctionWithContinuousSort<>(conf, rowType); + + assertThrows(IllegalArgumentException.class, () -> { + function.open(conf); + }); + } + + @Test + public void testInvalidBufferSizeZero() { + this.conf.set(FlinkOptions.WRITE_BUFFER_SIZE, 0L); + + AppendWriteFunctionWithContinuousSort function = + new AppendWriteFunctionWithContinuousSort<>(conf, rowType); + + assertThrows(IllegalArgumentException.class, () -> { + function.open(conf); + }); + } + + @Test + public void testInvalidBufferSizeNegative() { + this.conf.set(FlinkOptions.WRITE_BUFFER_SIZE, -100L); + + AppendWriteFunctionWithContinuousSort function = + new AppendWriteFunctionWithContinuousSort<>(conf, rowType); + + assertThrows(IllegalArgumentException.class, () -> { + function.open(conf); + }); + } + + @Test + public void testInvalidSortKeysOnlyCommas() { + this.conf.set(FlinkOptions.WRITE_BUFFER_SORT_KEYS, " , , "); + + 
AppendWriteFunctionWithContinuousSort function = + new AppendWriteFunctionWithContinuousSort<>(conf, rowType); + + assertThrows(IllegalArgumentException.class, () -> { + function.open(conf); + }); + } + + @Test + public void testInvalidSortKeysOnlyWhitespace() { + this.conf.set(FlinkOptions.WRITE_BUFFER_SORT_KEYS, " "); + + AppendWriteFunctionWithContinuousSort function = + new AppendWriteFunctionWithContinuousSort<>(conf, rowType); + + assertThrows(IllegalArgumentException.class, () -> { + function.open(conf); + }); + } + + private GenericRowData createRowData(String uuid, String name, int age, String timestamp, String partition) { + return GenericRowData.of(StringData.fromString(uuid), StringData.fromString(name), + age, TimestampData.fromTimestamp(Timestamp.valueOf(timestamp)), StringData.fromString(partition)); + } +} From 3cdd7c6dfbb34164cd915eb89f746db7a496381a Mon Sep 17 00:00:00 2001 From: Prashant Wason Date: Tue, 10 Mar 2026 02:35:10 -0700 Subject: [PATCH 2/9] Fix CI test failures in continuous sort implementation - Move config validation before super.open() to avoid IllegalStateException when Flink runtime context is not initialized (fixes 5 validation tests) - Add RecordComparator for full comparison fallback when normalized keys are equal, ensuring correct multi-field sort order (fixes 2 sort tests) - Store RowData reference in SortKey for RecordComparator access --- ...AppendWriteFunctionWithContinuousSort.java | 37 +++++++++++++------ 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithContinuousSort.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithContinuousSort.java index 1b3fb75eb88a3..114f862b4a220 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithContinuousSort.java +++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithContinuousSort.java @@ -30,7 +30,9 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator; import org.apache.flink.table.runtime.generated.GeneratedNormalizedKeyComputer; +import org.apache.flink.table.runtime.generated.GeneratedRecordComparator; import org.apache.flink.table.runtime.generated.NormalizedKeyComputer; +import org.apache.flink.table.runtime.generated.RecordComparator; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.Collector; @@ -78,6 +80,7 @@ public class AppendWriteFunctionWithContinuousSort extends AppendWriteFunctio // Sort key computation private transient NormalizedKeyComputer normalizedKeyComputer; + private transient RecordComparator recordComparator; private transient byte[] reusableKeyBuffer; private transient MemorySegment reusableKeySegment; private transient int normalizedKeySize; @@ -100,9 +103,7 @@ public AppendWriteFunctionWithContinuousSort(Configuration config, RowType rowTy @Override public void open(Configuration parameters) throws Exception { - super.open(parameters); - - // Validate configuration + // Validate configuration before calling super.open() which requires Flink runtime context if (maxCapacity <= 0) { throw new IllegalArgumentException( String.format("Buffer capacity must be positive, got: %d", maxCapacity)); @@ -129,21 +130,25 @@ public void open(Configuration parameters) throws Exception { String.format("Sort keys list is empty after parsing: '%s'", sortKeys)); } + super.open(parameters); + LOG.info("Initializing continuous sort with keys: {}", sortKeyList); - // Create sort code generator for normalized key computation + // Create sort code generator for normalized key computation and record comparison SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType, sortKeyList.toArray(new String[0])); SortCodeGenerator 
codeGenerator = sortOperatorGen.createSortCodeGenerator(); GeneratedNormalizedKeyComputer generatedKeyComputer = codeGenerator.generateNormalizedKeyComputer("ContinuousSortKeyComputer"); + GeneratedRecordComparator generatedComparator = codeGenerator.generateRecordComparator("ContinuousSortComparator"); - // Instantiate code-generated normalizedKeyComputer - this.normalizedKeyComputer = generatedKeyComputer.newInstance(Thread.currentThread().getContextClassLoader()); + // Instantiate code-generated components + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + this.normalizedKeyComputer = generatedKeyComputer.newInstance(classLoader); + this.recordComparator = generatedComparator.newInstance(classLoader); this.normalizedKeySize = normalizedKeyComputer.getNumKeyBytes(); - // Initialize TreeMap with comparator that has access to normalizedKeyComputer - // Note: We wrap byte arrays in MemorySegments for comparison + // Initialize TreeMap with comparator that uses normalized keys for fast comparison + // and falls back to RecordComparator for full comparison when normalized keys are equal this.sortedRecords = new TreeMap<>((k1, k2) -> { - // Wrap byte arrays in MemorySegments for comparison MemorySegment seg1 = MemorySegmentFactory.wrap(k1.keyBytes); MemorySegment seg2 = MemorySegmentFactory.wrap(k2.keyBytes); @@ -151,12 +156,17 @@ public void open(Configuration parameters) throws Exception { if (cmp != 0) { return cmp; } + // Normalized keys are equal - use full record comparison for correct ordering + cmp = recordComparator.compare(k1.record, k2.record); + if (cmp != 0) { + return cmp; + } + // Records are equal by sort keys - use insertion order for stability return Long.compare(k1.insertionOrder, k2.insertionOrder); }); this.insertionSequence = 0L; // Allocate reusable on-heap buffer for computing keys - // Using heap memory to avoid off-heap memory leak this.reusableKeyBuffer = new byte[normalizedKeySize]; this.reusableKeySegment = 
MemorySegmentFactory.wrap(reusableKeyBuffer); @@ -190,7 +200,7 @@ public void processElement(T value, Context ctx, Collector out) throws normalizedKeyComputer.putKey(data, reusableKeySegment, 0); // Create sort key (copies the normalized key from reusable segment) - SortKey key = new SortKey(reusableKeySegment, normalizedKeySize, insertionSequence++); + SortKey key = new SortKey(reusableKeySegment, normalizedKeySize, data, insertionSequence++); // Store the original RowData sortedRecords.put(key, data); @@ -288,13 +298,16 @@ public void close() throws Exception { /** * Sort key with normalized key stored in byte array (on-heap). + * Holds a reference to the original record for full comparison fallback. * Comparison is done via TreeMap comparator. */ private static class SortKey { final byte[] keyBytes; + final RowData record; final long insertionOrder; - SortKey(MemorySegment sourceSegment, int keySize, long insertionOrder) { + SortKey(MemorySegment sourceSegment, int keySize, RowData record, long insertionOrder) { + this.record = record; this.insertionOrder = insertionOrder; // Copy normalized key from MemorySegment to on-heap byte array From a4085261407b9451e19565f5f1001f251dc0f7c7 Mon Sep 17 00:00:00 2001 From: Prashant Wason Date: Wed, 18 Mar 2026 11:24:23 -0700 Subject: [PATCH 3/9] Address review comments: migrate to WRITE_BUFFER_TYPE enum, handle object reuse, cleanup - Replace WRITE_BUFFER_SORT_CONTINUOUS_ENABLED with WRITE_BUFFER_TYPE=CONTINUOUS_SORT - Add CONTINUOUS_SORT to BufferType enum - Use AppendWriteFunctions.resolveSortKeys() for sort key resolution with record key fallback - Copy RowData when Flink object reuse is enabled to prevent mutation - Move sortedRecords.clear()/insertionSequence reset inside if branches - Fix stale class name reference in javadoc - Update test to use new WRITE_BUFFER_TYPE config Co-Authored-By: Claude Opus 4.6 --- .../hudi/configuration/FlinkOptions.java | 12 +---- ...AppendWriteFunctionWithContinuousSort.java | 54 
+++++++++---------- .../sink/append/AppendWriteFunctions.java | 9 ++-- .../apache/hudi/sink/buffer/BufferType.java | 6 ++- ...AppendWriteFunctionWithContinuousSort.java | 4 +- 5 files changed, 37 insertions(+), 48 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 9c96316de6438..261c6af7d1a8c 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -724,15 +724,6 @@ public class FlinkOptions extends HoodieConfig { + "Data is sorted within the buffer configured by number of records or buffer size. " + "The order of entire written file is not guaranteed."); - @AdvancedConfig - public static final ConfigOption WRITE_BUFFER_SORT_CONTINUOUS_ENABLED = ConfigOptions - .key("write.buffer.sort.continuous.enabled") - .booleanType() - .defaultValue(false) // default use batch sorting - .withDescription("Whether to use continuous sorting (TreeMap-based) instead of batch sorting. " - + "Continuous sorting provides O(log n) inserts and incremental draining, " - + "but has higher per-record overhead. 
Requires write.buffer.sort.enabled=true."); - @AdvancedConfig public static final ConfigOption WRITE_BUFFER_SORT_CONTINUOUS_DRAIN_SIZE = ConfigOptions .key("write.buffer.sort.continuous.drain.size") @@ -760,7 +751,8 @@ public class FlinkOptions extends HoodieConfig { .withDescription("Buffer type for append write function: " + "NONE (no buffer sort, default), " + "BOUNDED_IN_MEMORY (double buffer with async write), " - + "DISRUPTOR (ring buffer with async write, recommended for better throughput)"); + + "DISRUPTOR (ring buffer with async write, recommended for better throughput), " + + "CONTINUOUS_SORT (TreeMap-based continuous sorting with incremental draining)"); @AdvancedConfig public static final ConfigOption WRITE_BUFFER_DISRUPTOR_RING_SIZE = ConfigOptions diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithContinuousSort.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithContinuousSort.java index 114f862b4a220..2303d5cc80357 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithContinuousSort.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithContinuousSort.java @@ -29,6 +29,7 @@ import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.table.data.RowData; import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator; +import org.apache.flink.table.runtime.typeutils.RowDataSerializer; import org.apache.flink.table.runtime.generated.GeneratedNormalizedKeyComputer; import org.apache.flink.table.runtime.generated.GeneratedRecordComparator; import org.apache.flink.table.runtime.generated.NormalizedKeyComputer; @@ -50,7 +51,7 @@ /** * Sink function to write data with continuous sorting for improved compression. * - *

Unlike {@link AppendWriteFunctionWithBufferSort} which uses batch sorting, + *

Unlike {@link AppendWriteFunctionWithBIMBufferSort} which uses batch sorting, * this function maintains sorted order continuously using a TreeMap, providing: *

    *
  • Non-blocking inserts (O(log n) vs O(1) + periodic O(n log n))
  • @@ -81,9 +82,10 @@ public class AppendWriteFunctionWithContinuousSort extends AppendWriteFunctio // Sort key computation private transient NormalizedKeyComputer normalizedKeyComputer; private transient RecordComparator recordComparator; - private transient byte[] reusableKeyBuffer; private transient MemorySegment reusableKeySegment; private transient int normalizedKeySize; + private transient boolean objectReuseEnabled; + private transient RowDataSerializer rowDataSerializer; // Metrics private transient long totalDrainOperations; @@ -114,21 +116,8 @@ public void open(Configuration parameters) throws Exception { String.format("Drain size must be positive, got: %d", drainSize)); } - // Parse and validate sort keys - String sortKeys = config.get(FlinkOptions.WRITE_BUFFER_SORT_KEYS); - if (sortKeys == null || sortKeys.trim().isEmpty()) { - throw new IllegalArgumentException("Sort keys cannot be null or empty for continuous sort"); - } - - List sortKeyList = Arrays.stream(sortKeys.split(",")) - .map(String::trim) - .filter(s -> !s.isEmpty()) - .collect(Collectors.toList()); - - if (sortKeyList.isEmpty()) { - throw new IllegalArgumentException( - String.format("Sort keys list is empty after parsing: '%s'", sortKeys)); - } + // Resolve sort keys, falling back to record key if not specified + List sortKeyList = AppendWriteFunctions.resolveSortKeys(config); super.open(parameters); @@ -167,9 +156,15 @@ public void open(Configuration parameters) throws Exception { this.insertionSequence = 0L; // Allocate reusable on-heap buffer for computing keys - this.reusableKeyBuffer = new byte[normalizedKeySize]; + byte[] reusableKeyBuffer = new byte[normalizedKeySize]; this.reusableKeySegment = MemorySegmentFactory.wrap(reusableKeyBuffer); + // Detect object reuse mode and create serializer for copying if needed + this.objectReuseEnabled = getRuntimeContext().isObjectReuseEnabled(); + if (this.objectReuseEnabled) { + this.rowDataSerializer = new RowDataSerializer(rowType); + } + // 
Initialize metrics this.totalDrainOperations = 0; this.totalDrainedRecords = 0; @@ -195,6 +190,11 @@ public void processElement(T value, Context ctx, Collector out) throws } } + // Copy RowData when object reuse is enabled to prevent mutation after insertion + if (objectReuseEnabled) { + data = rowDataSerializer.copy(data); + } + // Write to buffer (maintains sorted order) // Compute normalized key into reusable segment normalizedKeyComputer.putKey(data, reusableKeySegment, 0); @@ -202,7 +202,7 @@ public void processElement(T value, Context ctx, Collector out) throws // Create sort key (copies the normalized key from reusable segment) SortKey key = new SortKey(reusableKeySegment, normalizedKeySize, data, insertionSequence++); - // Store the original RowData + // Store the RowData sortedRecords.put(key, data); totalInserted++; @@ -245,16 +245,14 @@ private void drainRecords(int count) throws IOException { @Override public void snapshotState() { try { - // Drain all remaining records + // Drain all remaining records and reset for next checkpoint interval if (!sortedRecords.isEmpty()) { LOG.info("Snapshot: draining {} remaining records", sortedRecords.size()); drainRecords(sortedRecords.size()); + sortedRecords.clear(); + insertionSequence = 0L; } - // Reset for next checkpoint interval - sortedRecords.clear(); - insertionSequence = 0L; - LOG.info("Snapshot complete: total drained={}, operations={}", totalDrainedRecords, totalDrainOperations); @@ -268,16 +266,14 @@ public void snapshotState() { @Override public void endInput() { try { - // Drain all remaining records + // Drain all remaining records and clear buffer if (!sortedRecords.isEmpty()) { LOG.info("EndInput: draining {} remaining records", sortedRecords.size()); drainRecords(sortedRecords.size()); + sortedRecords.clear(); + insertionSequence = 0L; } - // Clear buffer and reset sequence - sortedRecords.clear(); - insertionSequence = 0L; - } catch (IOException e) { throw new HoodieIOException("Failed to drain 
buffer during endInput", e); } finally { diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctions.java index e709f85f0cadc..4829623607d95 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctions.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctions.java @@ -50,13 +50,10 @@ public static AppendWriteFunction create(Configuration conf, RowType rowT return new AppendWriteFunctionWithRateLimit<>(rowType, conf); } - // Check if continuous sorting is enabled (requires WRITE_BUFFER_SORT_ENABLED or appropriate buffer type) - if (conf.get(FlinkOptions.WRITE_BUFFER_SORT_CONTINUOUS_ENABLED)) { - return new AppendWriteFunctionWithContinuousSort<>(conf, rowType); - } - String bufferType = resolveBufferType(conf); - if (BufferType.DISRUPTOR.name().equalsIgnoreCase(bufferType)) { + if (BufferType.CONTINUOUS_SORT.name().equalsIgnoreCase(bufferType)) { + return new AppendWriteFunctionWithContinuousSort<>(conf, rowType); + } else if (BufferType.DISRUPTOR.name().equalsIgnoreCase(bufferType)) { return new AppendWriteFunctionWithDisruptorBufferSort<>(conf, rowType); } else if (BufferType.BOUNDED_IN_MEMORY.name().equalsIgnoreCase(bufferType)) { return new AppendWriteFunctionWithBIMBufferSort<>(conf, rowType); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/buffer/BufferType.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/buffer/BufferType.java index cbc779cb11ca8..7253bd789468e 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/buffer/BufferType.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/buffer/BufferType.java @@ -36,5 +36,9 @@ public enum BufferType { @EnumFieldDescription("Lock-free ring buffer using LMAX 
Disruptor. Provides better throughput for high-volume " + "write operations by decoupling record ingestion from sorting and writing.") - DISRUPTOR + DISRUPTOR, + + @EnumFieldDescription("Continuous sorting using a TreeMap. Provides O(log n) inserts and incremental draining " + + "for predictable latency without sort spikes.") + CONTINUOUS_SORT } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithContinuousSort.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithContinuousSort.java index 913987967a603..589a5eeaebcca 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithContinuousSort.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithContinuousSort.java @@ -23,6 +23,7 @@ import org.apache.flink.table.types.logical.TimestampType; import org.apache.flink.table.types.logical.VarCharType; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.sink.buffer.BufferType; import org.apache.hudi.sink.utils.TestWriteBase; import org.apache.hudi.utils.TestConfigurations; import org.apache.hudi.utils.TestData; @@ -63,8 +64,7 @@ public class ITTestAppendWriteFunctionWithContinuousSort extends TestWriteBase { @BeforeEach public void before(@TempDir File tempDir) throws Exception { this.conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); - this.conf.set(FlinkOptions.WRITE_BUFFER_SORT_ENABLED, true); - this.conf.set(FlinkOptions.WRITE_BUFFER_SORT_CONTINUOUS_ENABLED, true); + this.conf.set(FlinkOptions.WRITE_BUFFER_TYPE, BufferType.CONTINUOUS_SORT.name()); this.conf.set(FlinkOptions.OPERATION, "insert"); this.conf.set(FlinkOptions.WRITE_BUFFER_SORT_KEYS, "name,age"); this.conf.set(FlinkOptions.WRITE_BUFFER_SIZE, 100L); From 847773a166f5184ed3f0635ba1a603a019ba689c Mon Sep 17 00:00:00 2001 From: 
Prashant Wason Date: Thu, 19 Mar 2026 16:48:01 -0700 Subject: [PATCH 4/9] fix(concurrency): detect rollback conflicts with ongoing commit operations Co-Authored-By: Claude Opus 4.6 --- .../transaction/ConcurrentOperation.java | 28 ++++- ...referWriterConflictResolutionStrategy.java | 15 ++- ...tFileWritesConflictResolutionStrategy.java | 40 ++++++- .../TestConflictResolutionStrategyUtil.java | 19 ++++ ...referWriterConflictResolutionStrategy.java | 107 ++++++++++++++++++ 5 files changed, 205 insertions(+), 4 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java index dc3bd59c6211a..47842ccddab24 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java @@ -19,6 +19,7 @@ package org.apache.hudi.client.transaction; import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; +import org.apache.hudi.avro.model.HoodieRollbackPlan; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieMetadataWrapper; import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; @@ -47,6 +48,7 @@ import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION; import static org.apache.hudi.common.table.timeline.HoodieTimeline.LOG_COMPACTION_ACTION; import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.ROLLBACK_ACTION; import static org.apache.hudi.common.util.CommitUtils.getPartitionAndFileIdWithoutSuffixFromSpecificRecord; /** @@ -68,8 +70,11 @@ public class ConcurrentOperation { private final String actionType; @ToString.Include private final String 
instantTime; + private final HoodieTableMetaClient metaClient; @Getter private Set> mutatedPartitionAndFileIds = Collections.emptySet(); + @Getter + private String rolledbackCommit; public ConcurrentOperation(HoodieInstant instant, HoodieTableMetaClient metaClient) throws IOException { // Replace inflight compaction and clustering to requested since inflight does not contain the plan. @@ -82,6 +87,7 @@ public ConcurrentOperation(HoodieInstant instant, HoodieTableMetaClient metaClie this.actionState = instant.getState().name(); this.actionType = instant.getAction(); this.instantTime = instant.requestedTime(); + this.metaClient = metaClient; // used only by the other concurrent operation (which reads from timeline) init(instant); } @@ -91,7 +97,13 @@ public ConcurrentOperation(HoodieInstant instant, HoodieCommitMetadata commitMet this.actionState = instant.getState().name(); this.actionType = instant.getAction(); this.instantTime = instant.requestedTime(); - init(instant); + this.metaClient = null; // used only by the other concurrent operation (which reads from timeline) + try { + init(instant); + } catch (IOException e) { + // This should never happen since we are initializing with commit metadata + throw new RuntimeException("Failed to initialize ConcurrentOperation for instant: " + instant, e); + } } public String getInstantActionState() { @@ -106,7 +118,7 @@ public String getInstantTimestamp() { return instantTime; } - private void init(HoodieInstant instant) { + private void init(HoodieInstant instant) throws IOException { if (this.metadataWrapper.isAvroMetadata()) { switch (getInstantActionType()) { case COMPACTION_ACTION: @@ -122,6 +134,18 @@ private void init(HoodieInstant instant) { .getPartitionToWriteStats()); this.operationType = WriteOperationType.fromValue(this.metadataWrapper.getMetadataFromTimeline().getHoodieCommitMetadata().getOperationType()); break; + case ROLLBACK_ACTION: + this.operationType = WriteOperationType.UNKNOWN; + if 
(!instant.isCompleted()) { + // requested rollback instants have rollback plan in the details; (inflight rollback is empty). + // irrespective of requested/inflight, always read rollback plan. + if (this.metaClient != null) { + HoodieInstant requested = metaClient.getInstantGenerator().getRollbackRequestedInstant(instant); + HoodieRollbackPlan rollbackPlan = metaClient.getActiveTimeline().readRollbackPlan(requested); + this.rolledbackCommit = rollbackPlan.getInstantToRollback().getCommitTime(); + } + } + break; case REPLACE_COMMIT_ACTION: case CLUSTERING_ACTION: if (instant.isCompleted()) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/PreferWriterConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/PreferWriterConflictResolutionStrategy.java index 9b02e26aecc00..20ddca3bee361 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/PreferWriterConflictResolutionStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/PreferWriterConflictResolutionStrategy.java @@ -54,12 +54,25 @@ public Stream getCandidateInstants(HoodieTableMetaClient metaClie HoodieActiveTimeline activeTimeline = metaClient.reloadActiveTimeline(); if (ClusteringUtils.isClusteringInstant(activeTimeline, currentInstant, metaClient.getInstantGenerator()) || COMPACTION_ACTION.equals(currentInstant.getAction())) { + // Table service rollbacks are done by table service jobs/writers only, not by ingestion threads, + // so rollback conflict detection is not needed for table services. 
return getCandidateInstantsForTableServicesCommits(activeTimeline, currentInstant); } else { - return getCandidateInstantsForNonTableServicesCommits(activeTimeline, currentInstant); + return Stream.concat(getCandidateInstantsForNonTableServicesCommits(activeTimeline, currentInstant), + getCandidateInstantsForRollbackConflict(activeTimeline, currentInstant)); } } + private Stream getCandidateInstantsForRollbackConflict(HoodieActiveTimeline activeTimeline, HoodieInstant currentInstant) { + // Add Requested rollback action instants that were created after the current instant. + List pendingRollbacks = activeTimeline + .findInstantsAfter(currentInstant.requestedTime()) + .filterPendingRollbackTimeline() + .getInstantsAsStream().collect(Collectors.toList()); + log.info(String.format("Rollback instants that may have conflict with %s are %s", currentInstant, pendingRollbacks)); + return pendingRollbacks.stream(); + } + private Stream getCandidateInstantsForNonTableServicesCommits(HoodieActiveTimeline activeTimeline, HoodieInstant currentInstant) { // To find out which instants are conflicting, we apply the following logic diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java index 9445499ed8a9a..e2eaa53103036 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java @@ -40,6 +40,7 @@ import java.util.function.Predicate; import java.util.stream.Stream; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.ROLLBACK_ACTION; import static 
org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN; import static org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN; import static org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps; @@ -130,6 +131,11 @@ private Predicate isClusteringOrRecentlyRequestedInstant(HoodieAc @Override public boolean hasConflict(ConcurrentOperation thisOperation, ConcurrentOperation otherOperation) { + // Check for rollback conflicts first + if (isRollbackConflict(thisOperation, otherOperation)) { + return true; + } + // TODO : UUID's can clash even for insert/insert, handle that case. Set> partitionAndFileIdsSetForFirstInstant = thisOperation.getMutatedPartitionAndFileIds(); Set> partitionAndFileIdsSetForSecondInstant = otherOperation.getMutatedPartitionAndFileIds(); @@ -143,6 +149,38 @@ public boolean hasConflict(ConcurrentOperation thisOperation, ConcurrentOperatio return false; } + /** + * Check whether there is a rollback operation in progress that tries to rollback the commit created by this + * operation. + * + * @param thisOperation first concurrent commit operation + * @param otherOperation concurrent rollback operation + * @return true if there is a rollback conflict, false otherwise + */ + private boolean isRollbackConflict(ConcurrentOperation thisOperation, ConcurrentOperation otherOperation) { + // Check if otherOperation is rollback + if (isRollbackOperation(otherOperation)) { + String rolledbackCommit = otherOperation.getRolledbackCommit(); + String thisCommitTimestamp = thisOperation.getInstantTimestamp(); + if (rolledbackCommit != null && rolledbackCommit.equals(thisCommitTimestamp)) { + log.error("Found rollback conflict: rollback operation " + otherOperation + + " is rolling back commit " + thisCommitTimestamp + " created by operation " + thisOperation); + return true; + } + } + return false; + } + + /** + * Check if the given operation is a rollback operation. 
+ * + * @param operation concurrent operation to check + * @return true if it's a rollback operation, false otherwise + */ + private boolean isRollbackOperation(ConcurrentOperation operation) { + return ROLLBACK_ACTION.equals(operation.getInstantActionType()); + } + @Override public Option resolveConflict(HoodieTable table, ConcurrentOperation thisOperation, ConcurrentOperation otherOperation) { @@ -163,7 +201,7 @@ public Option resolveConflict(HoodieTable table, // Conflict arises only if the log compaction commit has a lesser timestamp compared to compaction commit. return thisOperation.getCommitMetadataOption(); } - // just abort the current write if conflicts are found + // just abort the current write if conflicts are found (failed for rollback conflicts). throw new HoodieWriteConflictException(new ConcurrentModificationException("Cannot resolve conflicts for overlapping writes between first operation = " + thisOperation + ", second operation = " + otherOperation)); } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java index ea7ddac7aa4bc..fd6237b7e9ca6 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java @@ -22,7 +22,9 @@ import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.avro.model.HoodieCompactionOperation; import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.avro.model.HoodieInstantInfo; import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; +import org.apache.hudi.avro.model.HoodieRollbackPlan; import org.apache.hudi.avro.model.HoodieSliceInfo; import 
org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; @@ -296,4 +298,21 @@ private static HoodieReplaceCommitMetadata createReplaceCommitMetadata(WriteOper replaceMetadata.setOperationType(writeOperationType); return replaceMetadata; } + + public static void createRollbackRequested(String rollbackInstantTime, String commitToRollback, HoodieTableMetaClient metaClient) throws Exception { + // Create a rollback plan that targets the specified commit + HoodieRollbackPlan rollbackPlan = new HoodieRollbackPlan(); + rollbackPlan.setInstantToRollback(new HoodieInstantInfo(commitToRollback, "commit")); + rollbackPlan.setVersion(TimelineLayoutVersion.CURR_VERSION); + + HoodieTestTable.of(metaClient).addRequestedRollback(rollbackInstantTime, rollbackPlan); + } + + public static void createRollbackInflight(String rollbackInstantTime, String commitToRollback, HoodieTableMetaClient metaClient) throws Exception { + // First create the requested rollback, then transition to inflight + createRollbackRequested(rollbackInstantTime, commitToRollback, metaClient); + + // Create the inflight rollback file + HoodieTestTable.of(metaClient).addInflightRollback(rollbackInstantTime); + } } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestPreferWriterConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestPreferWriterConflictResolutionStrategy.java index 80561f74e14d4..f555633100322 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestPreferWriterConflictResolutionStrategy.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestPreferWriterConflictResolutionStrategy.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import 
org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieInstant.State; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.testutils.HoodieCommonTestHarness; import org.apache.hudi.common.util.Option; @@ -31,6 +32,8 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; import java.util.List; @@ -45,6 +48,8 @@ import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createReplace; import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createClusterInflight; import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createClusterRequested; +import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createRollbackInflight; +import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createRollbackRequested; import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR; public class TestPreferWriterConflictResolutionStrategy extends HoodieCommonTestHarness { @@ -251,4 +256,106 @@ public void testConcurrentWritesWithInterleavingSuccessfulReplace() throws Excep // expected } } + + /** + * Positive testcase, ensures that conflict is flagged for an on-going rollback that is targetting the inflight commit. 
+ * @param rollbackRequestedOnly - if true, cretes .rollback.requested only, otherwise creates .rollback.inflight + * @throws Exception + */ + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testConcurrentRollbackAndCommitConflict(boolean rollbackRequestedOnly) throws Exception { + // Create a base commit that the rollback will target + String targetCommitTime = WriteClientTestUtils.createNewInstantTime(); + createCommit(targetCommitTime, metaClient); + HoodieActiveTimeline timeline = metaClient.getActiveTimeline(); + + // Consider commits before this are all successful + Option lastSuccessfulInstant = timeline.getCommitsTimeline().filterCompletedInstants().lastInstant(); + + // Start a new commit (inflight ingestion commit) + String inflightCommitTime = WriteClientTestUtils.createNewInstantTime(); + createInflightCommit(inflightCommitTime, metaClient); + + // Start a rollback operation targeting the same commit timestamp as the inflight commit + String rollbackInstantTime = WriteClientTestUtils.createNewInstantTime(); + if (rollbackRequestedOnly) { + createRollbackRequested(rollbackInstantTime, inflightCommitTime, metaClient); + } else { + createRollbackInflight(rollbackInstantTime, inflightCommitTime, metaClient); + } + + // Set up the conflict resolution strategy + Option currentInstant = Option.of(INSTANT_GENERATOR.createNewInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, inflightCommitTime)); + SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new PreferWriterConflictResolutionStrategy(); + HoodieCommitMetadata currentMetadata = createCommitMetadata(inflightCommitTime); + + metaClient.reloadActiveTimeline(); + List candidateInstants = strategy.getCandidateInstants(metaClient, currentInstant.get(), lastSuccessfulInstant).collect( + Collectors.toList()); + + // The rollback operation should be detected as a candidate instant + Assertions.assertTrue(candidateInstants.size() == 1); + ConcurrentOperation 
rollbackOperation = new ConcurrentOperation(candidateInstants.get(0), metaClient); + ConcurrentOperation commitOperation = new ConcurrentOperation(currentInstant.get(), currentMetadata); + + // The strategy should detect a conflict between the rollback and commit operations + Assertions.assertTrue(strategy.hasConflict(commitOperation, rollbackOperation)); + + // Attempting to resolve the conflict should throw an exception + try { + strategy.resolveConflict(null, commitOperation, rollbackOperation); + Assertions.fail("Cannot reach here, rollback and commit should have thrown a conflict"); + } catch (HoodieWriteConflictException e) { + // expected + } + } + + /** + * Negative testcase, ensures that conflict is not flagged for an on-going rollback that is targetting + * a different inflight commit. + * @param rollbackRequestedOnly - if true, cretes .rollback.requested only, otherwise creates .rollback.inflight + * @throws Exception + */ + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testConcurrentRollbackAndCommitNoConflict(boolean rollbackRequestedOnly) throws Exception { + // Create two different commits + String targetCommitTime = WriteClientTestUtils.createNewInstantTime(); + createCommit(targetCommitTime, metaClient); + String differentCommitTime = WriteClientTestUtils.createNewInstantTime(); + createCommit(differentCommitTime, metaClient); + + HoodieActiveTimeline timeline = metaClient.getActiveTimeline(); + Option lastSuccessfulInstant = timeline.getCommitsTimeline().filterCompletedInstants().lastInstant(); + + // Start a new commit (inflight ingestion commit) + String inflightCommitTime = WriteClientTestUtils.createNewInstantTime(); + createInflightCommit(inflightCommitTime, metaClient); + + // Start a rollback operation targeting a different commit (not the inflight one) + String rollbackInstantTime = WriteClientTestUtils.createNewInstantTime(); + if (rollbackRequestedOnly) { + createRollbackRequested(rollbackInstantTime, 
targetCommitTime, metaClient); + } else { + createRollbackInflight(rollbackInstantTime, targetCommitTime, metaClient); + } + + // Set up the conflict resolution strategy + Option currentInstant = Option.of(INSTANT_GENERATOR.createNewInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, inflightCommitTime)); + SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new PreferWriterConflictResolutionStrategy(); + HoodieCommitMetadata currentMetadata = createCommitMetadata(inflightCommitTime); + + metaClient.reloadActiveTimeline(); + List candidateInstants = strategy.getCandidateInstants(metaClient, currentInstant.get(), lastSuccessfulInstant).collect( + Collectors.toList()); + + // The rollback operation should be detected as a candidate instant + Assertions.assertTrue(candidateInstants.size() == 1); + ConcurrentOperation rollbackOperation = new ConcurrentOperation(candidateInstants.get(0), metaClient); + ConcurrentOperation commitOperation = new ConcurrentOperation(currentInstant.get(), currentMetadata); + + // The strategy should NOT detect a conflict since the rollback targets a different commit + Assertions.assertFalse(strategy.hasConflict(commitOperation, rollbackOperation)); + } } From d1d2d4f8325d936f82a4f88bc3ce5b9fb11ce1db Mon Sep 17 00:00:00 2001 From: Prashant Wason Date: Thu, 19 Mar 2026 16:54:59 -0700 Subject: [PATCH 5/9] Revert "fix(concurrency): detect rollback conflicts with ongoing commit operations" This reverts commit 847773a166f5184ed3f0635ba1a603a019ba689c. 
--- .../transaction/ConcurrentOperation.java | 28 +---- ...referWriterConflictResolutionStrategy.java | 15 +-- ...tFileWritesConflictResolutionStrategy.java | 40 +------ .../TestConflictResolutionStrategyUtil.java | 19 ---- ...referWriterConflictResolutionStrategy.java | 107 ------------------ 5 files changed, 4 insertions(+), 205 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java index 47842ccddab24..dc3bd59c6211a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java @@ -19,7 +19,6 @@ package org.apache.hudi.client.transaction; import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; -import org.apache.hudi.avro.model.HoodieRollbackPlan; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieMetadataWrapper; import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; @@ -48,7 +47,6 @@ import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION; import static org.apache.hudi.common.table.timeline.HoodieTimeline.LOG_COMPACTION_ACTION; import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION; -import static org.apache.hudi.common.table.timeline.HoodieTimeline.ROLLBACK_ACTION; import static org.apache.hudi.common.util.CommitUtils.getPartitionAndFileIdWithoutSuffixFromSpecificRecord; /** @@ -70,11 +68,8 @@ public class ConcurrentOperation { private final String actionType; @ToString.Include private final String instantTime; - private final HoodieTableMetaClient metaClient; @Getter private Set> mutatedPartitionAndFileIds = Collections.emptySet(); - @Getter - private String rolledbackCommit; public 
ConcurrentOperation(HoodieInstant instant, HoodieTableMetaClient metaClient) throws IOException { // Replace inflight compaction and clustering to requested since inflight does not contain the plan. @@ -87,7 +82,6 @@ public ConcurrentOperation(HoodieInstant instant, HoodieTableMetaClient metaClie this.actionState = instant.getState().name(); this.actionType = instant.getAction(); this.instantTime = instant.requestedTime(); - this.metaClient = metaClient; // used only by the other concurrent operation (which reads from timeline) init(instant); } @@ -97,13 +91,7 @@ public ConcurrentOperation(HoodieInstant instant, HoodieCommitMetadata commitMet this.actionState = instant.getState().name(); this.actionType = instant.getAction(); this.instantTime = instant.requestedTime(); - this.metaClient = null; // used only by the other concurrent operation (which reads from timeline) - try { - init(instant); - } catch (IOException e) { - // This should never happen since we are initializing with commit metadata - throw new RuntimeException("Failed to initialize ConcurrentOperation for instant: " + instant, e); - } + init(instant); } public String getInstantActionState() { @@ -118,7 +106,7 @@ public String getInstantTimestamp() { return instantTime; } - private void init(HoodieInstant instant) throws IOException { + private void init(HoodieInstant instant) { if (this.metadataWrapper.isAvroMetadata()) { switch (getInstantActionType()) { case COMPACTION_ACTION: @@ -134,18 +122,6 @@ private void init(HoodieInstant instant) throws IOException { .getPartitionToWriteStats()); this.operationType = WriteOperationType.fromValue(this.metadataWrapper.getMetadataFromTimeline().getHoodieCommitMetadata().getOperationType()); break; - case ROLLBACK_ACTION: - this.operationType = WriteOperationType.UNKNOWN; - if (!instant.isCompleted()) { - // requested rollback instants have rollback plan in the details; (inflight rollback is empty). 
- // irrespective of requested/inflight, always read rollback plan. - if (this.metaClient != null) { - HoodieInstant requested = metaClient.getInstantGenerator().getRollbackRequestedInstant(instant); - HoodieRollbackPlan rollbackPlan = metaClient.getActiveTimeline().readRollbackPlan(requested); - this.rolledbackCommit = rollbackPlan.getInstantToRollback().getCommitTime(); - } - } - break; case REPLACE_COMMIT_ACTION: case CLUSTERING_ACTION: if (instant.isCompleted()) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/PreferWriterConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/PreferWriterConflictResolutionStrategy.java index 20ddca3bee361..9b02e26aecc00 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/PreferWriterConflictResolutionStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/PreferWriterConflictResolutionStrategy.java @@ -54,25 +54,12 @@ public Stream getCandidateInstants(HoodieTableMetaClient metaClie HoodieActiveTimeline activeTimeline = metaClient.reloadActiveTimeline(); if (ClusteringUtils.isClusteringInstant(activeTimeline, currentInstant, metaClient.getInstantGenerator()) || COMPACTION_ACTION.equals(currentInstant.getAction())) { - // Table service rollbacks are done by table service jobs/writers only, not by ingestion threads, - // so rollback conflict detection is not needed for table services. 
return getCandidateInstantsForTableServicesCommits(activeTimeline, currentInstant); } else { - return Stream.concat(getCandidateInstantsForNonTableServicesCommits(activeTimeline, currentInstant), - getCandidateInstantsForRollbackConflict(activeTimeline, currentInstant)); + return getCandidateInstantsForNonTableServicesCommits(activeTimeline, currentInstant); } } - private Stream getCandidateInstantsForRollbackConflict(HoodieActiveTimeline activeTimeline, HoodieInstant currentInstant) { - // Add Requested rollback action instants that were created after the current instant. - List pendingRollbacks = activeTimeline - .findInstantsAfter(currentInstant.requestedTime()) - .filterPendingRollbackTimeline() - .getInstantsAsStream().collect(Collectors.toList()); - log.info(String.format("Rollback instants that may have conflict with %s are %s", currentInstant, pendingRollbacks)); - return pendingRollbacks.stream(); - } - private Stream getCandidateInstantsForNonTableServicesCommits(HoodieActiveTimeline activeTimeline, HoodieInstant currentInstant) { // To find out which instants are conflicting, we apply the following logic diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java index e2eaa53103036..9445499ed8a9a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java @@ -40,7 +40,6 @@ import java.util.function.Predicate; import java.util.stream.Stream; -import static org.apache.hudi.common.table.timeline.HoodieTimeline.ROLLBACK_ACTION; import static 
org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN; import static org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN; import static org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps; @@ -131,11 +130,6 @@ private Predicate isClusteringOrRecentlyRequestedInstant(HoodieAc @Override public boolean hasConflict(ConcurrentOperation thisOperation, ConcurrentOperation otherOperation) { - // Check for rollback conflicts first - if (isRollbackConflict(thisOperation, otherOperation)) { - return true; - } - // TODO : UUID's can clash even for insert/insert, handle that case. Set> partitionAndFileIdsSetForFirstInstant = thisOperation.getMutatedPartitionAndFileIds(); Set> partitionAndFileIdsSetForSecondInstant = otherOperation.getMutatedPartitionAndFileIds(); @@ -149,38 +143,6 @@ public boolean hasConflict(ConcurrentOperation thisOperation, ConcurrentOperatio return false; } - /** - * Check whether there is a rollback operation in progress that tries to rollback the commit created by this - * operation. - * - * @param thisOperation first concurrent commit operation - * @param otherOperation concurrent rollback operation - * @return true if there is a rollback conflict, false otherwise - */ - private boolean isRollbackConflict(ConcurrentOperation thisOperation, ConcurrentOperation otherOperation) { - // Check if otherOperation is rollback - if (isRollbackOperation(otherOperation)) { - String rolledbackCommit = otherOperation.getRolledbackCommit(); - String thisCommitTimestamp = thisOperation.getInstantTimestamp(); - if (rolledbackCommit != null && rolledbackCommit.equals(thisCommitTimestamp)) { - log.error("Found rollback conflict: rollback operation " + otherOperation - + " is rolling back commit " + thisCommitTimestamp + " created by operation " + thisOperation); - return true; - } - } - return false; - } - - /** - * Check if the given operation is a rollback operation. 
- * - * @param operation concurrent operation to check - * @return true if it's a rollback operation, false otherwise - */ - private boolean isRollbackOperation(ConcurrentOperation operation) { - return ROLLBACK_ACTION.equals(operation.getInstantActionType()); - } - @Override public Option resolveConflict(HoodieTable table, ConcurrentOperation thisOperation, ConcurrentOperation otherOperation) { @@ -201,7 +163,7 @@ public Option resolveConflict(HoodieTable table, // Conflict arises only if the log compaction commit has a lesser timestamp compared to compaction commit. return thisOperation.getCommitMetadataOption(); } - // just abort the current write if conflicts are found (failed for rollback conflicts). + // just abort the current write if conflicts are found throw new HoodieWriteConflictException(new ConcurrentModificationException("Cannot resolve conflicts for overlapping writes between first operation = " + thisOperation + ", second operation = " + otherOperation)); } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java index fd6237b7e9ca6..ea7ddac7aa4bc 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java @@ -22,9 +22,7 @@ import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.avro.model.HoodieCompactionOperation; import org.apache.hudi.avro.model.HoodieCompactionPlan; -import org.apache.hudi.avro.model.HoodieInstantInfo; import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; -import org.apache.hudi.avro.model.HoodieRollbackPlan; import org.apache.hudi.avro.model.HoodieSliceInfo; import 
org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; @@ -298,21 +296,4 @@ private static HoodieReplaceCommitMetadata createReplaceCommitMetadata(WriteOper replaceMetadata.setOperationType(writeOperationType); return replaceMetadata; } - - public static void createRollbackRequested(String rollbackInstantTime, String commitToRollback, HoodieTableMetaClient metaClient) throws Exception { - // Create a rollback plan that targets the specified commit - HoodieRollbackPlan rollbackPlan = new HoodieRollbackPlan(); - rollbackPlan.setInstantToRollback(new HoodieInstantInfo(commitToRollback, "commit")); - rollbackPlan.setVersion(TimelineLayoutVersion.CURR_VERSION); - - HoodieTestTable.of(metaClient).addRequestedRollback(rollbackInstantTime, rollbackPlan); - } - - public static void createRollbackInflight(String rollbackInstantTime, String commitToRollback, HoodieTableMetaClient metaClient) throws Exception { - // First create the requested rollback, then transition to inflight - createRollbackRequested(rollbackInstantTime, commitToRollback, metaClient); - - // Create the inflight rollback file - HoodieTestTable.of(metaClient).addInflightRollback(rollbackInstantTime); - } } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestPreferWriterConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestPreferWriterConflictResolutionStrategy.java index f555633100322..80561f74e14d4 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestPreferWriterConflictResolutionStrategy.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestPreferWriterConflictResolutionStrategy.java @@ -23,7 +23,6 @@ import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import 
org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieInstant.State; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.testutils.HoodieCommonTestHarness; import org.apache.hudi.common.util.Option; @@ -32,8 +31,6 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; import java.util.List; @@ -48,8 +45,6 @@ import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createReplace; import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createClusterInflight; import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createClusterRequested; -import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createRollbackInflight; -import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createRollbackRequested; import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR; public class TestPreferWriterConflictResolutionStrategy extends HoodieCommonTestHarness { @@ -256,106 +251,4 @@ public void testConcurrentWritesWithInterleavingSuccessfulReplace() throws Excep // expected } } - - /** - * Positive testcase, ensures that conflict is flagged for an on-going rollback that is targetting the inflight commit. 
- * @param rollbackRequestedOnly - if true, cretes .rollback.requested only, otherwise creates .rollback.inflight - * @throws Exception - */ - @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testConcurrentRollbackAndCommitConflict(boolean rollbackRequestedOnly) throws Exception { - // Create a base commit that the rollback will target - String targetCommitTime = WriteClientTestUtils.createNewInstantTime(); - createCommit(targetCommitTime, metaClient); - HoodieActiveTimeline timeline = metaClient.getActiveTimeline(); - - // Consider commits before this are all successful - Option lastSuccessfulInstant = timeline.getCommitsTimeline().filterCompletedInstants().lastInstant(); - - // Start a new commit (inflight ingestion commit) - String inflightCommitTime = WriteClientTestUtils.createNewInstantTime(); - createInflightCommit(inflightCommitTime, metaClient); - - // Start a rollback operation targeting the same commit timestamp as the inflight commit - String rollbackInstantTime = WriteClientTestUtils.createNewInstantTime(); - if (rollbackRequestedOnly) { - createRollbackRequested(rollbackInstantTime, inflightCommitTime, metaClient); - } else { - createRollbackInflight(rollbackInstantTime, inflightCommitTime, metaClient); - } - - // Set up the conflict resolution strategy - Option currentInstant = Option.of(INSTANT_GENERATOR.createNewInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, inflightCommitTime)); - SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new PreferWriterConflictResolutionStrategy(); - HoodieCommitMetadata currentMetadata = createCommitMetadata(inflightCommitTime); - - metaClient.reloadActiveTimeline(); - List candidateInstants = strategy.getCandidateInstants(metaClient, currentInstant.get(), lastSuccessfulInstant).collect( - Collectors.toList()); - - // The rollback operation should be detected as a candidate instant - Assertions.assertTrue(candidateInstants.size() == 1); - ConcurrentOperation 
rollbackOperation = new ConcurrentOperation(candidateInstants.get(0), metaClient); - ConcurrentOperation commitOperation = new ConcurrentOperation(currentInstant.get(), currentMetadata); - - // The strategy should detect a conflict between the rollback and commit operations - Assertions.assertTrue(strategy.hasConflict(commitOperation, rollbackOperation)); - - // Attempting to resolve the conflict should throw an exception - try { - strategy.resolveConflict(null, commitOperation, rollbackOperation); - Assertions.fail("Cannot reach here, rollback and commit should have thrown a conflict"); - } catch (HoodieWriteConflictException e) { - // expected - } - } - - /** - * Negative testcase, ensures that conflict is not flagged for an on-going rollback that is targetting - * a different inflight commit. - * @param rollbackRequestedOnly - if true, cretes .rollback.requested only, otherwise creates .rollback.inflight - * @throws Exception - */ - @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testConcurrentRollbackAndCommitNoConflict(boolean rollbackRequestedOnly) throws Exception { - // Create two different commits - String targetCommitTime = WriteClientTestUtils.createNewInstantTime(); - createCommit(targetCommitTime, metaClient); - String differentCommitTime = WriteClientTestUtils.createNewInstantTime(); - createCommit(differentCommitTime, metaClient); - - HoodieActiveTimeline timeline = metaClient.getActiveTimeline(); - Option lastSuccessfulInstant = timeline.getCommitsTimeline().filterCompletedInstants().lastInstant(); - - // Start a new commit (inflight ingestion commit) - String inflightCommitTime = WriteClientTestUtils.createNewInstantTime(); - createInflightCommit(inflightCommitTime, metaClient); - - // Start a rollback operation targeting a different commit (not the inflight one) - String rollbackInstantTime = WriteClientTestUtils.createNewInstantTime(); - if (rollbackRequestedOnly) { - createRollbackRequested(rollbackInstantTime, 
targetCommitTime, metaClient); - } else { - createRollbackInflight(rollbackInstantTime, targetCommitTime, metaClient); - } - - // Set up the conflict resolution strategy - Option currentInstant = Option.of(INSTANT_GENERATOR.createNewInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, inflightCommitTime)); - SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new PreferWriterConflictResolutionStrategy(); - HoodieCommitMetadata currentMetadata = createCommitMetadata(inflightCommitTime); - - metaClient.reloadActiveTimeline(); - List candidateInstants = strategy.getCandidateInstants(metaClient, currentInstant.get(), lastSuccessfulInstant).collect( - Collectors.toList()); - - // The rollback operation should be detected as a candidate instant - Assertions.assertTrue(candidateInstants.size() == 1); - ConcurrentOperation rollbackOperation = new ConcurrentOperation(candidateInstants.get(0), metaClient); - ConcurrentOperation commitOperation = new ConcurrentOperation(currentInstant.get(), currentMetadata); - - // The strategy should NOT detect a conflict since the rollback targets a different commit - Assertions.assertFalse(strategy.hasConflict(commitOperation, rollbackOperation)); - } } From e5921668dfe6aeba6ae085dfd9ae79371e144ae4 Mon Sep 17 00:00:00 2001 From: Prashant Wason Date: Mon, 23 Mar 2026 22:49:01 -0700 Subject: [PATCH 6/9] Address review comments: fix CI, optimize comparator, add object reuse test - Fix isObjectReuseEnabled() compilation on Flink 1.17/1.18 by using getExecutionConfig().isObjectReuseEnabled() instead of RuntimeContext method - Remove unused imports (java.util.Arrays, java.util.stream.Collectors) to fix checkstyle violations - Move super.snapshotState() and super.endInput() out of finally clauses to match other append write functions (avoid sending write meta event on error) - Optimize TreeMap comparator: store pre-wrapped MemorySegment in SortKey to avoid per-comparison allocation and reduce GC pressure - Filter empty strings after 
trim in resolveSortKeys() to properly validate whitespace-only and comma-only sort key configs - Add testObjectReuseEnabled test that verifies records are correctly copied when Flink object reuse is enabled (prevents TreeMap corruption) Co-Authored-By: Claude Opus 4.6 --- ...AppendWriteFunctionWithContinuousSort.java | 25 ++++----- .../sink/append/AppendWriteFunctions.java | 7 ++- ...AppendWriteFunctionWithContinuousSort.java | 54 +++++++++++++++++++ .../sink/utils/InsertFunctionWrapper.java | 7 ++- .../utils/MockStreamingRuntimeContext.java | 24 ++++++++- .../apache/hudi/sink/utils/TestWriteBase.java | 11 ++++ .../java/org/apache/hudi/utils/TestData.java | 10 ++++ 7 files changed, 120 insertions(+), 18 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithContinuousSort.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithContinuousSort.java index 2303d5cc80357..a0760ee8ce759 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithContinuousSort.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithContinuousSort.java @@ -41,12 +41,10 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.TreeMap; -import java.util.stream.Collectors; /** * Sink function to write data with continuous sorting for improved compression. 
@@ -138,10 +136,7 @@ public void open(Configuration parameters) throws Exception { // Initialize TreeMap with comparator that uses normalized keys for fast comparison // and falls back to RecordComparator for full comparison when normalized keys are equal this.sortedRecords = new TreeMap<>((k1, k2) -> { - MemorySegment seg1 = MemorySegmentFactory.wrap(k1.keyBytes); - MemorySegment seg2 = MemorySegmentFactory.wrap(k2.keyBytes); - - int cmp = normalizedKeyComputer.compareKey(seg1, 0, seg2, 0); + int cmp = normalizedKeyComputer.compareKey(k1.keySegment, 0, k2.keySegment, 0); if (cmp != 0) { return cmp; } @@ -160,7 +155,7 @@ public void open(Configuration parameters) throws Exception { this.reusableKeySegment = MemorySegmentFactory.wrap(reusableKeyBuffer); // Detect object reuse mode and create serializer for copying if needed - this.objectReuseEnabled = getRuntimeContext().isObjectReuseEnabled(); + this.objectReuseEnabled = getRuntimeContext().getExecutionConfig().isObjectReuseEnabled(); if (this.objectReuseEnabled) { this.rowDataSerializer = new RowDataSerializer(rowType); } @@ -258,9 +253,8 @@ public void snapshotState() { } catch (IOException e) { throw new HoodieIOException("Failed to drain buffer during snapshot", e); - } finally { - super.snapshotState(); } + super.snapshotState(); } @Override @@ -276,9 +270,8 @@ public void endInput() { } catch (IOException e) { throw new HoodieIOException("Failed to drain buffer during endInput", e); - } finally { - super.endInput(); } + super.endInput(); } @Override @@ -293,12 +286,13 @@ public void close() throws Exception { } /** - * Sort key with normalized key stored in byte array (on-heap). + * Sort key with normalized key stored as a pre-wrapped MemorySegment to avoid + * repeated allocation during TreeMap comparisons. * Holds a reference to the original record for full comparison fallback. * Comparison is done via TreeMap comparator. 
*/ private static class SortKey { - final byte[] keyBytes; + final MemorySegment keySegment; final RowData record; final long insertionOrder; @@ -306,9 +300,10 @@ private static class SortKey { this.record = record; this.insertionOrder = insertionOrder; - // Copy normalized key from MemorySegment to on-heap byte array - this.keyBytes = new byte[keySize]; + // Copy normalized key and wrap as MemorySegment once to avoid per-comparison allocation + byte[] keyBytes = new byte[keySize]; sourceSegment.get(0, keyBytes, 0, keySize); + this.keySegment = MemorySegmentFactory.wrap(keyBytes); } @Override diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctions.java index 4829623607d95..28325e4bfb349 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctions.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctions.java @@ -99,8 +99,13 @@ public static List resolveSortKeys(Configuration conf) { ValidationUtils.checkArgument(StringUtils.nonEmpty(sortKeys), "Sort keys can't be null or empty for append write with buffer sort. " + "Either set write.buffer.sort.keys or ensure record key field is configured."); - return Arrays.stream(sortKeys.split(",")) + List sortKeyList = Arrays.stream(sortKeys.split(",")) .map(String::trim) + .filter(s -> !s.isEmpty()) .collect(Collectors.toList()); + ValidationUtils.checkArgument(!sortKeyList.isEmpty(), + "Sort keys can't be empty for append write with buffer sort. 
" + + "Either set write.buffer.sort.keys or ensure record key field is configured."); + return sortKeyList; } } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithContinuousSort.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithContinuousSort.java index 589a5eeaebcca..dbe319c9b5b47 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithContinuousSort.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithContinuousSort.java @@ -352,6 +352,60 @@ public void testInvalidSortKeysOnlyCommas() { }); } + @Test + public void testObjectReuseEnabled() throws Exception { + // All expected records (order may vary due to file read ordering) + List expected = Arrays.asList( + "uuid1,Bob,30,1970-01-01 00:00:01.123,p1", + "uuid2,Alice,25,1970-01-01 00:00:01.124,p1", + "uuid3,Bob,21,1970-01-01 00:00:31.124,p1"); + + // Create a reusable row to simulate Flink object reuse behavior + GenericRowData reusableRow = new GenericRowData(5); + + // Write data using a single reused RowData instance (mimicking object reuse) + TestWriteBase.TestHarness harness = TestWriteBase.TestHarness.instance() + .preparePipelineWithObjectReuse(tempFile, conf); + + // Record 1: Bob, 30 + reusableRow.setField(0, StringData.fromString("uuid1")); + reusableRow.setField(1, StringData.fromString("Bob")); + reusableRow.setField(2, 30); + reusableRow.setField(3, TimestampData.fromTimestamp(Timestamp.valueOf("1970-01-01 00:00:01.123"))); + reusableRow.setField(4, StringData.fromString("p1")); + harness.consume(Arrays.asList(reusableRow)); + + // Record 2: Alice, 25 (mutating the same row) + reusableRow.setField(0, StringData.fromString("uuid2")); + reusableRow.setField(1, StringData.fromString("Alice")); + reusableRow.setField(2, 25); + reusableRow.setField(3, 
TimestampData.fromTimestamp(Timestamp.valueOf("1970-01-01 00:00:01.124"))); + reusableRow.setField(4, StringData.fromString("p1")); + harness.consume(Arrays.asList(reusableRow)); + + // Record 3: Bob, 21 (mutating the same row again) + reusableRow.setField(0, StringData.fromString("uuid3")); + reusableRow.setField(1, StringData.fromString("Bob")); + reusableRow.setField(2, 21); + reusableRow.setField(3, TimestampData.fromTimestamp(Timestamp.valueOf("1970-01-01 00:00:31.124"))); + reusableRow.setField(4, StringData.fromString("p1")); + harness.consume(Arrays.asList(reusableRow)); + + harness.checkpoint(1).endInput(); + + // Verify all 3 records are distinct and not corrupted by object reuse + // (without object reuse safety, all records would be the same - the last mutation) + List result = TestData.readAllData(new File(conf.get(FlinkOptions.PATH)), rowType, 1); + assertEquals(3, result.size()); + + List filteredResult = + result.stream().map(TestData::filterOutVariablesWithoutHudiMetadata) + .sorted().collect(Collectors.toList()); + List sortedExpected = expected.stream().sorted().collect(Collectors.toList()); + + assertArrayEquals(sortedExpected.toArray(), filteredResult.toArray()); + } + @Test public void testInvalidSortKeysOnlyWhitespace() { this.conf.set(FlinkOptions.WRITE_BUFFER_SORT_KEYS, " "); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java index dcbceab936bb7..8c0369a889c77 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java @@ -77,13 +77,18 @@ public class InsertFunctionWrapper implements TestFunctionWrapper { private AppendWriteFunction writeFunction; public InsertFunctionWrapper(String tablePath, Configuration conf) throws 
Exception { + this(tablePath, conf, new ExecutionConfig()); + } + + public InsertFunctionWrapper(String tablePath, Configuration conf, ExecutionConfig executionConfig) throws Exception { IOManager ioManager = new IOManagerAsync(); MockEnvironment environment = new MockEnvironmentBuilder() .setTaskName("mockTask") .setManagedMemorySize(4 * MemoryManager.DEFAULT_PAGE_SIZE) .setIOManager(ioManager) + .setExecutionConfig(executionConfig) .build(); - this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, environment); + this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, environment, executionConfig); this.gateway = new MockOperatorEventGateway(); this.subtaskGateway = new MockSubtaskGateway(); this.conf = conf; diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockStreamingRuntimeContext.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockStreamingRuntimeContext.java index 4a87466814b7a..0c88ad04e59f0 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockStreamingRuntimeContext.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockStreamingRuntimeContext.java @@ -70,6 +70,19 @@ public MockStreamingRuntimeContext( this.taskInfo = new MockTaskInfo(numParallelSubtasks, subtaskIndex, 0); } + public MockStreamingRuntimeContext( + boolean isCheckpointingEnabled, + int numParallelSubtasks, + int subtaskIndex, + MockEnvironment environment, + ExecutionConfig executionConfig) { + + super(new MockStreamOperator(executionConfig), environment, new HashMap<>()); + + this.isCheckpointingEnabled = isCheckpointingEnabled; + this.taskInfo = new MockTaskInfo(numParallelSubtasks, subtaskIndex, 0); + } + public int getIndexOfThisSubtask() { return taskInfo.getIndexOfThisSubtask(); } @@ -90,14 +103,23 @@ private static class MockStreamOperator extends AbstractStreamOperator private static final long serialVersionUID = 
-1153976702711944427L; private transient TestProcessingTimeService testProcessingTimeService; + private final transient ExecutionConfig executionConfig; @Setter private transient Object currentKey; private final transient Map mockKeyedStateStoreMap = new HashMap<>(); + MockStreamOperator() { + this(new ExecutionConfig()); + } + + MockStreamOperator(ExecutionConfig executionConfig) { + this.executionConfig = executionConfig; + } + @Override public ExecutionConfig getExecutionConfig() { - return new ExecutionConfig(); + return executionConfig; } @Override diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java index 4c7b051a5ba26..48aaf96440b38 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java @@ -186,6 +186,17 @@ public TestHarness preparePipeline(File basePath, Configuration conf) throws Exc return this; } + public TestHarness preparePipelineWithObjectReuse(File basePath, Configuration conf) throws Exception { + this.baseFile = basePath; + this.basePath = this.baseFile.getAbsolutePath(); + this.conf = conf; + this.pipeline = TestData.getWritePipelineWithObjectReuse(this.basePath, conf); + // open the function and ingest data + this.pipeline.openFunction(); + HoodieWriteConfig writeConfig = this.pipeline.getCoordinator().getWriteClient().getConfig(); + return this; + } + public TestHarness consume(List inputs) throws Exception { for (RowData rowData : inputs) { this.pipeline.invoke(rowData); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java index 4c4f1b30893cf..63a82f9e36c86 100644 --- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java @@ -643,6 +643,16 @@ public static TestFunctionWrapper getWritePipeline(String basePath, Con } } + /** + * Initializes a writing pipeline with object reuse enabled. + */ + public static TestFunctionWrapper getWritePipelineWithObjectReuse( + String basePath, Configuration conf) throws Exception { + org.apache.flink.api.common.ExecutionConfig execConfig = new org.apache.flink.api.common.ExecutionConfig(); + execConfig.enableObjectReuse(); + return new InsertFunctionWrapper<>(basePath, conf, execConfig); + } + private static String toStringSafely(Object obj) { return obj == null ? "null" : obj.toString(); } From e6bc34dd9381dd0e5b3edd1cca06a5895d00fde5 Mon Sep 17 00:00:00 2001 From: Prashant Wason Date: Wed, 25 Mar 2026 02:12:22 -0700 Subject: [PATCH 7/9] Fix Flink 2.0/2.1 compilation: use RuntimeContextUtils for isObjectReuseEnabled getExecutionConfig() was removed from RuntimeContext in Flink 2.0+. 
Add isObjectReuseEnabled() to version-specific RuntimeContextUtils adapters: - Flink 1.17-1.20: uses getExecutionConfig().isObjectReuseEnabled() - Flink 2.0-2.1: uses getGlobalJobParameters() with PipelineOptions.OBJECT_REUSE Co-Authored-By: Claude Opus 4.6 --- .../sink/append/AppendWriteFunctionWithContinuousSort.java | 3 ++- .../java/org/apache/hudi/utils/RuntimeContextUtils.java | 4 ++++ .../java/org/apache/hudi/utils/RuntimeContextUtils.java | 4 ++++ .../java/org/apache/hudi/utils/RuntimeContextUtils.java | 4 ++++ .../java/org/apache/hudi/utils/RuntimeContextUtils.java | 4 ++++ .../java/org/apache/hudi/utils/RuntimeContextUtils.java | 7 +++++++ .../java/org/apache/hudi/utils/RuntimeContextUtils.java | 7 +++++++ 7 files changed, 32 insertions(+), 1 deletion(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithContinuousSort.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithContinuousSort.java index a0760ee8ce759..66f704f3fd073 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithContinuousSort.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithContinuousSort.java @@ -23,6 +23,7 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.sink.StreamWriteOperatorCoordinator; import org.apache.hudi.sink.bulk.sort.SortOperatorGen; +import org.apache.hudi.utils.RuntimeContextUtils; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.memory.MemorySegment; @@ -155,7 +156,7 @@ public void open(Configuration parameters) throws Exception { this.reusableKeySegment = MemorySegmentFactory.wrap(reusableKeyBuffer); // Detect object reuse mode and create serializer for copying if needed - this.objectReuseEnabled = getRuntimeContext().getExecutionConfig().isObjectReuseEnabled(); + this.objectReuseEnabled 
= RuntimeContextUtils.isObjectReuseEnabled(getRuntimeContext()); if (this.objectReuseEnabled) { this.rowDataSerializer = new RowDataSerializer(rowType); } diff --git a/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java index d76f37365d3d2..be75c77fd35a4 100644 --- a/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java +++ b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java @@ -49,4 +49,8 @@ public static int getNumberOfParallelSubtasks(RuntimeContext runtimeContext) { public static long getWatermarkInternal(RuntimeContext runtimeContext) { return runtimeContext.getExecutionConfig().getAutoWatermarkInterval(); } + + public static boolean isObjectReuseEnabled(RuntimeContext runtimeContext) { + return runtimeContext.getExecutionConfig().isObjectReuseEnabled(); + } } diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java index d76f37365d3d2..be75c77fd35a4 100644 --- a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java +++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java @@ -49,4 +49,8 @@ public static int getNumberOfParallelSubtasks(RuntimeContext runtimeContext) { public static long getWatermarkInternal(RuntimeContext runtimeContext) { return runtimeContext.getExecutionConfig().getAutoWatermarkInterval(); } + + public static boolean isObjectReuseEnabled(RuntimeContext runtimeContext) { + return runtimeContext.getExecutionConfig().isObjectReuseEnabled(); + } } diff --git 
a/hudi-flink-datasource/hudi-flink1.19.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java b/hudi-flink-datasource/hudi-flink1.19.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java index d76f37365d3d2..be75c77fd35a4 100644 --- a/hudi-flink-datasource/hudi-flink1.19.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java +++ b/hudi-flink-datasource/hudi-flink1.19.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java @@ -49,4 +49,8 @@ public static int getNumberOfParallelSubtasks(RuntimeContext runtimeContext) { public static long getWatermarkInternal(RuntimeContext runtimeContext) { return runtimeContext.getExecutionConfig().getAutoWatermarkInterval(); } + + public static boolean isObjectReuseEnabled(RuntimeContext runtimeContext) { + return runtimeContext.getExecutionConfig().isObjectReuseEnabled(); + } } diff --git a/hudi-flink-datasource/hudi-flink1.20.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java b/hudi-flink-datasource/hudi-flink1.20.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java index d76f37365d3d2..be75c77fd35a4 100644 --- a/hudi-flink-datasource/hudi-flink1.20.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java +++ b/hudi-flink-datasource/hudi-flink1.20.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java @@ -49,4 +49,8 @@ public static int getNumberOfParallelSubtasks(RuntimeContext runtimeContext) { public static long getWatermarkInternal(RuntimeContext runtimeContext) { return runtimeContext.getExecutionConfig().getAutoWatermarkInterval(); } + + public static boolean isObjectReuseEnabled(RuntimeContext runtimeContext) { + return runtimeContext.getExecutionConfig().isObjectReuseEnabled(); + } } diff --git a/hudi-flink-datasource/hudi-flink2.0.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java b/hudi-flink-datasource/hudi-flink2.0.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java index eb430e4c50127..61024715fe2f2 100644 --- 
a/hudi-flink-datasource/hudi-flink2.0.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java +++ b/hudi-flink-datasource/hudi-flink2.0.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java @@ -24,6 +24,7 @@ import java.util.Map; import static org.apache.flink.configuration.PipelineOptions.AUTO_WATERMARK_INTERVAL; +import static org.apache.flink.configuration.PipelineOptions.OBJECT_REUSE; /** * Adapter utils for {@code RuntimeContext} to solve API compatibilities issue. @@ -55,4 +56,10 @@ public static long getWatermarkInternal(RuntimeContext runtimeContext) { return Long.parseLong(jobParameters.getOrDefault(AUTO_WATERMARK_INTERVAL.key(), AUTO_WATERMARK_INTERVAL.defaultValue().toMillis() + "")); } + + public static boolean isObjectReuseEnabled(RuntimeContext runtimeContext) { + Map jobParameters = runtimeContext.getGlobalJobParameters(); + return Boolean.parseBoolean(jobParameters.getOrDefault(OBJECT_REUSE.key(), + OBJECT_REUSE.defaultValue().toString())); + } } diff --git a/hudi-flink-datasource/hudi-flink2.1.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java b/hudi-flink-datasource/hudi-flink2.1.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java index eb430e4c50127..61024715fe2f2 100644 --- a/hudi-flink-datasource/hudi-flink2.1.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java +++ b/hudi-flink-datasource/hudi-flink2.1.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java @@ -24,6 +24,7 @@ import java.util.Map; import static org.apache.flink.configuration.PipelineOptions.AUTO_WATERMARK_INTERVAL; +import static org.apache.flink.configuration.PipelineOptions.OBJECT_REUSE; /** * Adapter utils for {@code RuntimeContext} to solve API compatibilities issue. 
@@ -55,4 +56,10 @@ public static long getWatermarkInternal(RuntimeContext runtimeContext) { return Long.parseLong(jobParameters.getOrDefault(AUTO_WATERMARK_INTERVAL.key(), AUTO_WATERMARK_INTERVAL.defaultValue().toMillis() + "")); } + + public static boolean isObjectReuseEnabled(RuntimeContext runtimeContext) { + Map jobParameters = runtimeContext.getGlobalJobParameters(); + return Boolean.parseBoolean(jobParameters.getOrDefault(OBJECT_REUSE.key(), + OBJECT_REUSE.defaultValue().toString())); + } } From 1010cab90c8add56e8c7417ba4be2b19e5623b86 Mon Sep 17 00:00:00 2001 From: Prashant Wason Date: Mon, 6 Apr 2026 14:57:12 -0700 Subject: [PATCH 8/9] Address review comments: use RuntimeContext.isObjectReuseEnabled() directly for Flink 2.0+/2.1+ Co-Authored-By: Claude Opus 4.6 --- .../main/java/org/apache/hudi/utils/RuntimeContextUtils.java | 5 +---- .../main/java/org/apache/hudi/utils/RuntimeContextUtils.java | 5 +---- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink2.0.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java b/hudi-flink-datasource/hudi-flink2.0.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java index 61024715fe2f2..f1bd7fdcc8e15 100644 --- a/hudi-flink-datasource/hudi-flink2.0.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java +++ b/hudi-flink-datasource/hudi-flink2.0.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java @@ -24,7 +24,6 @@ import java.util.Map; import static org.apache.flink.configuration.PipelineOptions.AUTO_WATERMARK_INTERVAL; -import static org.apache.flink.configuration.PipelineOptions.OBJECT_REUSE; /** * Adapter utils for {@code RuntimeContext} to solve API compatibilities issue. 
@@ -58,8 +57,6 @@ public static long getWatermarkInternal(RuntimeContext runtimeContext) { } public static boolean isObjectReuseEnabled(RuntimeContext runtimeContext) { - Map jobParameters = runtimeContext.getGlobalJobParameters(); - return Boolean.parseBoolean(jobParameters.getOrDefault(OBJECT_REUSE.key(), - OBJECT_REUSE.defaultValue().toString())); + return runtimeContext.isObjectReuseEnabled(); } } diff --git a/hudi-flink-datasource/hudi-flink2.1.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java b/hudi-flink-datasource/hudi-flink2.1.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java index 61024715fe2f2..f1bd7fdcc8e15 100644 --- a/hudi-flink-datasource/hudi-flink2.1.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java +++ b/hudi-flink-datasource/hudi-flink2.1.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java @@ -24,7 +24,6 @@ import java.util.Map; import static org.apache.flink.configuration.PipelineOptions.AUTO_WATERMARK_INTERVAL; -import static org.apache.flink.configuration.PipelineOptions.OBJECT_REUSE; /** * Adapter utils for {@code RuntimeContext} to solve API compatibilities issue. 
@@ -58,8 +57,6 @@ public static long getWatermarkInternal(RuntimeContext runtimeContext) { } public static boolean isObjectReuseEnabled(RuntimeContext runtimeContext) { - Map jobParameters = runtimeContext.getGlobalJobParameters(); - return Boolean.parseBoolean(jobParameters.getOrDefault(OBJECT_REUSE.key(), - OBJECT_REUSE.defaultValue().toString())); + return runtimeContext.isObjectReuseEnabled(); } } From 1caf9fa1b4581c353c2a4f0b5a33d419715c679b Mon Sep 17 00:00:00 2001 From: Prashant Wason Date: Wed, 15 Apr 2026 16:34:10 -0700 Subject: [PATCH 9/9] Address review comments: add memory bounding and optimize drain - Add TotalSizeTracer to bound buffer memory footprint using write.task.max.size, triggering drain when memory limit is exceeded in addition to record count limit - Replace Iterator-based drain loop with TreeMap.pollFirstEntry() to avoid iterator allocation per drain call (matters when drainSize=1) - Estimate record size once via ObjectSizeCalculator and track cumulative memory via TotalSizeTracer.trace()/countDown()/reset() Co-Authored-By: Claude Opus 4.6 --- ...AppendWriteFunctionWithContinuousSort.java | 36 +++++++++++-------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithContinuousSort.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithContinuousSort.java index 66f704f3fd073..caffecbb34ba3 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithContinuousSort.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithContinuousSort.java @@ -18,10 +18,12 @@ package org.apache.hudi.sink.append; +import org.apache.hudi.common.util.ObjectSizeCalculator; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; import 
org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.sink.StreamWriteOperatorCoordinator; +import org.apache.hudi.sink.buffer.TotalSizeTracer; import org.apache.hudi.sink.bulk.sort.SortOperatorGen; import org.apache.hudi.utils.RuntimeContextUtils; @@ -42,7 +44,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -77,6 +78,7 @@ public class AppendWriteFunctionWithContinuousSort extends AppendWriteFunctio private transient TreeMap sortedRecords; private transient long insertionSequence; + private transient TotalSizeTracer sizeTracer; // Sort key computation private transient NormalizedKeyComputer normalizedKeyComputer; @@ -90,6 +92,7 @@ public class AppendWriteFunctionWithContinuousSort extends AppendWriteFunctio private transient long totalDrainOperations; private transient long totalDrainedRecords; private transient long totalInserted; + private transient long estimatedRecordSize; public AppendWriteFunctionWithContinuousSort(Configuration config, RowType rowType) { super(config, rowType); @@ -166,6 +169,9 @@ public void open(Configuration parameters) throws Exception { this.totalDrainedRecords = 0; this.totalInserted = 0; + // Initialize memory size tracer for bounding buffer memory footprint + this.sizeTracer = new TotalSizeTracer(config); + LOG.info("AppendWriteFunctionWithContinuousSort initialized successfully"); } @@ -173,8 +179,8 @@ public void open(Configuration parameters) throws Exception { public void processElement(T value, Context ctx, Collector out) throws Exception { RowData data = (RowData) value; - // Check if buffer has reached max capacity - if (sortedRecords.size() >= maxCapacity) { + // Check if buffer has reached max capacity (record count) or memory limit + if (sortedRecords.size() >= maxCapacity || sizeTracer.bufferSize > sizeTracer.maxBufferSize) { drainRecords(drainSize); // Verify there's space after draining @@ 
-198,8 +204,12 @@ public void processElement(T value, Context ctx, Collector out) throws // Create sort key (copies the normalized key from reusable segment) SortKey key = new SortKey(reusableKeySegment, normalizedKeySize, data, insertionSequence++); - // Store the RowData + // Store the RowData and track memory usage sortedRecords.put(key, data); + if (estimatedRecordSize == 0) { + estimatedRecordSize = ObjectSizeCalculator.getObjectSize(data); + } + sizeTracer.trace(estimatedRecordSize); totalInserted++; } @@ -217,25 +227,19 @@ private void drainRecords(int count) throws IOException { initWriterHelper(); } - // Drain records from TreeMap + // Drain records from TreeMap using pollFirstEntry() to avoid iterator allocation int actualCount = Math.min(count, sortedRecords.size()); int drained = 0; - Iterator> iterator = sortedRecords.entrySet().iterator(); - while (iterator.hasNext() && drained < actualCount) { - Map.Entry entry = iterator.next(); - RowData record = entry.getValue(); - - // Write record - writerHelper.write(record); - - // Remove from TreeMap - memory immediately reclaimed - iterator.remove(); + while (drained < actualCount && !sortedRecords.isEmpty()) { + Map.Entry entry = sortedRecords.pollFirstEntry(); + writerHelper.write(entry.getValue()); drained++; } totalDrainOperations++; totalDrainedRecords += drained; + sizeTracer.countDown(drained * estimatedRecordSize); } @Override @@ -247,6 +251,7 @@ public void snapshotState() { drainRecords(sortedRecords.size()); sortedRecords.clear(); insertionSequence = 0L; + sizeTracer.reset(); } LOG.info("Snapshot complete: total drained={}, operations={}", @@ -267,6 +272,7 @@ public void endInput() { drainRecords(sortedRecords.size()); sortedRecords.clear(); insertionSequence = 0L; + sizeTracer.reset(); } } catch (IOException e) {