diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
index f6ea3e137d1d..6160f74e2ff8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
@@ -71,6 +71,8 @@ public class MergeTreeCompactManager extends CompactFutureManager {

     @Nullable private final RecordLevelExpire recordLevelExpire;

+    private final boolean keepDelete;
+
     public MergeTreeCompactManager(
             ExecutorService executor,
             Levels levels,
@@ -84,7 +86,8 @@ public MergeTreeCompactManager(
             boolean lazyGenDeletionFile,
             boolean needLookup,
             @Nullable RecordLevelExpire recordLevelExpire,
-            boolean forceRewriteAllFiles) {
+            boolean forceRewriteAllFiles,
+            boolean keepDelete) {
         this.executor = executor;
         this.levels = levels;
         this.strategy = strategy;
@@ -98,6 +101,7 @@ public MergeTreeCompactManager(
         this.recordLevelExpire = recordLevelExpire;
         this.needLookup = needLookup;
         this.forceRewriteAllFiles = forceRewriteAllFiles;
+        this.keepDelete = keepDelete;
         MetricUtils.safeCall(this::reportMetrics, LOG);
     }

@@ -174,7 +178,8 @@ public void triggerCompaction(boolean fullCompaction) {
                             * See CompactStrategy.pick.
                             */
                            boolean dropDelete =
-                                    unit.outputLevel() != 0
+                                    !keepDelete
+                                            && unit.outputLevel() != 0
                                            && (unit.outputLevel() >= levels.nonEmptyHighestLevel()
                                                    || dvMaintainer != null);

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 177f093583a4..326158c7b757 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -308,7 +308,8 @@ private CompactManager createCompactManager(
                     options.prepareCommitWaitCompaction(),
                     options.needLookup(),
                     recordLevelExpire,
-                    options.forceRewriteAllFiles());
+                    options.forceRewriteAllFiles(),
+                    options.isChainTable());
         }
     }

diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index 11d3ede21352..9769e59e1714 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -455,6 +455,7 @@ private MergeTreeCompactManager createCompactManager(
                         false,
                         options.needLookup(),
                         null,
+                        false,
                         false);
     }

@@ -480,6 +481,7 @@ public MockFailResultCompactionManager(
                     false,
                     false,
                     null,
+                    false,
                     false);
         }
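The core behavioral change, restated outside the diff for readability (same
identifiers as the first hunk above; the two summary lines are illustrative,
not part of the patch):

    boolean dropDelete =
            !keepDelete
                    && unit.outputLevel() != 0
                    && (unit.outputLevel() >= levels.nonEmptyHighestLevel()
                            || dvMaintainer != null);

    // keepDelete = true  -> dropDelete is always false; DELETE records survive
    //                       even a full compaction into the highest level.
    // keepDelete = false -> previous behavior; deletes are dropped when the
    //                       output is the highest non-empty level or a
    //                       deletion-vector maintainer is present.

As wired here, the flag comes from options.isChainTable() in
KeyValueFileStoreWrite, so only chain tables retain their delete records
through compaction.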
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
index 4adff9477857..07406baac56d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
@@ -18,24 +18,54 @@
 package org.apache.paimon.mergetree.compact;

+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.TestFileStore;
+import org.apache.paimon.TestKeyValueGenerator;
 import org.apache.paimon.compact.CompactResult;
 import org.apache.paimon.compact.CompactUnit;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.deletionvectors.DeletionVector;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFileTestUtils;
+import org.apache.paimon.io.FileReaderFactory;
+import org.apache.paimon.io.KeyValueFileReaderFactory;
+import org.apache.paimon.io.KeyValueFileWriterFactory;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.mergetree.Levels;
+import org.apache.paimon.mergetree.MergeSorter;
 import org.apache.paimon.mergetree.SortedRun;
+import org.apache.paimon.operation.FileStoreScan;
+import org.apache.paimon.operation.SplitRead;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.types.RowKind;

 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;

 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
@@ -43,6 +73,7 @@
 import java.util.concurrent.Executors;
 import java.util.stream.Collectors;

+import static org.apache.paimon.format.FileFormat.fileFormat;
 import static org.apache.paimon.io.DataFileTestUtils.newFile;
 import static org.assertj.core.api.Assertions.assertThat;

@@ -53,6 +84,8 @@ public class MergeTreeCompactManagerTest {
     private static ExecutorService service;

+    @TempDir java.nio.file.Path tempDir;
+
     @BeforeAll
     public static void before() {
         service = Executors.newSingleThreadExecutor();
     }

@@ -209,6 +242,7 @@ public void testIsCompacting() {
                         false,
                         true,
                         null,
+                        false,
                         false);

         MergeTreeCompactManager defaultManager =
@@ -225,12 +259,242 @@
                         false,
                         false,
                         null,
+                        false,
                         false);

         assertThat(lookupManager.compactNotCompleted()).isTrue();
         assertThat(defaultManager.compactNotCompleted()).isFalse();
     }

+    @Test
+    public void testCompactWithKeepDelete() throws Exception {
+        // 1) Build a minimal TestFileStore with primary key and one bucket.
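+        // TestFileStore is the in-repo test fixture over LocalFileIO. With
+        // DeduplicateMergeFunction, merging keeps only the record with the
+        // latest sequence number per key, so the INSERT (seq 0) followed by
+        // the DELETE (seq 1) below should reduce to a single DELETE
+        // tombstone, which is the record keepDelete must preserve.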
+        TableSchema tableSchema =
+                SchemaUtils.forceCommit(
+                        new SchemaManager(new LocalFileIO(), new Path(tempDir.toUri())),
+                        new Schema(
+                                TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields(),
+                                TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(),
+                                TestKeyValueGenerator.getPrimaryKeys(
+                                        TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED),
+                                Collections.emptyMap(),
+                                null));
+        TestFileStore store =
+                new TestFileStore.Builder(
+                                "avro",
+                                tempDir.toString(),
+                                1,
+                                TestKeyValueGenerator.DEFAULT_PART_TYPE,
+                                TestKeyValueGenerator.KEY_TYPE,
+                                TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+                                TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR,
+                                DeduplicateMergeFunction.factory(),
+                                tableSchema)
+                        .build();
+
+        // 2) Write INSERT + DELETE for the same key into level-0 files.
+        int shopId = 1;
+        long orderId = 42L;
+        String dt = "20240101";
+        int hr = 0;
+
+        GenericRow keyRow = GenericRow.of(shopId, orderId);
+        GenericRow fullRow =
+                GenericRow.of(
+                        BinaryString.fromString(dt),
+                        hr,
+                        shopId,
+                        orderId,
+                        100L,
+                        null,
+                        BinaryString.fromString("comment"));
+
+        BinaryRow keyBinaryRow = TestKeyValueGenerator.KEY_SERIALIZER.toBinaryRow(keyRow).copy();
+        BinaryRow valueBinaryRow =
+                TestKeyValueGenerator.DEFAULT_ROW_SERIALIZER.toBinaryRow(fullRow).copy();
+
+        InternalRowSerializer partitionSerializer =
+                new InternalRowSerializer(TestKeyValueGenerator.DEFAULT_PART_TYPE);
+        BinaryRow partition =
+                partitionSerializer
+                        .toBinaryRow(GenericRow.of(BinaryString.fromString(dt), hr))
+                        .copy();
+
+        KeyValue insert = new KeyValue().replace(keyBinaryRow, 0L, RowKind.INSERT, valueBinaryRow);
+        KeyValue delete = new KeyValue().replace(keyBinaryRow, 1L, RowKind.DELETE, valueBinaryRow);
+
+        List<KeyValue> kvs = Arrays.asList(insert, delete);
+
+        List<Snapshot> snapshots =
+                store.commitData(kvs, kv -> partition, kv -> 0 /* single bucket */);
+        Snapshot snapshot = snapshots.get(snapshots.size() - 1);
+        long snapshotId = snapshot.id();
+
+        // Collect input files from manifest entries of this snapshot.
+        FileStoreScan scan = store.newScan();
+        List<ManifestEntry> manifestEntries = scan.withSnapshot(snapshotId).plan().files();
+
+        List<DataFileMeta> inputFiles =
+                manifestEntries.stream()
+                        .filter(e -> e.kind() == FileKind.ADD)
+                        .map(ManifestEntry::file)
+                        .collect(Collectors.toList());
+
+        assertThat(inputFiles).isNotEmpty();
+
+        // 3) Create a MergeTreeCompactManager with keepDelete=true and run compaction.
+        Comparator<InternalRow> keyComparator =
+                Comparator.comparingInt((InternalRow r) -> r.getInt(0))
+                        .thenComparingLong(r -> r.getLong(1));
+
+        CoreOptions coreOptions = store.options();
+        Levels levels = new Levels(keyComparator, inputFiles, coreOptions.numLevels());
+
+        // Always compact all runs into the highest level.
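+        // CompactStrategy is used as a functional interface: this lambda
+        // unconditionally picks every run into level numLevels - 1, i.e. a
+        // full compaction, which is exactly the case where dropDelete would
+        // be true without keepDelete.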
+        CompactStrategy strategy =
+                (numLevels, runs) -> Optional.of(CompactUnit.fromLevelRuns(numLevels - 1, runs));
+
+        KeyValueFileReaderFactory.Builder readerFactoryBuilder = store.newReaderFactoryBuilder();
+        KeyValueFileReaderFactory keyReaderFactory =
+                readerFactoryBuilder.build(partition, 0, DeletionVector.emptyFactory());
+        FileReaderFactory<KeyValue> readerFactory = keyReaderFactory;
+
+        KeyValueFileWriterFactory.Builder writerFactoryBuilder =
+                KeyValueFileWriterFactory.builder(
+                        store.fileIO(),
+                        snapshot.schemaId(),
+                        TestKeyValueGenerator.KEY_TYPE,
+                        TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+                        fileFormat(coreOptions),
+                        ignore -> store.pathFactory(),
+                        coreOptions.targetFileSize(true));
+        KeyValueFileWriterFactory writerFactory =
+                writerFactoryBuilder.build(partition, 0, coreOptions);
+
+        MergeSorter mergeSorter =
+                new MergeSorter(
+                        coreOptions,
+                        TestKeyValueGenerator.KEY_TYPE,
+                        TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+                        null);
+
+        MergeTreeCompactRewriter rewriter =
+                new MergeTreeCompactRewriter(
+                        readerFactory,
+                        writerFactory,
+                        keyComparator,
+                        null,
+                        DeduplicateMergeFunction.factory(),
+                        mergeSorter);
+
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        MergeTreeCompactManager manager =
+                new MergeTreeCompactManager(
+                        executor,
+                        levels,
+                        strategy,
+                        keyComparator,
+                        coreOptions.compactionFileSize(true),
+                        coreOptions.numSortedRunStopTrigger(),
+                        rewriter,
+                        null,
+                        null,
+                        false,
+                        false,
+                        null,
+                        false,
+                        true); // keepDelete=true
+
+        try {
+            manager.triggerCompaction(false);
+            Optional<CompactResult> resultOptional = manager.getCompactionResult(true);
+            assertThat(resultOptional).isPresent();
+            CompactResult compactResult = resultOptional.get();
+
+            List<DataFileMeta> compactedFiles = compactResult.after();
+            assertThat(compactedFiles).isNotEmpty();
+
+            int highestLevelBefore =
+                    inputFiles.stream().mapToInt(DataFileMeta::level).max().orElse(0);
+            for (DataFileMeta file : compactedFiles) {
+                assertThat(file.level()).isNotZero();
+                assertThat(file.level()).isGreaterThanOrEqualTo(highestLevelBefore);
+            }
+
+            // 4) Read back from compacted files (via manifest entries) without DropDeleteReader
+            // and assert a DELETE record exists.
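+            // forceKeepDelete() makes the merge read surface DELETE records
+            // instead of filtering them, so the assertions below can observe
+            // the tombstone that survived compaction.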
+            int totalBuckets = coreOptions.bucket();
+            int bucket = 0;
+            List<ManifestEntry> compactedEntries =
+                    compactedFiles.stream()
+                            .map(
+                                    file ->
+                                            ManifestEntry.create(
+                                                    FileKind.ADD,
+                                                    partition,
+                                                    bucket,
+                                                    totalBuckets,
+                                                    file))
+                            .collect(Collectors.toList());
+
+            SplitRead<KeyValue> read = store.newRead().forceKeepDelete();
+
+            Map<BinaryRow, Map<Integer, List<DataFileMeta>>> filesPerPartitionAndBucket =
+                    new HashMap<>();
+            for (ManifestEntry entry : compactedEntries) {
+                filesPerPartitionAndBucket
+                        .computeIfAbsent(entry.partition(), p -> new HashMap<>())
+                        .computeIfAbsent(entry.bucket(), b -> new ArrayList<>())
+                        .add(entry.file());
+            }
+
+            List<KeyValue> readBack = new ArrayList<>();
+            for (Map.Entry<BinaryRow, Map<Integer, List<DataFileMeta>>> entry :
+                    filesPerPartitionAndBucket.entrySet()) {
+                BinaryRow part = entry.getKey();
+                for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry :
+                        entry.getValue().entrySet()) {
+                    RecordReader<KeyValue> reader =
+                            read.createReader(
+                                    DataSplit.builder()
+                                            .withPartition(part)
+                                            .withBucket(bucketEntry.getKey())
+                                            .withDataFiles(bucketEntry.getValue())
+                                            .isStreaming(false)
+                                            .rawConvertible(false)
+                                            .withBucketPath("not used")
+                                            .build());
+                    RecordReaderIterator<KeyValue> iterator = new RecordReaderIterator<>(reader);
+                    try {
+                        while (iterator.hasNext()) {
+                            readBack.add(
+                                    iterator.next()
+                                            .copy(
+                                                    TestKeyValueGenerator.KEY_SERIALIZER,
+                                                    TestKeyValueGenerator.DEFAULT_ROW_SERIALIZER));
+                        }
+                    } finally {
+                        iterator.close();
+                    }
+                }
+            }
+
+            List<KeyValue> recordsForKey =
+                    readBack.stream()
+                            .filter(
+                                    kv ->
+                                            kv.key().getInt(0) == shopId
+                                                    && kv.key().getLong(1) == orderId)
+                            .collect(Collectors.toList());
+
+            assertThat(recordsForKey).isNotEmpty();
+            assertThat(recordsForKey).allMatch(kv -> kv.valueKind() == RowKind.DELETE);
+        } finally {
+            manager.close();
+            executor.shutdownNow();
+        }
+    }
+
     private void innerTest(List<DataFileMeta> inputs, List<LevelMinMax> expected)
             throws ExecutionException, InterruptedException {
         innerTest(inputs, expected, testStrategy(), true);
     }
@@ -262,6 +526,7 @@ private void innerTest(
                         false,
                         false,
                         null,
+                        false,
                         false);
         manager.triggerCompaction(false);
         manager.getCompactionResult(true);
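To exercise just the new test locally (assuming the repository's standard
Maven surefire setup):

    mvn -pl paimon-core test -Dtest=MergeTreeCompactManagerTest#testCompactWithKeepDelete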