diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 2e7c017225eea..261c6af7d1a8c 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -724,6 +724,16 @@ public class FlinkOptions extends HoodieConfig { + "Data is sorted within the buffer configured by number of records or buffer size. " + "The order of entire written file is not guaranteed."); + @AdvancedConfig + public static final ConfigOption WRITE_BUFFER_SORT_CONTINUOUS_DRAIN_SIZE = ConfigOptions + .key("write.buffer.sort.continuous.drain.size") + .intType() + .defaultValue(1) // default drain 1 record at a time + .withDescription("Number of records to drain each time the max capacity is reached when using continuous sorting. " + + "Default value of 1 provides smooth, incremental draining. " + + "Can be increased for batching if needed (e.g., 10, 100). " + + "Larger values reduce drain frequency but may cause latency spikes."); + @AdvancedConfig public static final ConfigOption WRITE_BUFFER_SIZE = ConfigOptions .key("write.buffer.size") @@ -741,7 +751,8 @@ public class FlinkOptions extends HoodieConfig { .withDescription("Buffer type for append write function: " + "NONE (no buffer sort, default), " + "BOUNDED_IN_MEMORY (double buffer with async write), " - + "DISRUPTOR (ring buffer with async write, recommended for better throughput)"); + + "DISRUPTOR (ring buffer with async write, recommended for better throughput), " + + "CONTINUOUS_SORT (TreeMap-based continuous sorting with incremental draining)"); @AdvancedConfig public static final ConfigOption WRITE_BUFFER_DISRUPTOR_RING_SIZE = ConfigOptions diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithContinuousSort.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithContinuousSort.java new file mode 100644 index 0000000000000..caffecbb34ba3 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithContinuousSort.java @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.append; + +import org.apache.hudi.common.util.ObjectSizeCalculator; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.sink.StreamWriteOperatorCoordinator; +import org.apache.hudi.sink.buffer.TotalSizeTracer; +import org.apache.hudi.sink.bulk.sort.SortOperatorGen; +import org.apache.hudi.utils.RuntimeContextUtils; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator; +import org.apache.flink.table.runtime.typeutils.RowDataSerializer; +import org.apache.flink.table.runtime.generated.GeneratedNormalizedKeyComputer; +import org.apache.flink.table.runtime.generated.GeneratedRecordComparator; +import org.apache.flink.table.runtime.generated.NormalizedKeyComputer; +import org.apache.flink.table.runtime.generated.RecordComparator; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.Collector; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +/** + * Sink function to write data with continuous sorting for improved compression. + * + *

Unlike {@link AppendWriteFunctionWithBIMBufferSort} which uses batch sorting, + * this function maintains sorted order continuously using a TreeMap, providing: + *

    + *
  • Non-blocking inserts (O(log n) vs O(1) + periodic O(n log n))
  • + *
  • Incremental draining without re-sorting
  • + *
  • Predictable latency (no sort spikes)
  • + *
+ * + *

Strategy: + *

    + *
  1. Records are inserted in sorted order (TreeMap)
  2. + *
  3. When buffer reaches max capacity, oldest record(s) are drained synchronously
  4. + *
  5. Drain size is configurable to balance latency vs. throughput vs compression ratio
  6. + *
