@@ -71,6 +71,8 @@ public class MergeTreeCompactManager extends CompactFutureManager {

@Nullable private final RecordLevelExpire recordLevelExpire;

private final boolean keepDelete;

public MergeTreeCompactManager(
ExecutorService executor,
Levels levels,
@@ -84,7 +86,8 @@ public MergeTreeCompactManager(
boolean lazyGenDeletionFile,
boolean needLookup,
@Nullable RecordLevelExpire recordLevelExpire,
boolean forceRewriteAllFiles) {
boolean forceRewriteAllFiles,
boolean keepDelete) {
this.executor = executor;
this.levels = levels;
this.strategy = strategy;
@@ -98,6 +101,7 @@ public MergeTreeCompactManager(
this.recordLevelExpire = recordLevelExpire;
this.needLookup = needLookup;
this.forceRewriteAllFiles = forceRewriteAllFiles;
this.keepDelete = keepDelete;

MetricUtils.safeCall(this::reportMetrics, LOG);
}
@@ -174,7 +178,8 @@ public void triggerCompaction(boolean fullCompaction) {
* See CompactStrategy.pick.
*/
boolean dropDelete =
unit.outputLevel() != 0
!keepDelete
&& unit.outputLevel() != 0
&& (unit.outputLevel() >= levels.nonEmptyHighestLevel()
|| dvMaintainer != null);

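For reference, the new flag short-circuits the existing rule: when keepDelete is true (wired from options.isChainTable() in the write path below), DELETE records are retained through compaction; otherwise deletes are still dropped only for non-zero output levels that reach the highest non-empty level or when a deletion-vector maintainer is present. A minimal standalone sketch of that decision, not part of the PR, with illustrative names only:

    // Hypothetical helper mirroring the condition above; names are illustrative, not Paimon API.
    static boolean dropDelete(
            boolean keepDelete, int outputLevel, int nonEmptyHighestLevel, boolean hasDvMaintainer) {
        return !keepDelete                              // keepDelete=true (chain table) retains all DELETEs
                && outputLevel != 0                     // never drop when the output stays on level 0
                && (outputLevel >= nonEmptyHighestLevel // output is the highest non-empty level
                        || hasDvMaintainer);            // or deletion vectors are maintained
    }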
@@ -308,7 +308,8 @@ private CompactManager createCompactManager(
options.prepareCommitWaitCompaction(),
options.needLookup(),
recordLevelExpire,
options.forceRewriteAllFiles());
options.forceRewriteAllFiles(),
options.isChainTable());
}
}

@@ -455,6 +455,7 @@ private MergeTreeCompactManager createCompactManager(
false,
options.needLookup(),
null,
false,
false);
}

@@ -480,6 +481,7 @@ public MockFailResultCompactionManager(
false,
false,
null,
false,
false);
}

@@ -18,31 +18,62 @@

package org.apache.paimon.mergetree.compact;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.Snapshot;
import org.apache.paimon.TestFileStore;
import org.apache.paimon.TestKeyValueGenerator;
import org.apache.paimon.compact.CompactResult;
import org.apache.paimon.compact.CompactUnit;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFileTestUtils;
import org.apache.paimon.io.FileReaderFactory;
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.io.KeyValueFileWriterFactory;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.mergetree.Levels;
import org.apache.paimon.mergetree.MergeSorter;
import org.apache.paimon.mergetree.SortedRun;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.SplitRead;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.types.RowKind;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

import static org.apache.paimon.format.FileFormat.fileFormat;
import static org.apache.paimon.io.DataFileTestUtils.newFile;
import static org.assertj.core.api.Assertions.assertThat;

@@ -53,6 +84,8 @@ public class MergeTreeCompactManagerTest {

private static ExecutorService service;

@TempDir java.nio.file.Path tempDir;

@BeforeAll
public static void before() {
service = Executors.newSingleThreadExecutor();
@@ -209,6 +242,7 @@ public void testIsCompacting() {
false,
true,
null,
false,
false);

MergeTreeCompactManager defaultManager =
@@ -225,12 +259,242 @@ public void testIsCompacting() {
false,
false,
null,
false,
false);

assertThat(lookupManager.compactNotCompleted()).isTrue();
assertThat(defaultManager.compactNotCompleted()).isFalse();
}

@Test
public void testCompactWithKeepDelete() throws Exception {
// 1) Build a minimal TestFileStore with primary key and one bucket.
TableSchema tableSchema =
SchemaUtils.forceCommit(
new SchemaManager(new LocalFileIO(), new Path(tempDir.toUri())),
new Schema(
TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields(),
TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(),
TestKeyValueGenerator.getPrimaryKeys(
TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED),
Collections.emptyMap(),
null));
TestFileStore store =
new TestFileStore.Builder(
"avro",
tempDir.toString(),
1,
TestKeyValueGenerator.DEFAULT_PART_TYPE,
TestKeyValueGenerator.KEY_TYPE,
TestKeyValueGenerator.DEFAULT_ROW_TYPE,
TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR,
DeduplicateMergeFunction.factory(),
tableSchema)
.build();

// 2) Write INSERT + DELETE for the same key into level-0 files.
int shopId = 1;
long orderId = 42L;
String dt = "20240101";
int hr = 0;

GenericRow keyRow = GenericRow.of(shopId, orderId);
GenericRow fullRow =
GenericRow.of(
BinaryString.fromString(dt),
hr,
shopId,
orderId,
100L,
null,
BinaryString.fromString("comment"));

BinaryRow keyBinaryRow = TestKeyValueGenerator.KEY_SERIALIZER.toBinaryRow(keyRow).copy();
BinaryRow valueBinaryRow =
TestKeyValueGenerator.DEFAULT_ROW_SERIALIZER.toBinaryRow(fullRow).copy();

InternalRowSerializer partitionSerializer =
new InternalRowSerializer(TestKeyValueGenerator.DEFAULT_PART_TYPE);
BinaryRow partition =
partitionSerializer
.toBinaryRow(GenericRow.of(BinaryString.fromString(dt), hr))
.copy();

KeyValue insert = new KeyValue().replace(keyBinaryRow, 0L, RowKind.INSERT, valueBinaryRow);
KeyValue delete = new KeyValue().replace(keyBinaryRow, 1L, RowKind.DELETE, valueBinaryRow);

List<KeyValue> kvs = Arrays.asList(insert, delete);

List<Snapshot> snapshots =
store.commitData(kvs, kv -> partition, kv -> 0 /* single bucket */);
Snapshot snapshot = snapshots.get(snapshots.size() - 1);
long snapshotId = snapshot.id();

// Collect input files from manifest entries of this snapshot.
FileStoreScan scan = store.newScan();
List<ManifestEntry> manifestEntries = scan.withSnapshot(snapshotId).plan().files();

List<DataFileMeta> inputFiles =
manifestEntries.stream()
.filter(e -> e.kind() == FileKind.ADD)
.map(ManifestEntry::file)
.collect(Collectors.toList());

assertThat(inputFiles).isNotEmpty();

// 3) Create a MergeTreeCompactManager with keepDelete=true and run compaction.
Comparator<InternalRow> keyComparator =
Comparator.<InternalRow>comparingInt(r -> r.getInt(0))
.thenComparingLong(r -> r.getLong(1));

CoreOptions coreOptions = store.options();
Levels levels = new Levels(keyComparator, inputFiles, coreOptions.numLevels());

// Always compact all runs into the highest level.
CompactStrategy strategy =
(numLevels, runs) -> Optional.of(CompactUnit.fromLevelRuns(numLevels - 1, runs));

KeyValueFileReaderFactory.Builder readerFactoryBuilder = store.newReaderFactoryBuilder();
KeyValueFileReaderFactory keyReaderFactory =
readerFactoryBuilder.build(partition, 0, DeletionVector.emptyFactory());
FileReaderFactory<KeyValue> readerFactory = keyReaderFactory;

KeyValueFileWriterFactory.Builder writerFactoryBuilder =
KeyValueFileWriterFactory.builder(
store.fileIO(),
snapshot.schemaId(),
TestKeyValueGenerator.KEY_TYPE,
TestKeyValueGenerator.DEFAULT_ROW_TYPE,
fileFormat(coreOptions),
ignore -> store.pathFactory(),
coreOptions.targetFileSize(true));
KeyValueFileWriterFactory writerFactory =
writerFactoryBuilder.build(partition, 0, coreOptions);

MergeSorter mergeSorter =
new MergeSorter(
coreOptions,
TestKeyValueGenerator.KEY_TYPE,
TestKeyValueGenerator.DEFAULT_ROW_TYPE,
null);

MergeTreeCompactRewriter rewriter =
new MergeTreeCompactRewriter(
readerFactory,
writerFactory,
keyComparator,
null,
DeduplicateMergeFunction.factory(),
mergeSorter);

ExecutorService executor = Executors.newSingleThreadExecutor();
MergeTreeCompactManager manager =
new MergeTreeCompactManager(
executor,
levels,
strategy,
keyComparator,
coreOptions.compactionFileSize(true),
coreOptions.numSortedRunStopTrigger(),
rewriter,
null,
null,
false,
false,
null,
false,
true); // keepDelete=true

try {
manager.triggerCompaction(false);
Optional<CompactResult> resultOptional = manager.getCompactionResult(true);
assertThat(resultOptional).isPresent();
CompactResult compactResult = resultOptional.get();

List<DataFileMeta> compactedFiles = compactResult.after();
assertThat(compactedFiles).isNotEmpty();

int highestLevelBefore =
inputFiles.stream().mapToInt(DataFileMeta::level).max().orElse(0);
for (DataFileMeta file : compactedFiles) {
assertThat(file.level()).isNotZero();
assertThat(file.level()).isGreaterThanOrEqualTo(highestLevelBefore);
}

// 4) Read back from compacted files (via manifest entries) without DropDeleteReader
// and assert a DELETE record exists.
int totalBuckets = coreOptions.bucket();
int bucket = 0;
List<ManifestEntry> compactedEntries =
compactedFiles.stream()
.map(
file ->
ManifestEntry.create(
FileKind.ADD,
partition,
bucket,
totalBuckets,
file))
.collect(Collectors.toList());

SplitRead<KeyValue> read = store.newRead().forceKeepDelete();

Map<BinaryRow, Map<Integer, List<DataFileMeta>>> filesPerPartitionAndBucket =
new HashMap<>();
for (ManifestEntry entry : compactedEntries) {
filesPerPartitionAndBucket
.computeIfAbsent(entry.partition(), p -> new HashMap<>())
.computeIfAbsent(entry.bucket(), b -> new ArrayList<>())
.add(entry.file());
}

List<KeyValue> readBack = new ArrayList<>();
for (Map.Entry<BinaryRow, Map<Integer, List<DataFileMeta>>> entry :
filesPerPartitionAndBucket.entrySet()) {
BinaryRow part = entry.getKey();
for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry :
entry.getValue().entrySet()) {
RecordReader<KeyValue> reader =
read.createReader(
DataSplit.builder()
.withPartition(part)
.withBucket(bucketEntry.getKey())
.withDataFiles(bucketEntry.getValue())
.isStreaming(false)
.rawConvertible(false)
.withBucketPath("not used")
.build());
RecordReaderIterator<KeyValue> iterator = new RecordReaderIterator<>(reader);
try {
while (iterator.hasNext()) {
readBack.add(
iterator.next()
.copy(
TestKeyValueGenerator.KEY_SERIALIZER,
TestKeyValueGenerator.DEFAULT_ROW_SERIALIZER));
}
} finally {
iterator.close();
}
}
}

List<KeyValue> recordsForKey =
readBack.stream()
.filter(
kv ->
kv.key().getInt(0) == shopId
&& kv.key().getLong(1) == orderId)
.collect(Collectors.toList());

assertThat(recordsForKey).isNotEmpty();
assertThat(recordsForKey).allMatch(kv -> kv.valueKind() == RowKind.DELETE);
} finally {
manager.close();
executor.shutdownNow();
}
}

private void innerTest(List<LevelMinMax> inputs, List<LevelMinMax> expected)
throws ExecutionException, InterruptedException {
innerTest(inputs, expected, testStrategy(), true);
@@ -262,6 +526,7 @@ private void innerTest(
false,
false,
null,
false,
false);
manager.triggerCompaction(false);
manager.getCompactionResult(true);