+ * + * @param Type of the input record + * @see StreamWriteOperatorCoordinator + */ +public class AppendWriteFunctionWithContinuousSort extends AppendWriteFunction { + + private static final Logger LOG = LoggerFactory.getLogger(AppendWriteFunctionWithContinuousSort.class); + + private final long maxCapacity; + private final int drainSize; + + private transient TreeMap sortedRecords; + private transient long insertionSequence; + private transient TotalSizeTracer sizeTracer; + + // Sort key computation + private transient NormalizedKeyComputer normalizedKeyComputer; + private transient RecordComparator recordComparator; + private transient MemorySegment reusableKeySegment; + private transient int normalizedKeySize; + private transient boolean objectReuseEnabled; + private transient RowDataSerializer rowDataSerializer; + + // Metrics + private transient long totalDrainOperations; + private transient long totalDrainedRecords; + private transient long totalInserted; + private transient long estimatedRecordSize; + + public AppendWriteFunctionWithContinuousSort(Configuration config, RowType rowType) { + super(config, rowType); + + // Configuration + this.maxCapacity = config.get(FlinkOptions.WRITE_BUFFER_SIZE); + this.drainSize = config.get(FlinkOptions.WRITE_BUFFER_SORT_CONTINUOUS_DRAIN_SIZE); + + LOG.info("AppendWriteFunctionWithContinuousSort created: maxCapacity={}, drainSize={}", + maxCapacity, drainSize); + } + + @Override + public void open(Configuration parameters) throws Exception { + // Validate configuration before calling super.open() which requires Flink runtime context + if (maxCapacity <= 0) { + throw new IllegalArgumentException( + String.format("Buffer capacity must be positive, got: %d", maxCapacity)); + } + + if (drainSize <= 0) { + throw new IllegalArgumentException( + String.format("Drain size must be positive, got: %d", drainSize)); + } + + // Resolve sort keys, falling back to record key if not specified + List sortKeyList = AppendWriteFunctions.resolveSortKeys(config); + + super.open(parameters); + + LOG.info("Initializing continuous sort with keys: {}", sortKeyList); + + // Create sort code generator for normalized key computation and record comparison + SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType, sortKeyList.toArray(new String[0])); + SortCodeGenerator codeGenerator = sortOperatorGen.createSortCodeGenerator(); + GeneratedNormalizedKeyComputer generatedKeyComputer = codeGenerator.generateNormalizedKeyComputer("ContinuousSortKeyComputer"); + GeneratedRecordComparator generatedComparator = codeGenerator.generateRecordComparator("ContinuousSortComparator"); + + // Instantiate code-generated components + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + this.normalizedKeyComputer = generatedKeyComputer.newInstance(classLoader); + this.recordComparator = generatedComparator.newInstance(classLoader); + this.normalizedKeySize = normalizedKeyComputer.getNumKeyBytes(); + + // Initialize TreeMap with comparator that uses normalized keys for fast comparison + // and falls back to RecordComparator for full comparison when normalized keys are equal + this.sortedRecords = new TreeMap<>((k1, k2) -> { + int cmp = normalizedKeyComputer.compareKey(k1.keySegment, 0, k2.keySegment, 0); + if (cmp != 0) { + return cmp; + } + // Normalized keys are equal - use full record comparison for correct ordering + cmp = recordComparator.compare(k1.record, k2.record); + if (cmp != 0) { + return cmp; + } + // Records are equal by sort keys - use insertion order for stability + return Long.compare(k1.insertionOrder, k2.insertionOrder); + }); + this.insertionSequence = 0L; + + // Allocate reusable on-heap buffer for computing keys + byte[] reusableKeyBuffer = new byte[normalizedKeySize]; + this.reusableKeySegment = MemorySegmentFactory.wrap(reusableKeyBuffer); + + // Detect object reuse mode and create serializer for copying if needed + this.objectReuseEnabled = RuntimeContextUtils.isObjectReuseEnabled(getRuntimeContext()); + if (this.objectReuseEnabled) { + this.rowDataSerializer = new RowDataSerializer(rowType); + } + + // Initialize metrics + this.totalDrainOperations = 0; + this.totalDrainedRecords = 0; + this.totalInserted = 0; + + // Initialize memory size tracer for bounding buffer memory footprint + this.sizeTracer = new TotalSizeTracer(config); + + LOG.info("AppendWriteFunctionWithContinuousSort initialized successfully"); + } + + @Override + public void processElement(T value, Context ctx, Collector out) throws Exception { + RowData data = (RowData) value; + + // Check if buffer has reached max capacity (record count) or memory limit + if (sortedRecords.size() >= maxCapacity || sizeTracer.bufferSize > sizeTracer.maxBufferSize) { + drainRecords(drainSize); + + // Verify there's space after draining + if (sortedRecords.size() >= maxCapacity) { + throw new HoodieException( + String.format("Buffer cannot accept record after draining. " + + "Buffer size: %d, maxCapacity: %d, drainSize: %d", + sortedRecords.size(), maxCapacity, drainSize)); + } + } + + // Copy RowData when object reuse is enabled to prevent mutation after insertion + if (objectReuseEnabled) { + data = rowDataSerializer.copy(data); + } + + // Write to buffer (maintains sorted order) + // Compute normalized key into reusable segment + normalizedKeyComputer.putKey(data, reusableKeySegment, 0); + + // Create sort key (copies the normalized key from reusable segment) + SortKey key = new SortKey(reusableKeySegment, normalizedKeySize, data, insertionSequence++); + + // Store the RowData and track memory usage + sortedRecords.put(key, data); + if (estimatedRecordSize == 0) { + estimatedRecordSize = ObjectSizeCalculator.getObjectSize(data); + } + sizeTracer.trace(estimatedRecordSize); + + totalInserted++; + } + + /** + * Drain oldest records from buffer and write to storage. + */ + private void drainRecords(int count) throws IOException { + if (sortedRecords.isEmpty()) { + return; + } + + // Initialize writer if needed + if (this.writerHelper == null) { + initWriterHelper(); + } + + // Drain records from TreeMap using pollFirstEntry() to avoid iterator allocation + int actualCount = Math.min(count, sortedRecords.size()); + int drained = 0; + + while (drained < actualCount && !sortedRecords.isEmpty()) { + Map.Entry entry = sortedRecords.pollFirstEntry(); + writerHelper.write(entry.getValue()); + drained++; + } + + totalDrainOperations++; + totalDrainedRecords += drained; + sizeTracer.countDown(drained * estimatedRecordSize); + } + + @Override + public void snapshotState() { + try { + // Drain all remaining records and reset for next checkpoint interval + if (!sortedRecords.isEmpty()) { + LOG.info("Snapshot: draining {} remaining records", sortedRecords.size()); + drainRecords(sortedRecords.size()); + sortedRecords.clear(); + insertionSequence = 0L; + sizeTracer.reset(); + } + + LOG.info("Snapshot complete: total drained={}, operations={}", + totalDrainedRecords, totalDrainOperations); + + } catch (IOException e) { + throw new HoodieIOException("Failed to drain buffer during snapshot", e); + } + super.snapshotState(); + } + + @Override + public void endInput() { + try { + // Drain all remaining records and clear buffer + if (!sortedRecords.isEmpty()) { + LOG.info("EndInput: draining {} remaining records", sortedRecords.size()); + drainRecords(sortedRecords.size()); + sortedRecords.clear(); + insertionSequence = 0L; + sizeTracer.reset(); + } + + } catch (IOException e) { + throw new HoodieIOException("Failed to drain buffer during endInput", e); + } + super.endInput(); + } + + @Override + public void close() throws Exception { + try { + LOG.info("AppendWriteFunctionWithContinuousSort closed: totalInserted={}, totalDrained={}, operations={}", + totalInserted, totalDrainedRecords, totalDrainOperations); + + } finally { + super.close(); + } + } + + /** + * Sort key with normalized key stored as a pre-wrapped MemorySegment to avoid + * repeated allocation during TreeMap comparisons. + * Holds a reference to the original record for full comparison fallback. + * Comparison is done via TreeMap comparator. + */ + private static class SortKey { + final MemorySegment keySegment; + final RowData record; + final long insertionOrder; + + SortKey(MemorySegment sourceSegment, int keySize, RowData record, long insertionOrder) { + this.record = record; + this.insertionOrder = insertionOrder; + + // Copy normalized key and wrap as MemorySegment once to avoid per-comparison allocation + byte[] keyBytes = new byte[keySize]; + sourceSegment.get(0, keyBytes, 0, keySize); + this.keySegment = MemorySegmentFactory.wrap(keyBytes); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof SortKey)) { + return false; + } + return this.insertionOrder == ((SortKey) obj).insertionOrder; + } + + @Override + public int hashCode() { + return Long.hashCode(insertionOrder); + } + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctions.java index 39cda29ab4226..28325e4bfb349 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctions.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctions.java @@ -51,7 +51,9 @@ public static AppendWriteFunction create(Configuration conf, RowType rowT } String bufferType = resolveBufferType(conf); - if (BufferType.DISRUPTOR.name().equalsIgnoreCase(bufferType)) { + if (BufferType.CONTINUOUS_SORT.name().equalsIgnoreCase(bufferType)) { + return new AppendWriteFunctionWithContinuousSort<>(conf, rowType); + } else if (BufferType.DISRUPTOR.name().equalsIgnoreCase(bufferType)) { return new AppendWriteFunctionWithDisruptorBufferSort<>(conf, rowType); } else if (BufferType.BOUNDED_IN_MEMORY.name().equalsIgnoreCase(bufferType)) { return new AppendWriteFunctionWithBIMBufferSort<>(conf, rowType); @@ -97,8 +99,13 @@ public static List resolveSortKeys(Configuration conf) { ValidationUtils.checkArgument(StringUtils.nonEmpty(sortKeys), "Sort keys can't be null or empty for append write with buffer sort. " + "Either set write.buffer.sort.keys or ensure record key field is configured."); - return Arrays.stream(sortKeys.split(",")) + List sortKeyList = Arrays.stream(sortKeys.split(",")) .map(String::trim) + .filter(s -> !s.isEmpty()) .collect(Collectors.toList()); + ValidationUtils.checkArgument(!sortKeyList.isEmpty(), + "Sort keys can't be empty for append write with buffer sort. " + + "Either set write.buffer.sort.keys or ensure record key field is configured."); + return sortKeyList; } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/buffer/BufferType.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/buffer/BufferType.java index cbc779cb11ca8..7253bd789468e 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/buffer/BufferType.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/buffer/BufferType.java @@ -36,5 +36,9 @@ public enum BufferType { @EnumFieldDescription("Lock-free ring buffer using LMAX Disruptor. Provides better throughput for high-volume " + "write operations by decoupling record ingestion from sorting and writing.") - DISRUPTOR + DISRUPTOR, + + @EnumFieldDescription("Continuous sorting using a TreeMap. Provides O(log n) inserts and incremental draining " + + "for predictable latency without sort spikes.") + CONTINUOUS_SORT } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithContinuousSort.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithContinuousSort.java new file mode 100644 index 0000000000000..dbe319c9b5b47 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithContinuousSort.java @@ -0,0 +1,425 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.append; + +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.sink.buffer.BufferType; +import org.apache.hudi.sink.utils.TestWriteBase; +import org.apache.hudi.utils.TestConfigurations; +import org.apache.hudi.utils.TestData; + +import org.apache.avro.generic.GenericRecord; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.io.File; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +/** + * Test cases for {@link AppendWriteFunctionWithContinuousSort}. + */ +public class ITTestAppendWriteFunctionWithContinuousSort extends TestWriteBase { + private Configuration conf; + private RowType rowType; + + @TempDir + protected File tempFile; + + @BeforeEach + public void before(@TempDir File tempDir) throws Exception { + this.conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); + this.conf.set(FlinkOptions.WRITE_BUFFER_TYPE, BufferType.CONTINUOUS_SORT.name()); + this.conf.set(FlinkOptions.OPERATION, "insert"); + this.conf.set(FlinkOptions.WRITE_BUFFER_SORT_KEYS, "name,age"); + this.conf.set(FlinkOptions.WRITE_BUFFER_SIZE, 100L); + this.conf.set(FlinkOptions.WRITE_BUFFER_SORT_CONTINUOUS_DRAIN_SIZE, 1); + + // Define the row type with fields: name (STRING), age (INT), partition (STRING) + List fields = new ArrayList<>(); + fields.add(new RowType.RowField("uuid", VarCharType.STRING_TYPE)); + fields.add(new RowType.RowField("name", VarCharType.STRING_TYPE)); + fields.add(new RowType.RowField("age", new IntType())); + fields.add(new RowType.RowField("ts", new TimestampType())); + fields.add(new RowType.RowField("partition", VarCharType.STRING_TYPE)); + this.rowType = new RowType(fields); + } + + @Test + public void testBufferFlushOnRecordNumberLimit() throws Exception { + // Create test data that exceeds buffer size + List inputData = new ArrayList<>(); + for (int i = 0; i < 150; i++) { + inputData.add(createRowData("uuid" + i, "Name" + i, i, "1970-01-01 00:00:01.123", "p1")); + } + + // Write the data + TestWriteBase.TestHarness.instance() + .preparePipeline(tempFile, conf) + .consume(inputData) + .endInput(); + + // Verify all data was written + List actualData = TestData.readAllData(new File(conf.get(FlinkOptions.PATH)), rowType, 1); + assertEquals(150, actualData.size()); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testBufferFlush(boolean flushOnCheckpoint) throws Exception { + // Create test data + List inputData = Arrays.asList( + createRowData("uuid1", "Bob", 30, "1970-01-01 00:00:01.123", "p1"), + createRowData("uuid2", "Alice", 25, "1970-01-01 00:00:01.124", "p1") + ); + + // Write the data and wait for timer + TestHarness testHarness = + TestWriteBase.TestHarness.instance() + .preparePipeline(tempFile, conf) + .consume(inputData); + if (flushOnCheckpoint) { + testHarness.checkpoint(1); + } else { + testHarness.endInput(); + } + + // Verify data was written + List actualData = TestData.readAllData(new File(conf.get(FlinkOptions.PATH)), rowType, 1); + assertEquals(2, actualData.size()); + } + + @Test + public void testBufferFlushOnBufferSizeLimit() throws Exception { + // enlarge the write buffer record size + this.conf.set(FlinkOptions.WRITE_BUFFER_SIZE, 10000L); + // use a very small buffer memory size here + this.conf.set(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.1D); + + // Create test data that exceeds buffer size + List inputData = new ArrayList<>(); + for (int i = 0; i < 2000; i++) { + inputData.add(createRowData("uuid" + i, "Name" + i, i, "1970-01-01 00:00:01.123", "p1")); + } + + // Write the data + TestWriteBase.TestHarness.instance() + .preparePipeline(tempFile, conf) + .consume(inputData) + .endInput(); + + // Verify all data was written + List actualData = TestData.readAllData(new File(conf.get(FlinkOptions.PATH)), rowType, 1); + assertEquals(2000, actualData.size()); + } + + @Test + public void testSortedResult() throws Exception { + // Create test data in unsorted order + List inputData = Arrays.asList( + createRowData("uuid1", "Bob", 30, "1970-01-01 00:00:01.123", "p1"), + createRowData("uuid2", "Alice", 25, "1970-01-01 00:00:01.124", "p1"), + createRowData("uuid3", "Bob", 21, "1970-01-01 00:00:31.124", "p1") + ); + + // Expected result after sorting by name, then age + List expected = Arrays.asList( + "uuid2,Alice,25,1970-01-01 00:00:01.124,p1", + "uuid3,Bob,21,1970-01-01 00:00:31.124,p1", + "uuid1,Bob,30,1970-01-01 00:00:01.123,p1"); + + // Write the data and wait for timer + TestWriteBase.TestHarness.instance() + .preparePipeline(tempFile, conf) + .consume(inputData) + .checkpoint(1) + .endInput(); + + // Verify data was written + List result = TestData.readAllData(new File(conf.get(FlinkOptions.PATH)), rowType, 1); + assertEquals(3, result.size()); + + List filteredResult = + result.stream().map(TestData::filterOutVariablesWithoutHudiMetadata).collect(Collectors.toList()); + + assertArrayEquals(expected.toArray(), filteredResult.toArray()); + } + + @Test + public void testContinuousDrainBehavior() throws Exception { + // Set buffer size to 10 records + this.conf.set(FlinkOptions.WRITE_BUFFER_SIZE, 10L); + this.conf.set(FlinkOptions.WRITE_BUFFER_SORT_CONTINUOUS_DRAIN_SIZE, 2); + + // Create test data that will trigger drain + List inputData = new ArrayList<>(); + for (int i = 0; i < 12; i++) { + inputData.add(createRowData("uuid" + i, "Name" + i, i, "1970-01-01 00:00:01.123", "p1")); + } + + // Write the data - should trigger continuous draining + TestWriteBase.TestHarness.instance() + .preparePipeline(tempFile, conf) + .consume(inputData) + .endInput(); + + // Verify all data was written despite buffer size limit + List actualData = TestData.readAllData(new File(conf.get(FlinkOptions.PATH)), rowType, 1); + assertEquals(12, actualData.size()); + } + + @Test + public void testDrainSizeConfiguration() throws Exception { + // Set buffer size to 10 and drain size to 5 + this.conf.set(FlinkOptions.WRITE_BUFFER_SIZE, 10L); + this.conf.set(FlinkOptions.WRITE_BUFFER_SORT_CONTINUOUS_DRAIN_SIZE, 5); + + // Create test data that will trigger multiple drains + List inputData = new ArrayList<>(); + for (int i = 0; i < 20; i++) { + inputData.add(createRowData("uuid" + i, "Name" + i, i, "1970-01-01 00:00:01.123", "p1")); + } + + // Write the data - should trigger draining in batches of 5 + TestWriteBase.TestHarness.instance() + .preparePipeline(tempFile, conf) + .consume(inputData) + .endInput(); + + // Verify all data was written + List actualData = TestData.readAllData(new File(conf.get(FlinkOptions.PATH)), rowType, 1); + assertEquals(20, actualData.size()); + } + + @Test + public void testSortedResultWithContinuousDrain() throws Exception { + // Set smaller buffer to force continuous draining + this.conf.set(FlinkOptions.WRITE_BUFFER_SIZE, 5L); + this.conf.set(FlinkOptions.WRITE_BUFFER_SORT_CONTINUOUS_DRAIN_SIZE, 1); + + // Create test data with various names and ages + List inputData = Arrays.asList( + createRowData("uuid1", "Charlie", 35, "1970-01-01 00:00:01.123", "p1"), + createRowData("uuid2", "Alice", 25, "1970-01-01 00:00:01.124", "p1"), + createRowData("uuid3", "Bob", 30, "1970-01-01 00:00:01.125", "p1"), + createRowData("uuid4", "Alice", 20, "1970-01-01 00:00:01.126", "p1"), + createRowData("uuid5", "Bob", 28, "1970-01-01 00:00:01.127", "p1"), + createRowData("uuid6", "Charlie", 40, "1970-01-01 00:00:01.128", "p1") + ); + + // Expected result after sorting by name, then age + List expected = Arrays.asList( + "uuid4,Alice,20,1970-01-01 00:00:01.126,p1", + "uuid2,Alice,25,1970-01-01 00:00:01.124,p1", + "uuid5,Bob,28,1970-01-01 00:00:01.127,p1", + "uuid3,Bob,30,1970-01-01 00:00:01.125,p1", + "uuid1,Charlie,35,1970-01-01 00:00:01.123,p1", + "uuid6,Charlie,40,1970-01-01 00:00:01.128,p1" + ); + + // Write the data + TestWriteBase.TestHarness.instance() + .preparePipeline(tempFile, conf) + .consume(inputData) + .checkpoint(1) + .endInput(); + + // Verify data was written in sorted order + List result = TestData.readAllData(new File(conf.get(FlinkOptions.PATH)), rowType, 1); + assertEquals(6, result.size()); + + List filteredResult = + result.stream().map(TestData::filterOutVariablesWithoutHudiMetadata).collect(Collectors.toList()); + + assertArrayEquals(expected.toArray(), filteredResult.toArray()); + } + + @Test + public void testLargeDrainSize() throws Exception { + // Set larger drain size to test batch draining + this.conf.set(FlinkOptions.WRITE_BUFFER_SIZE, 20L); + this.conf.set(FlinkOptions.WRITE_BUFFER_SORT_CONTINUOUS_DRAIN_SIZE, 5); + + // Create test data + List inputData = new ArrayList<>(); + for (int i = 0; i < 30; i++) { + inputData.add(createRowData("uuid" + i, "Name" + i, i, "1970-01-01 00:00:01.123", "p1")); + } + + // Write the data + TestWriteBase.TestHarness.instance() + .preparePipeline(tempFile, conf) + .consume(inputData) + .endInput(); + + // Verify all data was written + List actualData = TestData.readAllData(new File(conf.get(FlinkOptions.PATH)), rowType, 1); + assertEquals(30, actualData.size()); + } + + @Test + public void testInvalidDrainSizeZero() { + this.conf.set(FlinkOptions.WRITE_BUFFER_SORT_CONTINUOUS_DRAIN_SIZE, 0); + + AppendWriteFunctionWithContinuousSort function = + new AppendWriteFunctionWithContinuousSort<>(conf, rowType); + + assertThrows(IllegalArgumentException.class, () -> { + function.open(conf); + }); + } + + @Test + public void testInvalidDrainSizeNegative() { + this.conf.set(FlinkOptions.WRITE_BUFFER_SORT_CONTINUOUS_DRAIN_SIZE, -5); + + AppendWriteFunctionWithContinuousSort function = + new AppendWriteFunctionWithContinuousSort<>(conf, rowType); + + assertThrows(IllegalArgumentException.class, () -> { + function.open(conf); + }); + } + + @Test + public void testInvalidBufferSizeZero() { + this.conf.set(FlinkOptions.WRITE_BUFFER_SIZE, 0L); + + AppendWriteFunctionWithContinuousSort function = + new AppendWriteFunctionWithContinuousSort<>(conf, rowType); + + assertThrows(IllegalArgumentException.class, () -> { + function.open(conf); + }); + } + + @Test + public void testInvalidBufferSizeNegative() { + this.conf.set(FlinkOptions.WRITE_BUFFER_SIZE, -100L); + + AppendWriteFunctionWithContinuousSort function = + new AppendWriteFunctionWithContinuousSort<>(conf, rowType); + + assertThrows(IllegalArgumentException.class, () -> { + function.open(conf); + }); + } + + @Test + public void testInvalidSortKeysOnlyCommas() { + this.conf.set(FlinkOptions.WRITE_BUFFER_SORT_KEYS, " , , "); + + AppendWriteFunctionWithContinuousSort function = + new AppendWriteFunctionWithContinuousSort<>(conf, rowType); + + assertThrows(IllegalArgumentException.class, () -> { + function.open(conf); + }); + } + + @Test + public void testObjectReuseEnabled() throws Exception { + // All expected records (order may vary due to file read ordering) + List expected = Arrays.asList( + "uuid1,Bob,30,1970-01-01 00:00:01.123,p1", + "uuid2,Alice,25,1970-01-01 00:00:01.124,p1", + "uuid3,Bob,21,1970-01-01 00:00:31.124,p1"); + + // Create a reusable row to simulate Flink object reuse behavior + GenericRowData reusableRow = new GenericRowData(5); + + // Write data using a single reused RowData instance (mimicking object reuse) + TestWriteBase.TestHarness harness = TestWriteBase.TestHarness.instance() + .preparePipelineWithObjectReuse(tempFile, conf); + + // Record 1: Bob, 30 + reusableRow.setField(0, StringData.fromString("uuid1")); + reusableRow.setField(1, StringData.fromString("Bob")); + reusableRow.setField(2, 30); + reusableRow.setField(3, TimestampData.fromTimestamp(Timestamp.valueOf("1970-01-01 00:00:01.123"))); + reusableRow.setField(4, StringData.fromString("p1")); + harness.consume(Arrays.asList(reusableRow)); + + // Record 2: Alice, 25 (mutating the same row) + reusableRow.setField(0, StringData.fromString("uuid2")); + reusableRow.setField(1, StringData.fromString("Alice")); + reusableRow.setField(2, 25); + reusableRow.setField(3, TimestampData.fromTimestamp(Timestamp.valueOf("1970-01-01 00:00:01.124"))); + reusableRow.setField(4, StringData.fromString("p1")); + harness.consume(Arrays.asList(reusableRow)); + + // Record 3: Bob, 21 (mutating the same row again) + reusableRow.setField(0, StringData.fromString("uuid3")); + reusableRow.setField(1, StringData.fromString("Bob")); + reusableRow.setField(2, 21); + reusableRow.setField(3, TimestampData.fromTimestamp(Timestamp.valueOf("1970-01-01 00:00:31.124"))); + reusableRow.setField(4, StringData.fromString("p1")); + harness.consume(Arrays.asList(reusableRow)); + + harness.checkpoint(1).endInput(); + + // Verify all 3 records are distinct and not corrupted by object reuse + // (without object reuse safety, all records would be the same - the last mutation) + List result = TestData.readAllData(new File(conf.get(FlinkOptions.PATH)), rowType, 1); + assertEquals(3, result.size()); + + List filteredResult = + result.stream().map(TestData::filterOutVariablesWithoutHudiMetadata) + .sorted().collect(Collectors.toList()); + List sortedExpected = expected.stream().sorted().collect(Collectors.toList()); + + assertArrayEquals(sortedExpected.toArray(), filteredResult.toArray()); + } + + @Test + public void testInvalidSortKeysOnlyWhitespace() { + this.conf.set(FlinkOptions.WRITE_BUFFER_SORT_KEYS, " "); + + AppendWriteFunctionWithContinuousSort function = + new AppendWriteFunctionWithContinuousSort<>(conf, rowType); + + assertThrows(IllegalArgumentException.class, () -> { + function.open(conf); + }); + } + + private GenericRowData createRowData(String uuid, String name, int age, String timestamp, String partition) { + return GenericRowData.of(StringData.fromString(uuid), StringData.fromString(name), + age, TimestampData.fromTimestamp(Timestamp.valueOf(timestamp)), StringData.fromString(partition)); + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java index dcbceab936bb7..8c0369a889c77 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java @@ -77,13 +77,18 @@ public class InsertFunctionWrapper implements TestFunctionWrapper { private AppendWriteFunction writeFunction; public InsertFunctionWrapper(String tablePath, Configuration conf) throws Exception { + this(tablePath, conf, new ExecutionConfig()); + } + + public InsertFunctionWrapper(String tablePath, Configuration conf, ExecutionConfig executionConfig) throws Exception { IOManager ioManager = new IOManagerAsync(); MockEnvironment environment = new MockEnvironmentBuilder() .setTaskName("mockTask") .setManagedMemorySize(4 * MemoryManager.DEFAULT_PAGE_SIZE) .setIOManager(ioManager) + .setExecutionConfig(executionConfig) .build(); - this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, environment); + this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, environment, executionConfig); this.gateway = new MockOperatorEventGateway(); this.subtaskGateway = new MockSubtaskGateway(); this.conf = conf; diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockStreamingRuntimeContext.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockStreamingRuntimeContext.java index 4a87466814b7a..0c88ad04e59f0 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockStreamingRuntimeContext.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockStreamingRuntimeContext.java @@ -70,6 +70,19 @@ public MockStreamingRuntimeContext( this.taskInfo = new MockTaskInfo(numParallelSubtasks, subtaskIndex, 0); } + public MockStreamingRuntimeContext( + boolean isCheckpointingEnabled, + int numParallelSubtasks, + int subtaskIndex, + MockEnvironment environment, + ExecutionConfig executionConfig) { + + super(new MockStreamOperator(executionConfig), environment, new HashMap<>()); + + this.isCheckpointingEnabled = isCheckpointingEnabled; + this.taskInfo = new MockTaskInfo(numParallelSubtasks, subtaskIndex, 0); + } + public int getIndexOfThisSubtask() { return taskInfo.getIndexOfThisSubtask(); } @@ -90,14 +103,23 @@ private static class MockStreamOperator extends AbstractStreamOperator private static final long serialVersionUID = -1153976702711944427L; private transient TestProcessingTimeService testProcessingTimeService; + private final transient ExecutionConfig executionConfig; @Setter private transient Object currentKey; private final transient Map mockKeyedStateStoreMap = new HashMap<>(); + MockStreamOperator() { + this(new ExecutionConfig()); + } + + MockStreamOperator(ExecutionConfig executionConfig) { + this.executionConfig = executionConfig; + } + @Override public ExecutionConfig getExecutionConfig() { - return new ExecutionConfig(); + return executionConfig; } @Override diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java index 4c7b051a5ba26..48aaf96440b38 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java @@ -186,6 +186,17 @@ public TestHarness preparePipeline(File basePath, Configuration conf) throws Exc return this; } + public TestHarness preparePipelineWithObjectReuse(File basePath, Configuration conf) throws Exception { + this.baseFile = basePath; + this.basePath = this.baseFile.getAbsolutePath(); + this.conf = conf; + this.pipeline = TestData.getWritePipelineWithObjectReuse(this.basePath, conf); + // open the function and ingest data + this.pipeline.openFunction(); + HoodieWriteConfig writeConfig = this.pipeline.getCoordinator().getWriteClient().getConfig(); + return this; + } + public TestHarness consume(List inputs) throws Exception { for (RowData rowData : inputs) { this.pipeline.invoke(rowData); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java index 4c4f1b30893cf..63a82f9e36c86 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java @@ -643,6 +643,16 @@ public static TestFunctionWrapper getWritePipeline(String basePath, Con } } + /** + * Initializes a writing pipeline with object reuse enabled. + */ + public static TestFunctionWrapper getWritePipelineWithObjectReuse( + String basePath, Configuration conf) throws Exception { + org.apache.flink.api.common.ExecutionConfig execConfig = new org.apache.flink.api.common.ExecutionConfig(); + execConfig.enableObjectReuse(); + return new InsertFunctionWrapper<>(basePath, conf, execConfig); + } + private static String toStringSafely(Object obj) { return obj == null ? "null" : obj.toString(); } diff --git a/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java index d76f37365d3d2..be75c77fd35a4 100644 --- a/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java +++ b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java @@ -49,4 +49,8 @@ public static int getNumberOfParallelSubtasks(RuntimeContext runtimeContext) { public static long getWatermarkInternal(RuntimeContext runtimeContext) { return runtimeContext.getExecutionConfig().getAutoWatermarkInterval(); } + + public static boolean isObjectReuseEnabled(RuntimeContext runtimeContext) { + return runtimeContext.getExecutionConfig().isObjectReuseEnabled(); + } } diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java index d76f37365d3d2..be75c77fd35a4 100644 --- a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java +++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java @@ -49,4 +49,8 @@ public static int getNumberOfParallelSubtasks(RuntimeContext runtimeContext) { public static long getWatermarkInternal(RuntimeContext runtimeContext) { return runtimeContext.getExecutionConfig().getAutoWatermarkInterval(); } + + public static boolean isObjectReuseEnabled(RuntimeContext runtimeContext) { + return runtimeContext.getExecutionConfig().isObjectReuseEnabled(); + } } diff --git a/hudi-flink-datasource/hudi-flink1.19.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java b/hudi-flink-datasource/hudi-flink1.19.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java index d76f37365d3d2..be75c77fd35a4 100644 --- a/hudi-flink-datasource/hudi-flink1.19.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java +++ b/hudi-flink-datasource/hudi-flink1.19.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java @@ -49,4 +49,8 @@ public static int getNumberOfParallelSubtasks(RuntimeContext runtimeContext) { public static long getWatermarkInternal(RuntimeContext runtimeContext) { return runtimeContext.getExecutionConfig().getAutoWatermarkInterval(); } + + public static boolean isObjectReuseEnabled(RuntimeContext runtimeContext) { + return runtimeContext.getExecutionConfig().isObjectReuseEnabled(); + } } diff --git a/hudi-flink-datasource/hudi-flink1.20.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java b/hudi-flink-datasource/hudi-flink1.20.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java index d76f37365d3d2..be75c77fd35a4 100644 --- a/hudi-flink-datasource/hudi-flink1.20.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java +++ b/hudi-flink-datasource/hudi-flink1.20.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java @@ -49,4 +49,8 @@ public static int getNumberOfParallelSubtasks(RuntimeContext runtimeContext) { public static long getWatermarkInternal(RuntimeContext runtimeContext) { return runtimeContext.getExecutionConfig().getAutoWatermarkInterval(); } + + public static boolean isObjectReuseEnabled(RuntimeContext runtimeContext) { + return runtimeContext.getExecutionConfig().isObjectReuseEnabled(); + } } diff --git a/hudi-flink-datasource/hudi-flink2.0.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java b/hudi-flink-datasource/hudi-flink2.0.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java index eb430e4c50127..f1bd7fdcc8e15 100644 --- a/hudi-flink-datasource/hudi-flink2.0.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java +++ b/hudi-flink-datasource/hudi-flink2.0.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java @@ -55,4 +55,8 @@ public static long getWatermarkInternal(RuntimeContext runtimeContext) { return Long.parseLong(jobParameters.getOrDefault(AUTO_WATERMARK_INTERVAL.key(), AUTO_WATERMARK_INTERVAL.defaultValue().toMillis() + "")); } + + public static boolean isObjectReuseEnabled(RuntimeContext runtimeContext) { + return runtimeContext.isObjectReuseEnabled(); + } } diff --git a/hudi-flink-datasource/hudi-flink2.1.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java b/hudi-flink-datasource/hudi-flink2.1.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java index eb430e4c50127..f1bd7fdcc8e15 100644 --- a/hudi-flink-datasource/hudi-flink2.1.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java +++ b/hudi-flink-datasource/hudi-flink2.1.x/src/main/java/org/apache/hudi/utils/RuntimeContextUtils.java @@ -55,4 +55,8 @@ public static long getWatermarkInternal(RuntimeContext runtimeContext) { return Long.parseLong(jobParameters.getOrDefault(AUTO_WATERMARK_INTERVAL.key(), AUTO_WATERMARK_INTERVAL.defaultValue().toMillis() + "")); } + + public static boolean isObjectReuseEnabled(RuntimeContext runtimeContext) { + return runtimeContext.isObjectReuseEnabled(); + } }