diff --git a/docs/content/maintenance/manage-snapshots.md b/docs/content/maintenance/manage-snapshots.md
index 242c966d99b8..1ec553978672 100644
--- a/docs/content/maintenance/manage-snapshots.md
+++ b/docs/content/maintenance/manage-snapshots.md
@@ -263,6 +263,7 @@ Run the following command:
     --max_deletes <max_deletes> \
     --retain_max <retain_max> \
     --retain_min <retain_min> \
+    [--parallelism <parallelism>] \
     [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
index 98a761b47166..7b0ef2e1205e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
@@ -132,10 +132,29 @@ public void cleanEmptyDirectories() {
         if (!cleanEmptyDirectories || deletionBuckets.isEmpty()) {
             return;
         }
+        doCleanEmptyDirectories(deletionBuckets);
+        deletionBuckets.clear();
+    }
+
+    /**
+     * Try to delete data directories that may be empty after data file deletion.
+     *
+     * <p>This method uses externally provided buckets instead of the internal deletionBuckets.
+     * It is used in parallel expire mode, where buckets are aggregated from multiple workers.
+     *
+     * @param aggregatedBuckets merged deletion buckets from all workers
+     */
+    public void cleanEmptyDirectories(Map<BinaryRow, Set<Integer>> aggregatedBuckets) {
+        if (!cleanEmptyDirectories || aggregatedBuckets.isEmpty()) {
+            return;
+        }
+        doCleanEmptyDirectories(aggregatedBuckets);
+    }
+
+    private void doCleanEmptyDirectories(Map<BinaryRow, Set<Integer>> buckets) {
         // All directory paths are deduplicated and sorted by hierarchy level
         Map<Integer, Set<Path>> deduplicate = new HashMap<>();
-        for (Map.Entry<BinaryRow, Set<Integer>> entry : deletionBuckets.entrySet()) {
+        for (Map.Entry<BinaryRow, Set<Integer>> entry : buckets.entrySet()) {
             List<Path> toDeleteEmptyDirectory = new ArrayList<>();
             // try to delete bucket directories
             for (Integer bucket : entry.getValue()) {
@@ -162,8 +181,6 @@ public void cleanEmptyDirectories() {
         for (int hierarchy = deduplicate.size() - 1; hierarchy >= 0; hierarchy--) {
             deduplicate.get(hierarchy).forEach(this::tryDeleteEmptyDirectory);
         }
-
-        deletionBuckets.clear();
     }
 
     protected void recordDeletionBuckets(ExpireFileEntry entry) {
@@ -172,6 +189,23 @@ protected void recordDeletionBuckets(ExpireFileEntry entry) {
                 .add(entry.bucket());
     }
 
+    /**
+     * Get and clear the deletion buckets.
+     *
+     * <p>
This method is used in parallel expire to collect buckets for a single task without + * accumulating results from previous tasks processed by the same worker. + * + * @return a copy of the deletion buckets, the internal state is cleared after this call + */ + public Map> drainDeletionBuckets() { + Map> result = new HashMap<>(); + for (Map.Entry> entry : deletionBuckets.entrySet()) { + result.put(entry.getKey(), new HashSet<>(entry.getValue())); + } + deletionBuckets.clear(); + return result; + } + public void cleanUnusedDataFiles(String manifestList, Predicate skipper) { // try read manifests List manifests = tryReadManifestList(manifestList); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/expire/DeletionReport.java b/paimon-core/src/main/java/org/apache/paimon/operation/expire/DeletionReport.java new file mode 100644 index 000000000000..5454c7e0216e --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/operation/expire/DeletionReport.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation.expire; + +import org.apache.paimon.data.BinaryRow; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** Report of a single snapshot expiration task. */ +public class DeletionReport implements Serializable { + + private static final long serialVersionUID = 1L; + + private final long snapshotId; + + /** Whether this task was skipped (e.g., snapshot already deleted). */ + private boolean skipped; + + /** Whether data files were deleted. */ + private boolean dataFilesDeleted; + + /** Whether changelog data files were deleted. */ + private boolean changelogDeleted; + + /** Whether manifest files were deleted. */ + private boolean manifestsDeleted; + + /** Whether snapshot metadata file was deleted. */ + private boolean snapshotDeleted; + + /** Buckets that had files deleted (for empty directory cleanup in parallel phase). */ + private Map> deletionBuckets; + + public DeletionReport(long snapshotId) { + this.snapshotId = snapshotId; + this.skipped = false; + this.dataFilesDeleted = false; + this.changelogDeleted = false; + this.manifestsDeleted = false; + this.snapshotDeleted = false; + this.deletionBuckets = new HashMap<>(); + } + + /** + * Create a skipped report for a snapshot that was already deleted. 
+ * + * @param snapshotId the snapshot ID + * @return a skipped deletion report + */ + public static DeletionReport skipped(long snapshotId) { + DeletionReport report = new DeletionReport(snapshotId); + report.skipped = true; + return report; + } + + public long snapshotId() { + return snapshotId; + } + + public boolean isSkipped() { + return skipped; + } + + public void setDataFilesDeleted(boolean dataFilesDeleted) { + this.dataFilesDeleted = dataFilesDeleted; + } + + public void setChangelogDeleted(boolean changelogDeleted) { + this.changelogDeleted = changelogDeleted; + } + + public void setManifestsDeleted(boolean manifestsDeleted) { + this.manifestsDeleted = manifestsDeleted; + } + + public void setSnapshotDeleted(boolean snapshotDeleted) { + this.snapshotDeleted = snapshotDeleted; + } + + public void setDeletionBuckets(Map> deletionBuckets) { + this.deletionBuckets = deletionBuckets; + } + + public Map> deletionBuckets() { + return deletionBuckets; + } + + @Override + public String toString() { + return "DeletionReport{" + + "snapshotId=" + + snapshotId + + ", skipped=" + + skipped + + ", dataFilesDeleted=" + + dataFilesDeleted + + ", changelogDeleted=" + + changelogDeleted + + ", manifestsDeleted=" + + manifestsDeleted + + ", snapshotDeleted=" + + snapshotDeleted + + ", deletionBucketsCount=" + + deletionBuckets.size() + + '}'; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/expire/ExpireSnapshotsExecutor.java b/paimon-core/src/main/java/org/apache/paimon/operation/expire/ExpireSnapshotsExecutor.java new file mode 100644 index 000000000000..f48b7acaa702 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/operation/expire/ExpireSnapshotsExecutor.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation.expire; + +import org.apache.paimon.Changelog; +import org.apache.paimon.Snapshot; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.manifest.ExpireFileEntry; +import org.apache.paimon.operation.SnapshotDeletion; +import org.apache.paimon.utils.ChangelogManager; +import org.apache.paimon.utils.SnapshotManager; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Predicate; + +import static org.apache.paimon.utils.Preconditions.checkNotNull; + +/** + * Executor for snapshot expire tasks. This class handles the actual deletion logic based on the + * task type defined by {@link SnapshotExpireTask.TaskType}. + * + *
+ * <p>The executor uses switch-based dispatch on task type:
+ *
+ * <ul>
+ *   <li>{@link SnapshotExpireTask.TaskType#DELETE_DATA_FILES} → delete data files
+ *   <li>{@link SnapshotExpireTask.TaskType#DELETE_CHANGELOG_FILES} → delete changelog files
+ *   <li>{@link SnapshotExpireTask.TaskType#DELETE_MANIFESTS} → delete manifest files
+ *   <li>{@link SnapshotExpireTask.TaskType#DELETE_SNAPSHOT} → delete snapshot metadata file
+ * </ul>
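A minimal sketch of how a caller might drive this dispatch, using only types introduced in this diff; the wrapper class itself is hypothetical:

```java
import org.apache.paimon.Snapshot;
import org.apache.paimon.operation.expire.DeletionReport;
import org.apache.paimon.operation.expire.ExpireSnapshotsExecutor;
import org.apache.paimon.operation.expire.ExpireSnapshotsPlan;
import org.apache.paimon.operation.expire.SnapshotExpireTask;

import java.util.ArrayList;
import java.util.List;

/** Hypothetical driver: run the data-file phase of a plan and collect the reports. */
public class ExecutorDispatchSketch {

    public static List<DeletionReport> runDataFilePhase(
            ExpireSnapshotsExecutor executor, ExpireSnapshotsPlan plan) {
        List<Snapshot> taggedSnapshots = plan.protectionSet().taggedSnapshots();
        List<DeletionReport> reports = new ArrayList<>();
        for (SnapshotExpireTask task : plan.dataFileTasks()) {
            // DELETE_DATA_FILES needs the tagged snapshots; the manifest
            // skipping set is only consulted for DELETE_MANIFESTS tasks.
            reports.add(executor.execute(task, taggedSnapshots, null));
        }
        return reports;
    }
}
```

A missing snapshot surfaces as a skipped report rather than an exception, so the loop keeps going.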
+ */ +public class ExpireSnapshotsExecutor { + + private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsExecutor.class); + + private final SnapshotManager snapshotManager; + private final SnapshotDeletion snapshotDeletion; + @Nullable private final ChangelogManager changelogManager; + + public ExpireSnapshotsExecutor( + SnapshotManager snapshotManager, SnapshotDeletion snapshotDeletion) { + this(snapshotManager, snapshotDeletion, null); + } + + public ExpireSnapshotsExecutor( + SnapshotManager snapshotManager, + SnapshotDeletion snapshotDeletion, + @Nullable ChangelogManager changelogManager) { + this.snapshotManager = snapshotManager; + this.snapshotDeletion = snapshotDeletion; + this.changelogManager = changelogManager; + } + + /** + * Execute a snapshot expire task based on its task type. + * + * @param task the task to execute + * @param taggedSnapshots taggedSnapshots set for data file deletion (required for + * DELETE_DATA_FILES) + * @param skippingSet manifest skipping set (required for DELETE_MANIFESTS) + * @return deletion report with execution results + */ + public DeletionReport execute( + SnapshotExpireTask task, + @Nullable List taggedSnapshots, + @Nullable Set skippingSet) { + + Snapshot snapshot; + try { + snapshot = snapshotManager.tryGetSnapshot(task.snapshotId()); + } catch (FileNotFoundException e) { + LOG.warn("Snapshot {} not found, skipping task", task.snapshotId()); + return DeletionReport.skipped(task.snapshotId()); + } + + switch (task.taskType()) { + case DELETE_DATA_FILES: + return executeDeleteDataFiles(task, snapshot, taggedSnapshots); + case DELETE_CHANGELOG_FILES: + return executeDeleteChangelogFiles(task, snapshot); + case DELETE_MANIFESTS: + return executeDeleteManifests(task, snapshot, skippingSet); + case DELETE_SNAPSHOT: + return executeDeleteSnapshot(task, snapshot); + default: + throw new IllegalArgumentException("Unknown task type: " + task.taskType()); + } + } + + private DeletionReport executeDeleteDataFiles( + SnapshotExpireTask task, Snapshot snapshot, List taggedSnapshots) { + checkNotNull(taggedSnapshots); + + // expire merge tree files and collect changed buckets + Predicate skipper = null; + try { + skipper = + snapshotDeletion.createDataFileSkipperForTags( + taggedSnapshots, task.snapshotId()); + } catch (Exception e) { + LOG.warn( + String.format( + "Skip cleaning data files of snapshot '%s' due to failed to build skipping set.", + task.snapshotId()), + e); + return DeletionReport.skipped(task.snapshotId()); + } + + if (skipper != null) { + snapshotDeletion.cleanUnusedDataFiles(snapshot, skipper); + } + DeletionReport report = new DeletionReport(task.snapshotId()); + report.setDataFilesDeleted(true); + + return report; + } + + private DeletionReport executeDeleteChangelogFiles(SnapshotExpireTask task, Snapshot snapshot) { + + DeletionReport report = new DeletionReport(task.snapshotId()); + + if (snapshot.changelogManifestList() != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Ready to delete changelog files from snapshot #" + task.snapshotId()); + } + snapshotDeletion.deleteAddedDataFiles(snapshot.changelogManifestList()); + report.setChangelogDeleted(true); + } + + return report; + } + + private DeletionReport executeDeleteManifests( + SnapshotExpireTask task, Snapshot snapshot, Set skippingSet) { + checkNotNull(skippingSet); + + DeletionReport report = new DeletionReport(task.snapshotId()); + + snapshotDeletion.cleanUnusedManifests(snapshot, skippingSet); + report.setManifestsDeleted(true); + + return report; + } + + 
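The parallel empty-directory cleanup relies on merging per-worker buckets. A sketch of that merge, assuming a hypothetical coordinator that gathers the maps drained via `drainDeletionBuckets()` (shown later in this file):

```java
import org.apache.paimon.data.BinaryRow;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Merges per-worker deletion buckets before the final empty-directory sweep. */
public class BucketAggregationSketch {

    public static Map<BinaryRow, Set<Integer>> merge(
            List<Map<BinaryRow, Set<Integer>>> perWorkerBuckets) {
        Map<BinaryRow, Set<Integer>> merged = new HashMap<>();
        for (Map<BinaryRow, Set<Integer>> buckets : perWorkerBuckets) {
            for (Map.Entry<BinaryRow, Set<Integer>> entry : buckets.entrySet()) {
                merged.computeIfAbsent(entry.getKey(), k -> new HashSet<>())
                        .addAll(entry.getValue());
            }
        }
        return merged;
    }
}
```

The merged map is then passed once to `cleanEmptyDirectories(Map)` after every worker has finished its phase-1 tasks.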
private DeletionReport executeDeleteSnapshot(SnapshotExpireTask task, Snapshot snapshot) { + DeletionReport report = new DeletionReport(task.snapshotId()); + + // 1. Commit changelog before deleting snapshot + if (task.isChangelogDecoupled() && changelogManager != null) { + commitChangelog(new Changelog(snapshot)); + } + + // 2. Delete snapshot metadata file + snapshotManager.deleteSnapshot(snapshot.id()); + report.setSnapshotDeleted(true); + + return report; + } + + // ==================== Utility Methods ==================== + public Map> drainDeletionBuckets() { + return snapshotDeletion.drainDeletionBuckets(); + } + + /** + * Clean empty directories using externally aggregated deletion buckets. + * + *
+ * <p>
Used in parallel sink mode where buckets are aggregated from multiple workers. + * + * @param aggregatedBuckets merged deletion buckets from all workers + */ + public void cleanEmptyDirectories(Map> aggregatedBuckets) { + snapshotDeletion.cleanEmptyDirectories(aggregatedBuckets); + } + + /** Clean empty directories after data files have been deleted. */ + public void cleanEmptyDirectories() { + snapshotDeletion.cleanEmptyDirectories(); + } + + /** + * Commit a changelog when changelog is decoupled from snapshot. + * + * @param changelog the changelog to commit + */ + public void commitChangelog(Changelog changelog) { + try { + changelogManager.commitChangelog(changelog, changelog.id()); + changelogManager.commitLongLivedChangelogLatestHint(changelog.id()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + /** + * Write the earliest snapshot hint file. + * + * @param earliest the earliest snapshot ID to write + */ + public void writeEarliestHint(long earliest) { + try { + snapshotManager.commitEarliestHint(earliest); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/expire/ExpireSnapshotsPlan.java b/paimon-core/src/main/java/org/apache/paimon/operation/expire/ExpireSnapshotsPlan.java new file mode 100644 index 000000000000..ab8b90cf8fac --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/operation/expire/ExpireSnapshotsPlan.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation.expire; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * The plan for snapshot expiration, containing four groups of tasks organized by deletion phase. + * + *
+ * <p>The plan organizes tasks into four groups for correct deletion order:
+ *
+ * <ul>
+ *   <li>{@link #dataFileTasks()}: Phase 1a - Delete data files (can be parallelized)
+ *   <li>{@link #changelogFileTasks()}: Phase 1b - Delete changelog files (can be parallelized)
+ *   <li>{@link #manifestTasks()}: Phase 2a - Delete manifest files (serial)
+ *   <li>{@link #snapshotFileTasks()}: Phase 2b - Delete snapshot metadata files (serial)
+ * </ul>
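A sketch of the ordering contract these groups imply, with a plain `ExecutorService` standing in for whatever parallel runtime actually executes the tasks; `TaskRunner` is a hypothetical adapter around the executor:

```java
import org.apache.paimon.operation.expire.ExpireSnapshotsPlan;
import org.apache.paimon.operation.expire.SnapshotExpireTask;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical driver enforcing the phase ordering of the plan. */
public class PhaseOrderingSketch {

    /** Hypothetical adapter around ExpireSnapshotsExecutor#execute. */
    public interface TaskRunner {
        void run(SnapshotExpireTask task);
    }

    public static void run(ExpireSnapshotsPlan plan, TaskRunner runner)
            throws InterruptedException {
        runParallel(plan.dataFileTasks(), runner); // Phase 1a
        runParallel(plan.changelogFileTasks(), runner); // Phase 1b
        for (SnapshotExpireTask task : plan.manifestTasks()) { // Phase 2a, serial
            runner.run(task);
        }
        for (SnapshotExpireTask task : plan.snapshotFileTasks()) { // Phase 2b, serial
            runner.run(task);
        }
    }

    private static void runParallel(List<SnapshotExpireTask> tasks, TaskRunner runner)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        tasks.forEach(task -> pool.execute(() -> runner.run(task)));
        pool.shutdown();
        // The next phase must not start before this one has fully finished.
        pool.awaitTermination(1, TimeUnit.HOURS);
    }
}
```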
+ */ +public class ExpireSnapshotsPlan implements Serializable { + + private static final long serialVersionUID = 1L; + + private static final ExpireSnapshotsPlan EMPTY = + new ExpireSnapshotsPlan( + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + null, + 0, + 0); + + private final List dataFileTasks; + private final List changelogFileTasks; + private final List manifestTasks; + private final List snapshotFileTasks; + private final ProtectionSet protectionSet; + private final long beginInclusiveId; + private final long endExclusiveId; + + public ExpireSnapshotsPlan( + List dataFileTasks, + List changelogFileTasks, + List manifestTasks, + List snapshotFileTasks, + ProtectionSet protectionSet, + long beginInclusiveId, + long endExclusiveId) { + this.dataFileTasks = dataFileTasks; + this.changelogFileTasks = changelogFileTasks; + this.manifestTasks = manifestTasks; + this.snapshotFileTasks = snapshotFileTasks; + this.protectionSet = protectionSet; + this.beginInclusiveId = beginInclusiveId; + this.endExclusiveId = endExclusiveId; + } + + public static ExpireSnapshotsPlan empty() { + return EMPTY; + } + + public boolean isEmpty() { + return dataFileTasks.isEmpty() + && changelogFileTasks.isEmpty() + && manifestTasks.isEmpty() + && snapshotFileTasks.isEmpty(); + } + + /** Get data file deletion tasks (Phase 1a). These can be executed in parallel. */ + public List dataFileTasks() { + return dataFileTasks; + } + + /** Get changelog file deletion tasks (Phase 1b). These can be executed in parallel. */ + public List changelogFileTasks() { + return changelogFileTasks; + } + + /** Get manifest deletion tasks (Phase 2a). These should be executed serially. */ + public List manifestTasks() { + return manifestTasks; + } + + /** Get snapshot file deletion tasks (Phase 2b). These should be executed serially. */ + public List snapshotFileTasks() { + return snapshotFileTasks; + } + + /** + * Partition tasks into groups by snapshot ID range for parallel mode execution. + * + *
+ * <p>Each group contains tasks for a contiguous range of snapshots, with dataFileTasks first,
+ * then changelogFileTasks. This ensures:
+ *
+ * <ul>
+ *   <li>Tasks for the same snapshot range are processed by the same worker
+ *   <li>Within each worker, data files are deleted before changelog files
+ * </ul>
+ *
+ * <p>For example, with beginInclusiveId=1, endExclusiveId=11, parallelism=3 (see the worked
+ * check after this list):
+ *
+ * <ul>
+ *   <li>Worker 0 (snapshots 1-4): [dataTask(2,3,4), changelogTask(1,2,3,4)]
+ *   <li>Worker 1 (snapshots 5-8): [dataTask(5,6,7,8), changelogTask(5,6,7,8)]
+ *   <li>Worker 2 (snapshots 9-11): [dataTask(9,10,11), changelogTask(9,10)]
+ * </ul>
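A worked check of the example above, reproducing the same ceiling-division range arithmetic that `partitionTasksBySnapshotRange` uses:

```java
/** Prints "Worker 0: snapshots 1-4", "Worker 1: snapshots 5-8", "Worker 2: snapshots 9-11". */
public class PartitionRangeSketch {
    public static void main(String[] args) {
        long beginInclusiveId = 1;
        long endExclusiveId = 11;
        int parallelism = 3;
        // Data-file tasks run up to endExclusiveId inclusive, hence the +1.
        long totalSnapshots = endExclusiveId - beginInclusiveId + 1; // 11
        int perWorker = (int) ((totalSnapshots + parallelism - 1) / parallelism); // ceil -> 4
        for (int i = 0; i < parallelism; i++) {
            long start = beginInclusiveId + (long) i * perWorker;
            long end = Math.min(start + perWorker - 1, endExclusiveId);
            if (start > endExclusiveId) {
                break;
            }
            System.out.printf("Worker %d: snapshots %d-%d%n", i, start, end);
        }
    }
}
```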
+ * + * @param parallelism target parallelism for distribution + * @return list of task groups, one per worker + */ + public List> partitionTasksBySnapshotRange(int parallelism) { + if (dataFileTasks.isEmpty() && changelogFileTasks.isEmpty()) { + return Collections.emptyList(); + } + + // Build maps for quick lookup: snapshotId -> task + Map dataFileTaskMap = new HashMap<>(); + for (SnapshotExpireTask task : dataFileTasks) { + dataFileTaskMap.put(task.snapshotId(), task); + } + Map changelogFileTaskMap = new HashMap<>(); + for (SnapshotExpireTask task : changelogFileTasks) { + changelogFileTaskMap.put(task.snapshotId(), task); + } + + // Calculate snapshot ranges for each worker + // Total snapshot range is [beginInclusiveId, endExclusiveId] + long totalSnapshots = endExclusiveId - beginInclusiveId + 1; + int snapshotsPerWorker = (int) ((totalSnapshots + parallelism - 1) / parallelism); + + List> result = new ArrayList<>(parallelism); + for (int i = 0; i < parallelism; i++) { + long rangeStart = beginInclusiveId + (long) i * snapshotsPerWorker; + long rangeEnd = Math.min(rangeStart + snapshotsPerWorker - 1, endExclusiveId); + + if (rangeStart > endExclusiveId) { + break; + } + + List workerTasks = new ArrayList<>(); + + // First pass: add all dataFileTasks in this range + for (long id = rangeStart; id <= rangeEnd; id++) { + SnapshotExpireTask task = dataFileTaskMap.get(id); + if (task != null) { + workerTasks.add(task); + } + } + + // Second pass: add all changelogFileTasks in this range + for (long id = rangeStart; id <= rangeEnd; id++) { + SnapshotExpireTask task = changelogFileTaskMap.get(id); + if (task != null) { + workerTasks.add(task); + } + } + + if (!workerTasks.isEmpty()) { + result.add(workerTasks); + } + } + + return result; + } + + public ProtectionSet protectionSet() { + return protectionSet; + } + + public long beginInclusiveId() { + return beginInclusiveId; + } + + public long endExclusiveId() { + return endExclusiveId; + } + + @Override + public String toString() { + return "ExpireSnapshotsPlan{" + + "dataFileTasks=" + + dataFileTasks.size() + + ", changelogFileTasks=" + + changelogFileTasks.size() + + ", manifestTasks=" + + manifestTasks.size() + + ", snapshotFileTasks=" + + snapshotFileTasks.size() + + ", beginInclusiveId=" + + beginInclusiveId + + ", endExclusiveId=" + + endExclusiveId + + ", hasProtectionSet=" + + (protectionSet != null) + + '}'; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/expire/ExpireSnapshotsPlanner.java b/paimon-core/src/main/java/org/apache/paimon/operation/expire/ExpireSnapshotsPlanner.java new file mode 100644 index 000000000000..708d566e99fb --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/operation/expire/ExpireSnapshotsPlanner.java @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation.expire; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.consumer.ConsumerManager; +import org.apache.paimon.operation.SnapshotDeletion; +import org.apache.paimon.options.ExpireConfig; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.Preconditions; +import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.utils.TagManager; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.apache.paimon.utils.SnapshotManager.findPreviousOrEqualSnapshot; +import static org.apache.paimon.utils.SnapshotManager.findPreviousSnapshot; + +/** + * Planner for snapshot expiration. This class computes the expiration plan including: + * + *
+ * <ul>
+ *   <li>The range of snapshots to expire: [beginInclusiveId, endExclusiveId)
+ *   <li>The protection set containing manifests that should not be deleted
+ *   <li>Four groups of tasks organized by deletion phase
+ * </ul>
+ *
+ * <p>
Tag data files are loaded on-demand by workers using {@link + * SnapshotDeletion#createDataFileSkipperForTags}, which has internal caching to avoid repeated + * reads. + */ +public class ExpireSnapshotsPlanner { + + private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsPlanner.class); + + private final SnapshotManager snapshotManager; + private final ConsumerManager consumerManager; + private final SnapshotDeletion snapshotDeletion; + private final TagManager tagManager; + + public ExpireSnapshotsPlanner( + SnapshotManager snapshotManager, + ConsumerManager consumerManager, + SnapshotDeletion snapshotDeletion, + TagManager tagManager) { + this.snapshotManager = snapshotManager; + this.consumerManager = consumerManager; + this.snapshotDeletion = snapshotDeletion; + this.tagManager = tagManager; + } + + /** Creates an ExpireSnapshotsPlanner from a FileStoreTable. */ + public static ExpireSnapshotsPlanner create(FileStoreTable table) { + SnapshotManager snapshotManager = table.snapshotManager(); + ConsumerManager consumerManager = + new ConsumerManager(table.fileIO(), table.location(), snapshotManager.branch()); + return new ExpireSnapshotsPlanner( + table.snapshotManager(), + consumerManager, + table.store().newSnapshotDeletion(), + table.tagManager()); + } + + /** + * Plan the snapshot expiration. + * + * @param config expiration configuration + * @return the expiration plan with three groups of tasks, or empty plan if nothing to expire + */ + public ExpireSnapshotsPlan plan(ExpireConfig config) { + snapshotDeletion.setChangelogDecoupled(config.isChangelogDecoupled()); + int retainMax = config.getSnapshotRetainMax(); + int retainMin = config.getSnapshotRetainMin(); + int maxDeletes = config.getSnapshotMaxDeletes(); + long olderThanMills = + System.currentTimeMillis() - config.getSnapshotTimeRetain().toMillis(); + + // 1. 
Get snapshot range + Long latestSnapshotId = snapshotManager.latestSnapshotId(); + if (latestSnapshotId == null) { + // no snapshot, nothing to expire + return ExpireSnapshotsPlan.empty(); + } + + Long earliestId = snapshotManager.earliestSnapshotId(); + if (earliestId == null) { + return ExpireSnapshotsPlan.empty(); + } + + Preconditions.checkArgument( + retainMax >= retainMin, + String.format( + "retainMax (%s) must not be less than retainMin (%s).", + retainMax, retainMin)); + + // the min snapshot to retain from 'snapshot.num-retained.max' + // (the maximum number of snapshots to retain) + long min = Math.max(latestSnapshotId - retainMax + 1, earliestId); + + // the max exclusive snapshot to expire until + // protected by 'snapshot.num-retained.min' + // (the minimum number of completed snapshots to retain) + long endExclusiveId = latestSnapshotId - retainMin + 1; + + // the snapshot being read by the consumer cannot be deleted + long consumerProtection = consumerManager.minNextSnapshot().orElse(Long.MAX_VALUE); + endExclusiveId = Math.min(endExclusiveId, consumerProtection); + + // protected by 'snapshot.expire.limit' + // (the maximum number of snapshots allowed to expire at a time) + endExclusiveId = Math.min(endExclusiveId, earliestId + maxDeletes); + + for (long id = min; id < endExclusiveId; id++) { + // Early exit the loop for 'snapshot.time-retained' + // (the maximum time of snapshots to retain) + if (snapshotManager.snapshotExists(id) + && olderThanMills <= snapshotManager.snapshot(id).timeMillis()) { + endExclusiveId = id; + break; + } + } + + return plan(earliestId, endExclusiveId, config.isChangelogDecoupled()); + } + + @VisibleForTesting + public ExpireSnapshotsPlan plan( + long earliestId, long endExclusiveId, boolean changelogDecoupled) { + // Boundary check + if (endExclusiveId <= earliestId) { + // No expire happens: + // write the hint file in order to see the earliest snapshot directly next time + // should avoid duplicate writes when the file exists + if (snapshotManager.earliestFileNotExists()) { + try { + snapshotManager.commitEarliestHint(earliestId); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + return ExpireSnapshotsPlan.empty(); + } + + // find first snapshot to expire + long beginInclusiveId = earliestId; + for (long id = endExclusiveId - 1; id >= earliestId; id--) { + if (!snapshotManager.snapshotExists(id)) { + // only latest snapshots are retained, as we cannot find this snapshot, we can + // assume that all snapshots preceding it have been removed + beginInclusiveId = id + 1; + break; + } + } + + // Pre-read and cache endSnapshot + Snapshot cachedEndSnapshot; + try { + cachedEndSnapshot = snapshotManager.tryGetSnapshot(endExclusiveId); + } catch (FileNotFoundException e) { + // the end exclusive snapshot is gone + // there is no need to proceed + LOG.warn("End snapshot {} not found, abort expiration", endExclusiveId); + return ExpireSnapshotsPlan.empty(); + } + + // Build protection set + ProtectionSet protectionSet = + buildProtectionSet(beginInclusiveId, endExclusiveId, cachedEndSnapshot); + + // Generate four groups of tasks + List dataFileTasks = new ArrayList<>(); + List changelogFileTasks = new ArrayList<>(); + List manifestTasks = new ArrayList<>(); + List snapshotFileTasks = new ArrayList<>(); + + // Data file tasks: range is (beginInclusiveId, endExclusiveId] + // deleted merge tree files in a snapshot are not used by the next snapshot, so the range of + // id should be (beginInclusiveId, endExclusiveId] + for (long 
id = beginInclusiveId + 1; id <= endExclusiveId; id++) { + dataFileTasks.add(SnapshotExpireTask.forDataFiles(id)); + } + + // Changelog file tasks: range is [beginInclusiveId, endExclusiveId) + // Only when changelog is not decoupled + if (!changelogDecoupled) { + for (long id = beginInclusiveId; id < endExclusiveId; id++) { + changelogFileTasks.add(SnapshotExpireTask.forChangelogFiles(id)); + } + } + + // Manifest and snapshot file tasks: range is [beginInclusiveId, endExclusiveId) + for (long id = beginInclusiveId; id < endExclusiveId; id++) { + // Skip cleaning manifest files due to failed to build skipping set + if (protectionSet.manifestSkippingSet() != null) { + manifestTasks.add(SnapshotExpireTask.forManifests(id)); + } + snapshotFileTasks.add(SnapshotExpireTask.forSnapshot(id, changelogDecoupled)); + } + + LOG.info( + "Planned expiration: range=[{}, {}), dataFileTasks={}, changelogFileTasks={}, manifestTasks={}, snapshotFileTasks={}", + beginInclusiveId, + endExclusiveId, + dataFileTasks.size(), + changelogFileTasks.size(), + manifestTasks.size(), + snapshotFileTasks.size()); + + return new ExpireSnapshotsPlan( + dataFileTasks, + changelogFileTasks, + manifestTasks, + snapshotFileTasks, + protectionSet, + beginInclusiveId, + endExclusiveId); + } + + private ProtectionSet buildProtectionSet( + long beginInclusiveId, long endExclusiveId, Snapshot cachedEndSnapshot) { + + List taggedSnapshots = tagManager.taggedSnapshots(); + + // 1. Find skipping tags that overlap with expire range + List skippingTags = + findSkippingTags(taggedSnapshots, beginInclusiveId, endExclusiveId); + + // 2. Build manifest skipping set + List manifestProtectedSnapshots = new ArrayList<>(skippingTags); + manifestProtectedSnapshots.add(cachedEndSnapshot); + Set manifestSkippingSet = null; + try { + manifestSkippingSet = + new HashSet<>(snapshotDeletion.manifestSkippingSet(manifestProtectedSnapshots)); + } catch (Exception e) { + LOG.warn("Skip cleaning manifest files due to failed to build skipping set.", e); + } + + return new ProtectionSet(taggedSnapshots, manifestSkippingSet); + } + + /** Find the skipping tags in sortedTags for range of [beginInclusive, endExclusive). */ + public static List findSkippingTags( + List sortedTags, long beginInclusive, long endExclusive) { + List overlappedSnapshots = new ArrayList<>(); + int right = findPreviousSnapshot(sortedTags, endExclusive); + if (right >= 0) { + int left = Math.max(findPreviousOrEqualSnapshot(sortedTags, beginInclusive), 0); + for (int i = left; i <= right; i++) { + overlappedSnapshots.add(sortedTags.get(i)); + } + } + return overlappedSnapshots; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/expire/ProtectionSet.java b/paimon-core/src/main/java/org/apache/paimon/operation/expire/ProtectionSet.java new file mode 100644 index 000000000000..3539ca83287c --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/operation/expire/ProtectionSet.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation.expire; + +import org.apache.paimon.Snapshot; + +import java.io.Serializable; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +/** + * Immutable protection set for parallel expire snapshots. This class holds all the information + * needed to protect files that should not be deleted during expiration. + * + *
+ * <p>This class is broadcast to all workers and must be treated as read-only.
+ *
+ * <p>
Note: Tag data files are loaded on-demand by workers using {@link + * org.apache.paimon.operation.SnapshotDeletion#createDataFileSkipperForTags}, which has internal + * caching to avoid repeated reads. + */ +public class ProtectionSet implements Serializable { + + private static final long serialVersionUID = 1L; + + /** All tagged snapshots, sorted by snapshot ID. */ + private final List taggedSnapshots; + + /** Manifest file names that should be protected (from Tags and endSnapshot). */ + private final Set manifestSkippingSet; + + public ProtectionSet(List taggedSnapshots, Set manifestSkippingSet) { + this.taggedSnapshots = Collections.unmodifiableList(taggedSnapshots); + this.manifestSkippingSet = + manifestSkippingSet == null + ? null + : Collections.unmodifiableSet(manifestSkippingSet); + } + + public List taggedSnapshots() { + return taggedSnapshots; + } + + public Set manifestSkippingSet() { + return manifestSkippingSet; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/expire/SnapshotExpireTask.java b/paimon-core/src/main/java/org/apache/paimon/operation/expire/SnapshotExpireTask.java new file mode 100644 index 000000000000..bf20231ac2f3 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/operation/expire/SnapshotExpireTask.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation.expire; + +import java.io.Serializable; + +/** + * A task representing a single snapshot expiration operation. + * + *
+ * <p>Each task has a specific {@link TaskType} that determines what to delete:
+ *
+ * <ul>
+ *   <li>{@link TaskType#DELETE_DATA_FILES}: Delete data files
+ *   <li>{@link TaskType#DELETE_CHANGELOG_FILES}: Delete changelog files
+ *   <li>{@link TaskType#DELETE_MANIFESTS}: Delete manifest files
+ *   <li>{@link TaskType#DELETE_SNAPSHOT}: Delete snapshot metadata file
+ * </ul>
+ *
+ * <p>This design ensures correct deletion order in the execution layer:
+ *
+ * <ul>
+ *   <li>Phase 1: All data file tasks are executed first
+ *   <li>Phase 2a: All manifest tasks are executed
+ *   <li>Phase 2b: All snapshot tasks are executed last
+ * </ul>
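For illustration, the four factory methods for a single snapshot, matching the phases above (snapshot id 7 is arbitrary):

```java
import org.apache.paimon.operation.expire.SnapshotExpireTask;

import java.util.Arrays;
import java.util.List;

/** Builds one task of each kind for snapshot 7, with changelog decoupling disabled. */
public class TaskFactorySketch {
    public static List<SnapshotExpireTask> tasksForSnapshot7() {
        return Arrays.asList(
                SnapshotExpireTask.forDataFiles(7L),
                SnapshotExpireTask.forChangelogFiles(7L),
                SnapshotExpireTask.forManifests(7L),
                SnapshotExpireTask.forSnapshot(7L, false));
    }
}
```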
+ */ +public class SnapshotExpireTask implements Serializable { + + private static final long serialVersionUID = 1L; + + /** The type of deletion operation this task performs. */ + public enum TaskType { + /** Delete data files only. Used for both expiring snapshots and boundary snapshot. */ + DELETE_DATA_FILES, + + /** Delete changelog files. Separate phase for changelog deletion with different range. */ + DELETE_CHANGELOG_FILES, + + /** Delete manifest files. Second phase for all expiring snapshots. */ + DELETE_MANIFESTS, + + /** Delete snapshot metadata file. Final phase for all expiring snapshots. */ + DELETE_SNAPSHOT + } + + private final long snapshotId; + private final TaskType taskType; + private final boolean changelogDecoupled; + + public SnapshotExpireTask(long snapshotId, TaskType taskType, boolean changelogDecoupled) { + this.snapshotId = snapshotId; + this.taskType = taskType; + this.changelogDecoupled = changelogDecoupled; + } + + /** + * Create a task for deleting data files. + * + * @param snapshotId the snapshot ID + * @return a data file deletion task + */ + public static SnapshotExpireTask forDataFiles(long snapshotId) { + return new SnapshotExpireTask(snapshotId, TaskType.DELETE_DATA_FILES, false); + } + + /** + * Create a task for deleting changelog files. + * + * @param snapshotId the snapshot ID + * @return a changelog file deletion task + */ + public static SnapshotExpireTask forChangelogFiles(long snapshotId) { + return new SnapshotExpireTask(snapshotId, TaskType.DELETE_CHANGELOG_FILES, false); + } + + /** + * Create a task for deleting manifests. + * + * @param snapshotId the snapshot ID + * @return a manifest deletion task + */ + public static SnapshotExpireTask forManifests(long snapshotId) { + return new SnapshotExpireTask(snapshotId, TaskType.DELETE_MANIFESTS, false); + } + + /** + * Create a task for deleting snapshot metadata file. + * + * @param snapshotId the snapshot ID + * @param changelogDecoupled whether changelog is decoupled from snapshot + * @return a snapshot deletion task + */ + public static SnapshotExpireTask forSnapshot(long snapshotId, boolean changelogDecoupled) { + return new SnapshotExpireTask(snapshotId, TaskType.DELETE_SNAPSHOT, changelogDecoupled); + } + + public long snapshotId() { + return snapshotId; + } + + public TaskType taskType() { + return taskType; + } + + public boolean isChangelogDecoupled() { + return changelogDecoupled; + } + + @Override + public String toString() { + return "SnapshotExpireTask{" + + "snapshotId=" + + snapshotId + + ", taskType=" + + taskType + + ", changelogDecoupled=" + + changelogDecoupled + + '}'; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java index 1b42f2339bad..97825af3e9bf 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java @@ -39,7 +39,7 @@ import java.util.Set; import java.util.function.Predicate; -import static org.apache.paimon.table.ExpireSnapshotsImpl.findSkippingTags; +import static org.apache.paimon.operation.expire.ExpireSnapshotsPlanner.findSkippingTags; /** Cleanup the changelog in changelog directory. 
*/ public class ExpireChangelogImpl implements ExpireSnapshots { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java index 01c7e74966ce..6f9a309a06cb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java @@ -18,43 +18,43 @@ package org.apache.paimon.table; -import org.apache.paimon.Changelog; -import org.apache.paimon.Snapshot; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.consumer.ConsumerManager; -import org.apache.paimon.manifest.ExpireFileEntry; import org.apache.paimon.operation.SnapshotDeletion; +import org.apache.paimon.operation.expire.ExpireSnapshotsExecutor; +import org.apache.paimon.operation.expire.ExpireSnapshotsPlan; +import org.apache.paimon.operation.expire.ExpireSnapshotsPlanner; +import org.apache.paimon.operation.expire.SnapshotExpireTask; import org.apache.paimon.options.ExpireConfig; import org.apache.paimon.utils.ChangelogManager; -import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.UncheckedIOException; -import java.util.ArrayList; import java.util.HashSet; -import java.util.List; import java.util.Set; -import java.util.function.Predicate; -import static org.apache.paimon.utils.SnapshotManager.findPreviousOrEqualSnapshot; -import static org.apache.paimon.utils.SnapshotManager.findPreviousSnapshot; - -/** An implementation for {@link ExpireSnapshots}. */ +/** + * An implementation for {@link ExpireSnapshots}. + * + *
+ * <p>This implementation uses serial execution mode, which is suitable for local execution. The
+ * execution follows four phases:
+ *
+ * <ul>
+ *   <li>Phase 1a: Delete all data files (using dataFileTasks)
+ *   <li>Phase 1b: Delete all changelog files (using changelogFileTasks)
+ *   <li>Phase 2a: Delete all manifests (using manifestTasks)
+ *   <li>Phase 2b: Delete all snapshot files (using snapshotFileTasks)
+ * </ul>
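Typical serial usage, mirroring the test changes later in this diff; the sketch assumes `newExpireSnapshots()` is the existing `FileStoreTable` accessor for this implementation:

```java
import org.apache.paimon.options.ExpireConfig;
import org.apache.paimon.table.FileStoreTable;

import java.time.Duration;

/** Sketch: expire with explicit retain bounds; returns the number of expired snapshots. */
public class SerialExpireSketch {
    public static int expire(FileStoreTable table) {
        ExpireConfig config =
                ExpireConfig.builder()
                        .snapshotRetainMin(1)
                        .snapshotRetainMax(10)
                        .snapshotTimeRetain(Duration.ofHours(1))
                        .build();
        return table.newExpireSnapshots().config(config).expire();
    }
}
```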
+ */ public class ExpireSnapshotsImpl implements ExpireSnapshots { private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsImpl.class); - private final SnapshotManager snapshotManager; - private final ChangelogManager changelogManager; - private final ConsumerManager consumerManager; - private final SnapshotDeletion snapshotDeletion; - private final TagManager tagManager; + private final ExpireSnapshotsPlanner planner; + private final ExpireSnapshotsExecutor executor; private ExpireConfig expireConfig; @@ -63,16 +63,18 @@ public ExpireSnapshotsImpl( ChangelogManager changelogManager, SnapshotDeletion snapshotDeletion, TagManager tagManager) { - this.snapshotManager = snapshotManager; - this.changelogManager = changelogManager; - this.consumerManager = + this.expireConfig = ExpireConfig.builder().build(); + + ConsumerManager consumerManager = new ConsumerManager( snapshotManager.fileIO(), snapshotManager.tablePath(), snapshotManager.branch()); - this.snapshotDeletion = snapshotDeletion; - this.tagManager = tagManager; - this.expireConfig = ExpireConfig.builder().build(); + this.planner = + new ExpireSnapshotsPlanner( + snapshotManager, consumerManager, snapshotDeletion, tagManager); + this.executor = + new ExpireSnapshotsExecutor(snapshotManager, snapshotDeletion, changelogManager); } @Override @@ -83,234 +85,54 @@ public ExpireSnapshots config(ExpireConfig expireConfig) { @Override public int expire() { - snapshotDeletion.setChangelogDecoupled(expireConfig.isChangelogDecoupled()); - int retainMax = expireConfig.getSnapshotRetainMax(); - int retainMin = expireConfig.getSnapshotRetainMin(); - int maxDeletes = expireConfig.getSnapshotMaxDeletes(); - long olderThanMills = - System.currentTimeMillis() - expireConfig.getSnapshotTimeRetain().toMillis(); - - Long latestSnapshotId = snapshotManager.latestSnapshotId(); - if (latestSnapshotId == null) { - // no snapshot, nothing to expire - return 0; - } - - Long earliest = snapshotManager.earliestSnapshotId(); - if (earliest == null) { + ExpireSnapshotsPlan plan = planner.plan(expireConfig); + if (plan.isEmpty()) { return 0; } - Preconditions.checkArgument( - retainMax >= retainMin, - String.format( - "retainMax (%s) must not be less than retainMin (%s).", - retainMax, retainMin)); - - // the min snapshot to retain from 'snapshot.num-retained.max' - // (the maximum number of snapshots to retain) - long min = Math.max(latestSnapshotId - retainMax + 1, earliest); - - // the max exclusive snapshot to expire until - // protected by 'snapshot.num-retained.min' - // (the minimum number of completed snapshots to retain) - long maxExclusive = latestSnapshotId - retainMin + 1; - - // the snapshot being read by the consumer cannot be deleted - maxExclusive = - Math.min(maxExclusive, consumerManager.minNextSnapshot().orElse(Long.MAX_VALUE)); - - // protected by 'snapshot.expire.limit' - // (the maximum number of snapshots allowed to expire at a time) - maxExclusive = Math.min(maxExclusive, earliest + maxDeletes); - - for (long id = min; id < maxExclusive; id++) { - // Early exit the loop for 'snapshot.time-retained' - // (the maximum time of snapshots to retain) - if (snapshotManager.snapshotExists(id) - && olderThanMills <= snapshotManager.snapshot(id).timeMillis()) { - return expireUntil(earliest, id); - } - } - - return expireUntil(earliest, maxExclusive); + return executeWithPlan(plan); } @VisibleForTesting - public int expireUntil(long earliestId, long endExclusiveId) { + public int executeWithPlan(ExpireSnapshotsPlan plan) { long 
startTime = System.currentTimeMillis(); - if (endExclusiveId <= earliestId) { - // No expire happens: - // write the hint file in order to see the earliest snapshot directly next time - // should avoid duplicate writes when the file exists - if (snapshotManager.earliestFileNotExists()) { - writeEarliestHint(earliestId); - } - - // fast exit - return 0; + // Phase 1a: Delete all data files + for (SnapshotExpireTask task : plan.dataFileTasks()) { + executor.execute(task, plan.protectionSet().taggedSnapshots(), null); } - // find first snapshot to expire - long beginInclusiveId = earliestId; - for (long id = endExclusiveId - 1; id >= earliestId; id--) { - if (!snapshotManager.snapshotExists(id)) { - // only latest snapshots are retained, as we cannot find this snapshot, we can - // assume that all snapshots preceding it have been removed - beginInclusiveId = id + 1; - break; - } + // Phase 1b: Delete all changelog files + for (SnapshotExpireTask task : plan.changelogFileTasks()) { + executor.execute(task, null, null); } - List taggedSnapshots = tagManager.taggedSnapshots(); - - // delete merge tree files - // deleted merge tree files in a snapshot are not used by the next snapshot, so the range of - // id should be (beginInclusiveId, endExclusiveId] - for (long id = beginInclusiveId + 1; id <= endExclusiveId; id++) { - if (LOG.isDebugEnabled()) { - LOG.debug("Ready to delete merge tree files not used by snapshot #" + id); - } - Snapshot snapshot; - try { - snapshot = snapshotManager.tryGetSnapshot(id); - } catch (FileNotFoundException e) { - beginInclusiveId = id + 1; - continue; - } - // expire merge tree files and collect changed buckets - Predicate skipper; - try { - skipper = snapshotDeletion.createDataFileSkipperForTags(taggedSnapshots, id); - } catch (Exception e) { - LOG.info( - String.format( - "Skip cleaning data files of snapshot '%s' due to failed to build skipping set.", - id), - e); - continue; - } - - snapshotDeletion.cleanUnusedDataFiles(snapshot, skipper); - } + // Clean empty directories after all data files and changelog files are deleted + executor.cleanEmptyDirectories(); - // delete changelog files - if (!expireConfig.isChangelogDecoupled()) { - for (long id = beginInclusiveId; id < endExclusiveId; id++) { - if (LOG.isDebugEnabled()) { - LOG.debug("Ready to delete changelog files from snapshot #" + id); - } - Snapshot snapshot; - try { - snapshot = snapshotManager.tryGetSnapshot(id); - } catch (FileNotFoundException e) { - beginInclusiveId = id + 1; - continue; - } - if (snapshot.changelogManifestList() != null) { - snapshotDeletion.deleteAddedDataFiles(snapshot.changelogManifestList()); - } + // Phase 2a: Delete all manifests + if (plan.protectionSet().manifestSkippingSet() != null) { + Set skippingSet = new HashSet<>(plan.protectionSet().manifestSkippingSet()); + for (SnapshotExpireTask task : plan.manifestTasks()) { + executor.execute(task, null, skippingSet); } } - // data files and changelog files in bucket directories has been deleted - // then delete changed bucket directories if they are empty - snapshotDeletion.cleanEmptyDirectories(); - - // delete manifests and indexFiles - List skippingSnapshots = - findSkippingTags(taggedSnapshots, beginInclusiveId, endExclusiveId); - - try { - skippingSnapshots.add(snapshotManager.tryGetSnapshot(endExclusiveId)); - } catch (FileNotFoundException e) { - // the end exclusive snapshot is gone - // there is no need to proceed - return 0; + // Phase 2b: Delete all snapshot files + for (SnapshotExpireTask task : 
plan.snapshotFileTasks()) { + executor.execute(task, null, null); } - Set skippingSet = null; - try { - skippingSet = new HashSet<>(snapshotDeletion.manifestSkippingSet(skippingSnapshots)); - } catch (Exception e) { - LOG.info("Skip cleaning manifest files due to failed to build skipping set.", e); - } - if (skippingSet != null) { - for (long id = beginInclusiveId; id < endExclusiveId; id++) { - if (LOG.isDebugEnabled()) { - LOG.debug("Ready to delete manifests in snapshot #" + id); - } - - Snapshot snapshot; - try { - snapshot = snapshotManager.tryGetSnapshot(id); - } catch (FileNotFoundException e) { - beginInclusiveId = id + 1; - continue; - } - snapshotDeletion.cleanUnusedManifests(snapshot, skippingSet); - } - } - - // delete snapshot file finally - for (long id = beginInclusiveId; id < endExclusiveId; id++) { - Snapshot snapshot; - try { - snapshot = snapshotManager.tryGetSnapshot(id); - } catch (FileNotFoundException e) { - beginInclusiveId = id + 1; - continue; - } - if (expireConfig.isChangelogDecoupled()) { - commitChangelog(new Changelog(snapshot)); - } - snapshotManager.deleteSnapshot(id); - } + // Write earliest hint + executor.writeEarliestHint(plan.endExclusiveId()); - writeEarliestHint(endExclusiveId); long duration = System.currentTimeMillis() - startTime; LOG.info( "Finished expire snapshots, duration {} ms, range is [{}, {})", duration, - beginInclusiveId, - endExclusiveId); - return (int) (endExclusiveId - beginInclusiveId); - } + plan.beginInclusiveId(), + plan.endExclusiveId()); - private void commitChangelog(Changelog changelog) { - try { - changelogManager.commitChangelog(changelog, changelog.id()); - changelogManager.commitLongLivedChangelogLatestHint(changelog.id()); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - - private void writeEarliestHint(long earliest) { - try { - snapshotManager.commitEarliestHint(earliest); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - - @VisibleForTesting - public SnapshotDeletion snapshotDeletion() { - return snapshotDeletion; - } - - /** Find the skipping tags in sortedTags for range of [beginInclusive, endExclusive). 
*/ - public static List findSkippingTags( - List sortedTags, long beginInclusive, long endExclusive) { - List overlappedSnapshots = new ArrayList<>(); - int right = findPreviousSnapshot(sortedTags, endExclusive); - if (right >= 0) { - int left = Math.max(findPreviousOrEqualSnapshot(sortedTags, beginInclusive), 0); - for (int i = left; i <= right; i++) { - overlappedSnapshots.add(sortedTags.get(i)); - } - } - return overlappedSnapshots; + return (int) (plan.endExclusiveId() - plan.beginInclusiveId()); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java index d63af5fd2c3e..e7bc09a9709a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java @@ -62,7 +62,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Random; import java.util.Set; import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; @@ -77,8 +76,8 @@ public class ExpireSnapshotsTest { protected final FileIO fileIO = new LocalFileIO(); protected TestKeyValueGenerator gen; - @TempDir java.nio.file.Path tempDir; - @TempDir java.nio.file.Path tempExternalPath; + @TempDir protected java.nio.file.Path tempDir; + @TempDir protected java.nio.file.Path tempExternalPath; protected TestFileStore store; protected SnapshotManager snapshotManager; protected ChangelogManager changelogManager; @@ -102,7 +101,12 @@ public void beforeEach() throws Exception { @Test public void testExpireWithMissingFiles() throws Exception { - ExpireSnapshots expire = store.newExpire(1, 1, 1); + ExpireConfig config = + ExpireConfig.builder() + .snapshotRetainMin(1) + .snapshotRetainMax(1) + .snapshotTimeRetain(Duration.ofMillis(1)) + .build(); List allData = new ArrayList<>(); List snapshotPositions = new ArrayList<>(); @@ -132,7 +136,7 @@ public void testExpireWithMissingFiles() throws Exception { fileIO.deleteQuietly(unusedFileList.get(i)); } - expire.expire(); + doExpire(config); for (int i = 1; i < latestSnapshotId; i++) { assertThat(snapshotManager.snapshotExists(i)).isFalse(); @@ -150,7 +154,12 @@ public void testExpireWithMissingFilesWithExternalPath() throws Exception { .set( CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY, ExternalPathStrategy.ROUND_ROBIN); - ExpireSnapshots expire = store.newExpire(1, 1, 1); + ExpireConfig config = + ExpireConfig.builder() + .snapshotRetainMin(1) + .snapshotRetainMax(1) + .snapshotTimeRetain(Duration.ofMillis(1)) + .build(); List allData = new ArrayList<>(); List snapshotPositions = new ArrayList<>(); @@ -193,7 +202,7 @@ public void testExpireWithMissingFilesWithExternalPath() throws Exception { fileIO.deleteQuietly(unusedFileList.get(i)); } - expire.expire(); + doExpire(config); for (int i = 1; i < latestSnapshotId; i++) { assertThat(snapshotManager.snapshotExists(i)).isFalse(); @@ -226,7 +235,13 @@ public void testMixedSnapshotAndTagDeletion() throws Exception { // randomly expire snapshots int expired = random.nextInt(latestSnapshotId / 2) + 1; int retained = latestSnapshotId - expired; - store.newExpire(retained, retained, Long.MAX_VALUE).expire(); + ExpireConfig config = + ExpireConfig.builder() + .snapshotRetainMin(retained) + .snapshotRetainMax(retained) + .snapshotTimeRetain(Duration.ofMillis(Long.MAX_VALUE)) + .build(); + doExpire(config); // randomly delete tags for (int id = 1; id <= latestSnapshotId; id++) { 
@@ -251,8 +266,6 @@ public void testMixedSnapshotAndTagDeletion() throws Exception { @Test public void testExpireExtraFiles() throws IOException { - ExpireSnapshotsImpl expire = (ExpireSnapshotsImpl) store.newExpire(1, 3, Long.MAX_VALUE); - // write test files BinaryRow partition = gen.getPartition(gen.next()); Path bucketPath = store.pathFactory().bucketPath(partition, 0); @@ -290,8 +303,8 @@ public void testExpireExtraFiles() throws IOException { ManifestEntry add = ManifestEntry.create(FileKind.ADD, partition, 0, 1, dataFile); ManifestEntry delete = ManifestEntry.create(FileKind.DELETE, partition, 0, 1, dataFile); - // expire - expire.snapshotDeletion() + // expire using SnapshotDeletion directly + store.newSnapshotDeletion() .cleanUnusedDataFile( Arrays.asList(ExpireFileEntry.from(add), ExpireFileEntry.from(delete))); @@ -312,7 +325,7 @@ public void testExpireExtraFilesWithExternalPath() throws IOException { .set( CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY, ExternalPathStrategy.ROUND_ROBIN); - ExpireSnapshotsImpl expire = (ExpireSnapshotsImpl) store.newExpire(1, 3, Long.MAX_VALUE); + // write test files BinaryRow partition = gen.getPartition(gen.next()); @@ -353,8 +366,8 @@ public void testExpireExtraFilesWithExternalPath() throws IOException { ManifestEntry add = ManifestEntry.create(FileKind.ADD, partition, 0, 1, dataFile); ManifestEntry delete = ManifestEntry.create(FileKind.DELETE, partition, 0, 1, dataFile); - // expire - expire.snapshotDeletion() + // expire using SnapshotDeletion directly + store.newSnapshotDeletion() .cleanUnusedDataFile( Arrays.asList(ExpireFileEntry.from(add), ExpireFileEntry.from(delete))); @@ -367,9 +380,14 @@ public void testExpireExtraFilesWithExternalPath() throws IOException { } @Test - public void testNoSnapshot() throws IOException { - ExpireSnapshots expire = store.newExpire(1, 3, Long.MAX_VALUE); - expire.expire(); + public void testNoSnapshot() throws Exception { + ExpireConfig config = + ExpireConfig.builder() + .snapshotRetainMin(1) + .snapshotRetainMax(3) + .snapshotTimeRetain(Duration.ofMillis(Long.MAX_VALUE)) + .build(); + doExpire(config); assertThat(snapshotManager.latestSnapshotId()).isNull(); @@ -382,8 +400,13 @@ public void testNotEnoughSnapshots() throws Exception { List snapshotPositions = new ArrayList<>(); commit(2, allData, snapshotPositions); int latestSnapshotId = requireNonNull(snapshotManager.latestSnapshotId()).intValue(); - ExpireSnapshots expire = store.newExpire(1, latestSnapshotId + 1, Long.MAX_VALUE); - expire.expire(); + ExpireConfig config = + ExpireConfig.builder() + .snapshotRetainMin(1) + .snapshotRetainMax(latestSnapshotId + 1) + .snapshotTimeRetain(Duration.ofMillis(Long.MAX_VALUE)) + .build(); + doExpire(config); for (int i = 1; i <= latestSnapshotId; i++) { assertThat(snapshotManager.snapshotExists(i)).isTrue(); @@ -399,8 +422,13 @@ public void testNeverExpire() throws Exception { List snapshotPositions = new ArrayList<>(); commit(5, allData, snapshotPositions); int latestSnapshotId = requireNonNull(snapshotManager.latestSnapshotId()).intValue(); - ExpireSnapshots expire = store.newExpire(1, Integer.MAX_VALUE, Long.MAX_VALUE); - expire.expire(); + ExpireConfig config = + ExpireConfig.builder() + .snapshotRetainMin(1) + .snapshotRetainMax(Integer.MAX_VALUE) + .snapshotTimeRetain(Duration.ofMillis(Long.MAX_VALUE)) + .build(); + doExpire(config); for (int i = 1; i <= latestSnapshotId; i++) { assertThat(snapshotManager.snapshotExists(i)).isTrue(); @@ -420,8 +448,13 @@ public void testNumRetainedMin() throws 
Exception { commit(numRetainedMin + random.nextInt(5), allData, snapshotPositions); int latestSnapshotId = requireNonNull(snapshotManager.latestSnapshotId()).intValue(); Thread.sleep(100); - ExpireSnapshots expire = store.newExpire(numRetainedMin, Integer.MAX_VALUE, 1); - expire.expire(); + ExpireConfig config = + ExpireConfig.builder() + .snapshotRetainMin(numRetainedMin) + .snapshotRetainMax(Integer.MAX_VALUE) + .snapshotTimeRetain(Duration.ofMillis(1)) + .build(); + doExpire(config); for (int i = 1; i <= latestSnapshotId - numRetainedMin; i++) { assertThat(snapshotManager.snapshotExists(i)).isFalse(); @@ -436,31 +469,21 @@ public void testNumRetainedMin() throws Exception { @Test public void testExpireEmptySnapshot() throws Exception { - Random random = new Random(); - List allData = new ArrayList<>(); List snapshotPositions = new ArrayList<>(); commit(100, allData, snapshotPositions); - int latestSnapshotId = requireNonNull(snapshotManager.latestSnapshotId()).intValue(); List s = new ArrayList<>(); - s.add( - new Thread( - () -> { - final ExpireSnapshotsImpl expire = - (ExpireSnapshotsImpl) store.newExpire(1, Integer.MAX_VALUE, 1); - expire.expireUntil(89, latestSnapshotId); - })); + // Create multiple threads that concurrently expire snapshots for (int i = 0; i < 10; i++) { - final ExpireSnapshotsImpl expire = - (ExpireSnapshotsImpl) store.newExpire(1, Integer.MAX_VALUE, 1); - s.add( - new Thread( - () -> { - int start = random.nextInt(latestSnapshotId - 10); - int end = start + random.nextInt(10); - expire.expireUntil(start, end); - })); + ExpireConfig config = + ExpireConfig.builder() + .snapshotRetainMin(1) + .snapshotRetainMax(Integer.MAX_VALUE) + .snapshotTimeRetain(Duration.ofMillis(1)) + .build(); + + s.add(new Thread(() -> doExpire(config))); } Assertions.assertThatCode( @@ -480,13 +503,18 @@ public void testExpireEmptySnapshot() throws Exception { @Test public void testExpireWithNumber() throws Exception { - ExpireSnapshots expire = store.newExpire(1, 3, Long.MAX_VALUE); + ExpireConfig config = + ExpireConfig.builder() + .snapshotRetainMin(1) + .snapshotRetainMax(3) + .snapshotTimeRetain(Duration.ofMillis(Long.MAX_VALUE)) + .build(); List allData = new ArrayList<>(); List snapshotPositions = new ArrayList<>(); for (int i = 1; i <= 3; i++) { commit(ThreadLocalRandom.current().nextInt(5) + 1, allData, snapshotPositions); - expire.expire(); + doExpire(config); int latestSnapshotId = requireNonNull(snapshotManager.latestSnapshotId()).intValue(); for (int j = 1; j <= latestSnapshotId; j++) { @@ -518,11 +546,12 @@ public void testExpireWithNumber() throws Exception { @Test public void testExpireWithTime() throws Exception { - ExpireConfig.Builder builder = ExpireConfig.builder(); - builder.snapshotRetainMin(1) - .snapshotRetainMax(Integer.MAX_VALUE) - .snapshotTimeRetain(Duration.ofMillis(1000)); - ExpireSnapshots expire = store.newExpire(builder.build()); + ExpireConfig config = + ExpireConfig.builder() + .snapshotRetainMin(1) + .snapshotRetainMax(Integer.MAX_VALUE) + .snapshotTimeRetain(Duration.ofMillis(1000)) + .build(); List allData = new ArrayList<>(); List snapshotPositions = new ArrayList<>(); @@ -532,8 +561,8 @@ public void testExpireWithTime() throws Exception { long expireMillis = System.currentTimeMillis(); // expire twice to check for idempotence - expire.config(builder.snapshotTimeRetain(Duration.ofMillis(1000)).build()).expire(); - expire.config(builder.snapshotTimeRetain(Duration.ofMillis(1000)).build()).expire(); + doExpire(config); + doExpire(config); int 
latestSnapshotId = requireNonNull(snapshotManager.latestSnapshotId()).intValue(); for (int i = 1; i <= latestSnapshotId; i++) { @@ -583,8 +612,13 @@ public void testExpireWithUpgradedFile() throws Exception { FileStoreTestUtils.assertPathExists(fileIO, dataFilePath2); // the data file still exists after expire - ExpireSnapshots expire = store.newExpire(1, 1, Long.MAX_VALUE); - expire.expire(); + ExpireConfig config = + ExpireConfig.builder() + .snapshotRetainMin(1) + .snapshotRetainMax(1) + .snapshotTimeRetain(Duration.ofMillis(Long.MAX_VALUE)) + .build(); + doExpire(config); FileStoreTestUtils.assertPathExists(fileIO, dataFilePath2); store.assertCleaned(); @@ -603,11 +637,11 @@ public void testChangelogOutLivedSnapshot() throws Exception { .changelogRetainMax(3) .build(); - ExpireSnapshots snapshot = store.newExpire(config); + doExpire(config); ExpireSnapshots changelog = store.newChangelogExpire(config); // expire twice to check for idempotence - snapshot.expire(); - snapshot.expire(); + doExpire(config); + doExpire(config); int latestSnapshotId = snapshotManager.latestSnapshotId().intValue(); int earliestSnapshotId = snapshotManager.earliestSnapshotId().intValue(); @@ -655,7 +689,7 @@ public void testManifestFileSkippingSetFileNotFoundException() throws Exception .snapshotTimeRetain(Duration.ofMillis(Long.MAX_VALUE)) .build(); - store.newExpire(config).expire(); + doExpire(config); int latestSnapshotId = snapshotManager.latestSnapshotId().intValue(); assertSnapshot(latestSnapshotId, allData, snapshotPositions); @@ -720,4 +754,11 @@ protected void assertSnapshot( Map actual = store.toKvMap(actualKvs); assertThat(actual).isEqualTo(expected); } + + // ==================== Expire Method (can be overridden by subclass) ==================== + + /** Subclass can override this method to test different expire implementations. */ + protected int doExpire(ExpireConfig config) { + return store.newExpire(config).expire(); + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/expire/DeletionReportTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/expire/DeletionReportTest.java new file mode 100644 index 000000000000..494680cb71b4 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/operation/expire/DeletionReportTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.operation.expire; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.BinaryRowWriter; +import org.apache.paimon.utils.InstantiationUtil; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link DeletionReport}. */ +public class DeletionReportTest { + + @Test + public void testSerialization() throws IOException, ClassNotFoundException { + DeletionReport report = new DeletionReport(1L); + report.setDataFilesDeleted(true); + + DeletionReport deserialized = InstantiationUtil.clone(report); + assertThat(deserialized.snapshotId()).isEqualTo(1L); + assertThat(deserialized.deletionBuckets()).isEmpty(); + } + + @Test + public void testSerializationWithBuckets() throws IOException, ClassNotFoundException { + DeletionReport report = new DeletionReport(2L); + Map> buckets = new HashMap<>(); + + BinaryRow partition1 = createBinaryRow(10); + buckets.put(partition1, Collections.singleton(1)); + + report.setDeletionBuckets(buckets); + + // BinaryRow supports standard Java serialization via BinarySection's + // writeObject/readObject. + // This test confirms that DeletionReport with BinaryRow keys can be serialized. + DeletionReport deserialized = InstantiationUtil.clone(report); + assertThat(deserialized.snapshotId()).isEqualTo(2L); + assertThat(deserialized.deletionBuckets()).hasSize(1); + Map.Entry> entry = + deserialized.deletionBuckets().entrySet().iterator().next(); + assertThat(entry.getKey()).isEqualTo(partition1); + assertThat(entry.getValue()).containsExactly(1); + } + + private BinaryRow createBinaryRow(int i) { + BinaryRow row = new BinaryRow(1); + BinaryRowWriter writer = new BinaryRowWriter(row); + writer.writeInt(0, i); + writer.complete(); + return row; + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/expire/ExpireSnapshotsPlanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/expire/ExpireSnapshotsPlanTest.java new file mode 100644 index 000000000000..8976b7a1f477 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/operation/expire/ExpireSnapshotsPlanTest.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.operation.expire; + +import org.apache.paimon.operation.expire.SnapshotExpireTask.TaskType; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.apache.paimon.operation.expire.SnapshotExpireTask.TaskType.DELETE_CHANGELOG_FILES; +import static org.apache.paimon.operation.expire.SnapshotExpireTask.TaskType.DELETE_DATA_FILES; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link ExpireSnapshotsPlan}. */ +public class ExpireSnapshotsPlanTest { + + @Test + public void testPartitionTasksBySnapshotRange() { + // beginInclusiveId=1, endExclusiveId=11, parallelism=3 + List dataFileTasks = new ArrayList<>(); + for (long id = 2; id <= 11; id++) { + dataFileTasks.add(SnapshotExpireTask.forDataFiles(id)); + } + List changelogFileTasks = new ArrayList<>(); + for (long id = 1; id <= 10; id++) { + changelogFileTasks.add(SnapshotExpireTask.forChangelogFiles(id)); + } + + ExpireSnapshotsPlan plan = + new ExpireSnapshotsPlan( + dataFileTasks, + changelogFileTasks, + Collections.emptyList(), + Collections.emptyList(), + null, + 1, + 11); + + List> result = plan.partitionTasksBySnapshotRange(3); + assertThat(result).hasSize(3); + + // Worker 0 (snapshot 1-4): [dataTask(2,3,4), changelogTask(1,2,3,4)] + assertWorker( + result.get(0), + new long[] {2, 3, 4, 1, 2, 3, 4}, + new TaskType[] { + DELETE_DATA_FILES, + DELETE_DATA_FILES, + DELETE_DATA_FILES, + DELETE_CHANGELOG_FILES, + DELETE_CHANGELOG_FILES, + DELETE_CHANGELOG_FILES, + DELETE_CHANGELOG_FILES + }); + + // Worker 1 (snapshot 5-8): [dataTask(5,6,7,8), changelogTask(5,6,7,8)] + assertWorker( + result.get(1), + new long[] {5, 6, 7, 8, 5, 6, 7, 8}, + new TaskType[] { + DELETE_DATA_FILES, DELETE_DATA_FILES, DELETE_DATA_FILES, DELETE_DATA_FILES, + DELETE_CHANGELOG_FILES, DELETE_CHANGELOG_FILES, DELETE_CHANGELOG_FILES, + DELETE_CHANGELOG_FILES + }); + + // Worker 2 (snapshot 9-11): [dataTask(9,10,11), changelogTask(9,10)] + assertWorker( + result.get(2), + new long[] {9, 10, 11, 9, 10}, + new TaskType[] { + DELETE_DATA_FILES, + DELETE_DATA_FILES, + DELETE_DATA_FILES, + DELETE_CHANGELOG_FILES, + DELETE_CHANGELOG_FILES + }); + } + + private void assertWorker(List tasks, long[] ids, TaskType[] types) { + assertThat(tasks).extracting(SnapshotExpireTask::snapshotId).containsExactly(box(ids)); + assertThat(tasks).extracting(SnapshotExpireTask::taskType).containsExactly(types); + } + + @Test + public void testPartitionTasksWithLargeParallelism() { + // beginInclusiveId=1, endExclusiveId=4, parallelism=10 + List dataFileTasks = new ArrayList<>(); + for (long id = 2; id <= 4; id++) { + dataFileTasks.add(SnapshotExpireTask.forDataFiles(id)); + } + List changelogFileTasks = new ArrayList<>(); + for (long id = 1; id <= 3; id++) { + changelogFileTasks.add(SnapshotExpireTask.forChangelogFiles(id)); + } + + ExpireSnapshotsPlan plan = + new ExpireSnapshotsPlan( + dataFileTasks, + changelogFileTasks, + Collections.emptyList(), + Collections.emptyList(), + null, + 1, + 4); + + List> result = plan.partitionTasksBySnapshotRange(10); + // Parallelism is 10. Total snapshot range involved is [1, 4] (inclusive). + // ID 1: changelog only. ID 2,3: both. ID 4: data only (since data files are (begin, end]). + // Total 4 snapshots have tasks. + // snapshotsPerWorker = ceil(4 / 10) = 1. + // Workers 0, 1, 2, 3 should get tasks. 
+ assertThat(result).hasSize(4); + + // Worker 0 (snapshot 1): [changelogTask(1)] + assertWorker(result.get(0), new long[] {1}, new TaskType[] {DELETE_CHANGELOG_FILES}); + + // Worker 1 (snapshot 2): [dataTask(2), changelogTask(2)] + assertWorker( + result.get(1), + new long[] {2, 2}, + new TaskType[] {DELETE_DATA_FILES, DELETE_CHANGELOG_FILES}); + + // Worker 2 (snapshot 3): [dataTask(3), changelogTask(3)] + assertWorker( + result.get(2), + new long[] {3, 3}, + new TaskType[] {DELETE_DATA_FILES, DELETE_CHANGELOG_FILES}); + + // Worker 3 (snapshot 4): [dataTask(4)] + assertWorker(result.get(3), new long[] {4}, new TaskType[] {DELETE_DATA_FILES}); + } + + @Test + public void testPartitionTasksWithSingleSnapshot() { + // beginInclusiveId=1, endExclusiveId=2, parallelism=2 + // Only snapshot 1 needs expire. + List<SnapshotExpireTask> dataFileTasks = + Collections.singletonList( + SnapshotExpireTask.forDataFiles(2)); // data file tasks cover (begin, end] + List<SnapshotExpireTask> changelogFileTasks = + Collections.singletonList(SnapshotExpireTask.forChangelogFiles(1)); + + ExpireSnapshotsPlan plan = + new ExpireSnapshotsPlan( + dataFileTasks, + changelogFileTasks, + Collections.emptyList(), + Collections.emptyList(), + null, + 1, + 2); + + List<List<SnapshotExpireTask>> result = plan.partitionTasksBySnapshotRange(2); + // The covered snapshot span is [1, 2]: changelog tasks use [begin, end) and data + // file tasks use (begin, end]. With two covered snapshots and parallelism 2, + // snapshotsPerWorker = 1, so worker 0 gets snapshot 1 (changelog task only) and + // worker 1 gets snapshot 2 (data file task only). + + assertThat(result).hasSize(2); + assertWorker(result.get(0), new long[] {1}, new TaskType[] {DELETE_CHANGELOG_FILES}); + assertWorker(result.get(1), new long[] {2}, new TaskType[] {DELETE_DATA_FILES}); + } + + private Long[] box(long[] arr) { + Long[] result = new Long[arr.length]; + for (int i = 0; i < arr.length; i++) { + result[i] = arr[i]; + } + return result; + } +}
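The three cases above all exercise the same arithmetic. A minimal standalone sketch of it (a hypothetical helper, not the actual ExpireSnapshotsPlan code), assuming a ceiling division of the covered snapshot span:

```java
import java.util.ArrayList;
import java.util.List;

class RangePartitionSketch {

    // Changelog tasks cover [begin, end) and data file tasks cover (begin, end],
    // so the snapshot ids carrying tasks are exactly [beginInclusive, endExclusive].
    static List<long[]> ranges(long beginInclusive, long endExclusive, int parallelism) {
        long total = endExclusive - beginInclusive + 1;
        long perWorker = (total + parallelism - 1) / parallelism; // ceiling division
        List<long[]> result = new ArrayList<>();
        for (long start = beginInclusive; start <= endExclusive; start += perWorker) {
            result.add(new long[] {start, Math.min(start + perWorker - 1, endExclusive)});
        }
        return result;
    }
}
```

For example, `ranges(1, 11, 3)` yields [1, 4], [5, 8], [9, 11] as in the first test, and `ranges(1, 4, 10)` yields four single-snapshot ranges as in the large-parallelism test.

diff --git a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogExpireTest.java b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogExpireTest.java index c6eab6dd19d5..da009ffe24a5 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogExpireTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogExpireTest.java @@ -75,7 +75,7 @@ public void testChangelogExpire() throws Exception { ExpireSnapshotsImpl expireSnapshots = (ExpireSnapshotsImpl) table.newExpireSnapshots().config(expireConfig); - expireSnapshots.expireUntil(1, 7); + expireUntil(expireSnapshots, 1, 7, true); assertThatCode(() -> expire.expireUntil(1, 6)).doesNotThrowAnyException(); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java index d9e58e245ce7..5a22bbe378a7 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java @@ -26,6 +26,8 @@ import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.index.IndexFileHandler; import org.apache.paimon.manifest.IndexManifestEntry; +import org.apache.paimon.operation.expire.ExpireSnapshotsPlan; +import org.apache.paimon.operation.expire.ExpireSnapshotsPlanner; import org.apache.paimon.options.Options; import org.apache.paimon.table.sink.StreamTableCommit; import org.apache.paimon.table.sink.StreamTableWrite; @@ -64,22 +66,22 @@ public void testIndexFileExpiration() throws Exception { long indexFileSize = indexFileSize(); long indexManifestSize = indexManifestSize(); - expire.expireUntil(1, 2); +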
expireUntil(expire, 1, 2, false); checkIndexFiles(2); assertThat(indexFileSize()).isEqualTo(indexFileSize - 1); assertThat(indexManifestSize()).isEqualTo(indexManifestSize - 1); - expire.expireUntil(2, 3); + expireUntil(expire, 2, 3, false); checkIndexFiles(3); assertThat(indexFileSize()).isEqualTo(indexFileSize - 1); assertThat(indexManifestSize()).isEqualTo(indexManifestSize - 1); - expire.expireUntil(3, 5); + expireUntil(expire, 3, 5, false); checkIndexFiles(5); assertThat(indexFileSize()).isEqualTo(indexFileSize - 2); assertThat(indexManifestSize()).isEqualTo(indexManifestSize - 2); - expire.expireUntil(5, 7); + expireUntil(expire, 5, 7, false); checkIndexFiles(7); assertThat(indexFileSize()).isEqualTo(3); assertThat(indexManifestSize()).isEqualTo(1); @@ -96,12 +98,12 @@ public void testIndexFileExpirationWithTag() throws Exception { long indexFileSize = indexFileSize(); long indexManifestSize = indexManifestSize(); - expire.expireUntil(1, 5); + expireUntil(expire, 1, 5, false); checkIndexFiles(5); assertThat(indexFileSize()).isEqualTo(indexFileSize - 1); assertThat(indexManifestSize()).isEqualTo(indexManifestSize - 1); - expire.expireUntil(5, 7); + expireUntil(expire, 5, 7, false); checkIndexFiles(7); assertThat(indexFileSize()).isEqualTo(5); assertThat(indexManifestSize()).isEqualTo(3); @@ -130,7 +132,7 @@ public void testIndexFileExpirationWhenDeletingTag() throws Exception { // test delete tag after expiring snapshots table.createTag("tag3", 3); - expire.expireUntil(1, 7); + expireUntil(expire, 1, 7, false); table.deleteTag("tag3"); TagManager tagManager = new TagManager(LocalFileIO.create(), table.location()); @@ -243,4 +245,14 @@ private void write(StreamTableWrite write, Pair rowWithBuck throws Exception { write.write(rowWithBucket.getKey(), rowWithBucket.getValue()); } + + protected void expireUntil( + ExpireSnapshotsImpl expire, + long earliestId, + long endExclusiveId, + boolean changelogDecoupled) { + ExpireSnapshotsPlanner planner = ExpireSnapshotsPlanner.create(table); + ExpireSnapshotsPlan plan = planner.plan(earliestId, endExclusiveId, changelogDecoupled); + expire.executeWithPlan(plan); + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireSnapshotsAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireSnapshotsAction.java index 189597b4893a..76526f2ae0d4 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireSnapshotsAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireSnapshotsAction.java @@ -18,30 +18,75 @@ package org.apache.paimon.flink.action; +import org.apache.paimon.CoreOptions; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.expire.RangePartitionedExpireFunction; +import org.apache.paimon.flink.expire.SnapshotExpireSink; import org.apache.paimon.flink.procedure.ExpireSnapshotsProcedure; +import org.apache.paimon.flink.utils.JavaTypeInfo; +import org.apache.paimon.operation.expire.DeletionReport; +import org.apache.paimon.operation.expire.ExpireSnapshotsPlan; +import org.apache.paimon.operation.expire.ExpireSnapshotsPlanner; +import org.apache.paimon.operation.expire.SnapshotExpireTask; +import org.apache.paimon.options.ExpireConfig; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; +import org.apache.paimon.utils.Pair; +import org.apache.paimon.utils.ProcedureUtils; +import 
org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.HashMap; +import java.util.List; import java.util.Map; -/** Expire snapshots action for Flink. */ +/** + * Expire snapshots action for Flink. + * + *
<p>This action supports both serial and parallel execution modes based on parallelism: + * + * <ul> + *   <li>Serial mode (parallelism is null or <= 1): Executes locally or starts a local Flink job + *   <li>Parallel mode (parallelism > 1): Uses Flink distributed execution for deletion + * </ul> + * + * <p>In parallel mode: + * + * <ul> + *   <li>Worker phase (map): deletes data files/changelog files in parallel + *   <li>Sink phase (commit): deletes manifests and snapshot metadata serially to avoid concurrent + *       deletion issues + * </ul> + */ public class ExpireSnapshotsAction extends ActionBase implements LocalAction { + private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsAction.class); + private final String database; private final String table; + private final Integer retainMax; private final Integer retainMin; private final String olderThan; private final Integer maxDeletes; private final String options; + private final Integer parallelism; public ExpireSnapshotsAction( String database, String table, Map<String, String> catalogConfig, - Integer retainMax, - Integer retainMin, - String olderThan, - Integer maxDeletes, - String options) { + @Nullable Integer retainMax, + @Nullable Integer retainMin, + @Nullable String olderThan, + @Nullable Integer maxDeletes, + @Nullable String options, + @Nullable Integer parallelism) { super(catalogConfig); this.database = database; this.table = table; @@ -50,12 +95,168 @@ public ExpireSnapshotsAction( this.olderThan = olderThan; this.maxDeletes = maxDeletes; this.options = options; + this.parallelism = parallelism; } + /** Returns true if forceStartFlinkJob is enabled and parallelism is greater than 1. */ + private boolean isParallelMode() { + return forceStartFlinkJob && parallelism != null && parallelism > 1; + } + + @Override + public void run() throws Exception { + if (parallelism != null && parallelism > 1 && !forceStartFlinkJob) { + throw new IllegalArgumentException( + "Parallel expire mode requires both --parallelism > 1 and --force_start_flink_job enabled."); + } + if (isParallelMode()) { + // Parallel mode: build and execute Flink job (multi parallelism) + build(); + execute(this.getClass().getSimpleName()); + } else if (forceStartFlinkJob) { + // Serial mode but forced to run as Flink job (single parallelism) + super.run(); + } else { + // Serial mode: execute locally + executeLocally(); + } + } + + @Override public void executeLocally() throws Exception { ExpireSnapshotsProcedure expireSnapshotsProcedure = new ExpireSnapshotsProcedure(); expireSnapshotsProcedure.withCatalog(catalog); expireSnapshotsProcedure.call( null, database + "." + table, retainMax, retainMin, olderThan, maxDeletes, options); } + + @Override + public void build() throws Exception { + if (!isParallelMode()) { + // Not in parallel mode, nothing to build + return; + } + + // Prepare table and config using shared method + Pair<FileStoreTable, ExpireConfig> prepared = + resolveExpireTableAndConfig( + catalog.getTable(Identifier.fromString(database + "." + table)), + options, + retainMax, + retainMin, + olderThan, + maxDeletes); + FileStoreTable fileStoreTable = prepared.getLeft(); + ExpireConfig expireConfig = prepared.getRight(); + + // Create planner using factory method + ExpireSnapshotsPlanner planner = ExpireSnapshotsPlanner.create(fileStoreTable); + + // Plan the expiration + ExpireSnapshotsPlan plan = planner.plan(expireConfig); + if (plan.isEmpty()) { + LOG.info("No snapshots to expire"); + return; + } + + LOG.info( + "Planning to expire {} snapshots, range=[{}, {})", + plan.endExclusiveId() - plan.beginInclusiveId(), + plan.beginInclusiveId(), + plan.endExclusiveId()); + + Identifier identifier = new Identifier(database, table); + + // Build worker phase + DataStream<DeletionReport> reports = buildWorkerPhase(plan, identifier); + + // Build sink phase + buildSinkPhase(reports, plan, identifier); + }
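For orientation, the topology that `build()` assembles is sketched below (names as in this patch; comments only, not additional production code):

```java
// env.fromCollection(partitionedGroups)                 // parallelism 1: one task list per worker
//    .rebalance()                                       // distribute one group per subtask
//    .flatMap(new RangePartitionedExpireFunction(...))  // parallelism N: emits DeletionReport
//    .returns(new JavaTypeInfo<>(DeletionReport.class)) // plain Java serialization on the wire
//    .sinkTo(new SnapshotExpireSink(...))               // parallelism 1: serial final commit
```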
+ + /** + * Build the worker phase of the Flink job. + * + * <p>Workers process data file and changelog file deletion tasks in parallel. Tasks are + * partitioned by snapshot range to ensure: + * + * <ul> + *   <li>Same snapshot range tasks are processed by the same worker + *   <li>Within each worker, data files are deleted before changelog files + *   <li>Cache locality is maximized (adjacent snapshots often share manifest files) + * </ul> + */ + private DataStream<DeletionReport> buildWorkerPhase( + ExpireSnapshotsPlan plan, Identifier identifier) { + // Partition by snapshot range: each worker gets a contiguous range of snapshots + // with dataFileTasks first, then changelogFileTasks + List<List<SnapshotExpireTask>> partitionedGroups = + plan.partitionTasksBySnapshotRange(parallelism); + + DataStreamSource<List<SnapshotExpireTask>> source = + env.fromCollection(partitionedGroups).setParallelism(1); + + return source.rebalance() + .flatMap( + new RangePartitionedExpireFunction( + catalogOptions.toMap(), + identifier, + plan.protectionSet().taggedSnapshots())) + // Use JavaTypeInfo to ensure proper Java serialization of DeletionReport, + // avoiding Kryo's FieldSerializer which cannot handle BinaryRow correctly. + // This approach is compatible with both Flink 1.x and 2.x. + .returns(new JavaTypeInfo<>(DeletionReport.class)) + .setParallelism(parallelism) + .name("RangePartitionedExpire"); + }
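The `JavaTypeInfo` comment above leans on `DeletionReport` surviving plain Java serialization even with `BinaryRow` map keys, which `DeletionReportTest` pins down. A condensed form of that check (same calls as the test, trimmed for illustration):

```java
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.operation.expire.DeletionReport;
import org.apache.paimon.utils.InstantiationUtil;

import java.util.Collections;

class DeletionReportWireCheck {

    // Round-trips a report through plain Java serialization, which is what
    // JavaTypeInfo relies on; mirrors DeletionReportTest#testSerializationWithBuckets.
    static void check() throws Exception {
        BinaryRow partition = new BinaryRow(1);
        BinaryRowWriter writer = new BinaryRowWriter(partition);
        writer.writeInt(0, 10);
        writer.complete();

        DeletionReport report = new DeletionReport(2L);
        report.setDeletionBuckets(Collections.singletonMap(partition, Collections.singleton(1)));

        DeletionReport copy = InstantiationUtil.clone(report);
        if (!copy.deletionBuckets().equals(report.deletionBuckets())) {
            throw new AssertionError("DeletionReport did not survive Java serialization");
        }
    }
}
```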
+ + /** + * Build the sink phase of the Flink job. + * + * <p>The sink collects deletion reports from workers, then serially deletes manifests and + * snapshot metadata files to avoid concurrent deletion issues. + */ + private void buildSinkPhase( + DataStream<DeletionReport> reports, ExpireSnapshotsPlan plan, Identifier identifier) { + reports.sinkTo( + new SnapshotExpireSink( + catalogOptions.toMap(), + identifier, + plan.endExclusiveId(), + plan.protectionSet().manifestSkippingSet(), + plan.manifestTasks(), + plan.snapshotFileTasks())) + .setParallelism(1) + .name("SnapshotExpire"); + } + + /** + * Prepares the table with dynamic options and builds the ExpireConfig. + * + * @param table the original table + * @param options dynamic options string + * @param retainMax maximum snapshots to retain + * @param retainMin minimum snapshots to retain + * @param olderThan expire snapshots older than this timestamp + * @param maxDeletes maximum number of snapshots to delete + * @return a pair of (FileStoreTable with dynamic options applied, ExpireConfig) + */ + private Pair<FileStoreTable, ExpireConfig> resolveExpireTableAndConfig( + Table table, + String options, + Integer retainMax, + Integer retainMin, + String olderThan, + Integer maxDeletes) { + HashMap<String, String> dynamicOptions = new HashMap<>(); + ProcedureUtils.putAllOptions(dynamicOptions, options); + FileStoreTable fileStoreTable = (FileStoreTable) table.copy(dynamicOptions); + + CoreOptions tableOptions = fileStoreTable.store().options(); + ExpireConfig expireConfig = + ProcedureUtils.fillInSnapshotOptions( + tableOptions, retainMax, retainMin, olderThan, maxDeletes) + .build(); + return Pair.of(fileStoreTable, expireConfig); + } }
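With the flag wired through the factory below (and documented in its printHelp), a parallel invocation of the action could look like the following; the jar path, warehouse URI, and retention values are placeholders, and `--force_start_flink_job true` is required because `run()` rejects a parallelism above 1 without it:

```
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action.jar \
    expire_snapshots \
    --warehouse hdfs:///path/to/warehouse \
    --database default \
    --table word_count \
    --retain_max 5 \
    --force_start_flink_job true \
    --parallelism 4
```

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireSnapshotsActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireSnapshotsActionFactory.java index 91628c2e37f4..21da1bf421b2 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireSnapshotsActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireSnapshotsActionFactory.java @@ -32,6 +32,7 @@ public class ExpireSnapshotsActionFactory implements ActionFactory { private static final String OLDER_THAN = "older_than"; private static final String MAX_DELETES = "max_deletes"; private static final String OPTIONS = "options"; + private static final String PARALLELISM = "parallelism"; @Override public String identifier() { @@ -48,6 +49,8 @@ public Optional<Action> create(MultipleParameterToolAdapter params) { Integer maxDeletes = params.has(MAX_DELETES) ? Integer.parseInt(params.get(MAX_DELETES)) : null; String options = params.has(OPTIONS) ? params.get(OPTIONS) : null; + Integer parallelism = + params.has(PARALLELISM) ?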
Integer.parseInt(params.get(PARALLELISM)) : null; ExpireSnapshotsAction action = new ExpireSnapshotsAction( @@ -58,8 +61,8 @@ public Optional create(MultipleParameterToolAdapter params) { retainMin, olderThan, maxDeletes, - options); - + options, + parallelism); return Optional.of(action); } @@ -77,6 +80,7 @@ public void printHelp() { + "--retain_max \\\n" + "--retain_min \\\n" + "--older_than \\\n" - + "--max_delete "); + + "--max_deletes \\\n" + + "[--parallelism ]"); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/expire/RangePartitionedExpireFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/expire/RangePartitionedExpireFunction.java new file mode 100644 index 000000000000..2f26206b4869 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/expire/RangePartitionedExpireFunction.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.expire; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.FlinkCatalogFactory; +import org.apache.paimon.operation.expire.DeletionReport; +import org.apache.paimon.operation.expire.ExpireSnapshotsExecutor; +import org.apache.paimon.operation.expire.SnapshotExpireTask; +import org.apache.paimon.options.Options; +import org.apache.paimon.table.FileStoreTable; + +import org.apache.flink.api.common.functions.OpenContext; +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.util.Collector; + +import java.util.List; +import java.util.Map; + +/** + * Flink flatMap function for range-partitioned snapshot expiration (worker phase). + * + *
<p>This function processes a batch of {@link SnapshotExpireTask}s that belong to the same + * contiguous range. Each subtask receives a list of tasks (e.g., subtask 0 gets snap 1-4, subtask 1 + * gets snap 5-8) and processes them sequentially in order. + * + * <p>Processing tasks in order within each subtask maximizes cache locality since adjacent + * snapshots often share manifest files. + * + * <p>In worker phase, this function only deletes data files and changelog data files. Manifest and + * snapshot metadata deletion is deferred to the sink phase to avoid concurrent deletion issues. + * + * <p>This function uses {@link ExpireSnapshotsExecutor#execute} which loads tag data files + * on-demand with internal caching. + */ +public class RangePartitionedExpireFunction + extends RichFlatMapFunction<List<SnapshotExpireTask>, DeletionReport> { + + private static final long serialVersionUID = 1L; + + private final Map<String, String> catalogConfig; + private final Identifier identifier; + private final List<Snapshot> taggedSnapshots; + + private transient ExpireSnapshotsExecutor executor; + + public RangePartitionedExpireFunction( + Map<String, String> catalogConfig, + Identifier identifier, + List<Snapshot> taggedSnapshots) { + this.catalogConfig = catalogConfig; + this.identifier = identifier; + this.taggedSnapshots = taggedSnapshots; + } + + @Override + public void open(OpenContext openContext) throws Exception { + this.executor = initExecutor(); + }
+ + /** + * Initializes and returns the executor for processing expire tasks. Subclasses can override + * this method to provide a custom executor for testing without catalog access. + * + * <p>Default implementation creates executor from catalog using {@link #catalogConfig} and + * {@link #identifier}. + */ + protected ExpireSnapshotsExecutor initExecutor() throws Exception { + Options options = Options.fromMap(catalogConfig); + Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(options); + FileStoreTable table = (FileStoreTable) catalog.getTable(identifier); + return new ExpireSnapshotsExecutor( + table.snapshotManager(), table.store().newSnapshotDeletion()); + } + + @Override + public void flatMap(List<SnapshotExpireTask> tasks, Collector<DeletionReport> out) + throws Exception { + // Process tasks sequentially in order to maximize cache locality + for (SnapshotExpireTask task : tasks) { + DeletionReport report = processTask(task); + out.collect(report); + } + } + + private DeletionReport processTask(SnapshotExpireTask task) { + // Execute task (worker phase only deletes data files, skippingSet is null) + DeletionReport report = executor.execute(task, taggedSnapshots, null); + report.setDeletionBuckets(executor.drainDeletionBuckets()); + return report; + } +}
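The handoff between the worker function above and the sink below hinges on one contract: the worker drains its buckets after every task, so each `DeletionReport` carries only that task's buckets, and the sink merges them all before cleaning directories once. A minimal standalone sketch of that merge (hypothetical code mirroring `ExpireSinkWriter.write` below):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class BucketMergeSketch {

    // Merge per-task bucket maps into one global map, deduplicating buckets so
    // each (partition, bucket) directory is checked for emptiness exactly once.
    static <P> Map<P, Set<Integer>> merge(Iterable<Map<P, Set<Integer>>> perTaskBuckets) {
        Map<P, Set<Integer>> global = new HashMap<>();
        for (Map<P, Set<Integer>> buckets : perTaskBuckets) {
            buckets.forEach(
                    (partition, ids) ->
                            global.computeIfAbsent(partition, k -> new HashSet<>()).addAll(ids));
        }
        return global;
    }
}
```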
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/expire/SnapshotExpireSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/expire/SnapshotExpireSink.java new file mode 100644 index 000000000000..3f9838618e6e --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/expire/SnapshotExpireSink.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.expire; + +import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.flink.FlinkCatalogFactory; +import org.apache.paimon.operation.expire.DeletionReport; +import org.apache.paimon.operation.expire.ExpireSnapshotsExecutor; +import org.apache.paimon.operation.expire.SnapshotExpireTask; +import org.apache.paimon.options.Options; +import org.apache.paimon.table.FileStoreTable; + +import org.apache.flink.api.connector.sink2.InitContext; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.api.connector.sink2.WriterInitContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Sink that collects deletion reports from parallel expire workers, aggregates the results, and + * performs the final commit operations using Sink V2 API. + * + * <p>In the sink phase (committer), this sink: + * + * <ul> + *   <li>Collects all deletion reports from workers + *   <li>Deletes manifest files serially in snapshot ID order (to avoid concurrent deletion issues) + *   <li>Deletes snapshot metadata files + *   <li>Commits changelogs (for changelogDecoupled mode) + *   <li>Cleans empty directories + *   <li>Updates earliest hint + * </ul> + */ +public class SnapshotExpireSink implements Sink<DeletionReport> { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(SnapshotExpireSink.class); + + private final Map<String, String> catalogConfig; + private final Identifier identifier; + private final long endExclusiveId; + private final Set<String> manifestSkippingSet; + private final List<SnapshotExpireTask> manifestTasks; + private final List<SnapshotExpireTask> snapshotFileTasks; + + public SnapshotExpireSink( + Map<String, String> catalogConfig, + Identifier identifier, + long endExclusiveId, + Set<String> manifestSkippingSet, + List<SnapshotExpireTask> manifestTasks, + List<SnapshotExpireTask> snapshotFileTasks) { + this.catalogConfig = catalogConfig; + this.identifier = identifier; + this.endExclusiveId = endExclusiveId; + this.manifestSkippingSet = manifestSkippingSet; + this.manifestTasks = manifestTasks; + this.snapshotFileTasks = snapshotFileTasks; + } + + /** Do not annotate with {@code @Override} here to maintain compatibility with Flink 2.0+. */ + public SinkWriter<DeletionReport> createWriter(InitContext context) throws IOException { + return new ExpireSinkWriter(initExecutor()); + } + + @Override + public SinkWriter<DeletionReport> createWriter(WriterInitContext context) throws IOException { + return new ExpireSinkWriter(initExecutor()); + } + + /** + * Initializes and returns the executor. Subclasses can override this method to provide a + * custom executor for testing. + */ + @VisibleForTesting + protected ExpireSnapshotsExecutor initExecutor() { + try { + Options options = Options.fromMap(catalogConfig); + Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(options); + FileStoreTable table = (FileStoreTable) catalog.getTable(identifier); + return new ExpireSnapshotsExecutor( + table.snapshotManager(), + table.store().newSnapshotDeletion(), + table.changelogManager()); + } catch (Exception e) { + throw new RuntimeException("Failed to initialize executor", e); + } + } + + /** SinkWriter that collects reports and performs commit on flush. */ + private class ExpireSinkWriter implements SinkWriter<DeletionReport> { + + private final Map<BinaryRow, Set<Integer>> globalDeletionBuckets = new HashMap<>(); + private final ExpireSnapshotsExecutor executor; + + ExpireSinkWriter(ExpireSnapshotsExecutor executor) { + this.executor = executor; + } + + @Override + public void write(DeletionReport report, Context context) { + if (!report.isSkipped()) { + report.deletionBuckets() + .forEach( + (partition, buckets) -> + globalDeletionBuckets + .computeIfAbsent(partition, k -> new HashSet<>()) + .addAll(buckets)); + } + } + + @Override + public void flush(boolean endOfInput) { + if (!endOfInput) { + return; + } + + LOG.info( + "Expire sink received: {} manifest tasks, {} snapshot tasks", + manifestTasks.size(), + snapshotFileTasks.size()); + + // 1. Clean empty directories + executor.cleanEmptyDirectories(globalDeletionBuckets); + + // 2. Execute manifest deletion tasks + if (manifestSkippingSet != null) { + Set<String> skippingSet = new HashSet<>(manifestSkippingSet); + for (SnapshotExpireTask task : manifestTasks) { + executor.execute(task, null, skippingSet); + } + } + + // 3.
Execute snapshot file deletion tasks + for (SnapshotExpireTask task : snapshotFileTasks) { + executor.execute(task, null, null); + } + + executor.writeEarliestHint(endExclusiveId); + } + + @Override + public void close() {} + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpireSnapshotsActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpireSnapshotsActionITCase.java new file mode 100644 index 000000000000..1495a7132f42 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpireSnapshotsActionITCase.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.consumer.ConsumerManager; +import org.apache.paimon.flink.expire.RangePartitionedExpireFunction; +import org.apache.paimon.flink.expire.SnapshotExpireSink; +import org.apache.paimon.flink.util.MiniClusterWithClientExtension; +import org.apache.paimon.flink.utils.JavaTypeInfo; +import org.apache.paimon.fs.Path; +import org.apache.paimon.operation.ExpireSnapshotsTest; +import org.apache.paimon.operation.SnapshotDeletion; +import org.apache.paimon.operation.expire.DeletionReport; +import org.apache.paimon.operation.expire.ExpireSnapshotsExecutor; +import org.apache.paimon.operation.expire.ExpireSnapshotsPlan; +import org.apache.paimon.operation.expire.ExpireSnapshotsPlanner; +import org.apache.paimon.operation.expire.SnapshotExpireTask; +import org.apache.paimon.options.ExpireConfig; + +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Supplier; + +/** + * IT Case for parallel expire snapshots using Flink execution. This class extends {@link + * ExpireSnapshotsTest} and overrides {@link #doExpire} to test parallel execution mode with real + * Flink DataStream API. + * + *
<p>This test validates that the Flink-based parallel expire logic produces the same results as + * the serial implementation. It extends the production {@link RangePartitionedExpireFunction} and + * {@link SnapshotExpireSink} classes, overriding {@code initExecutor} to inject test executors. + */ +public class ExpireSnapshotsActionITCase extends ExpireSnapshotsTest { + + private static final int TEST_PARALLELISM = 2; + + // Placeholder identifier for test (not actually used since we override initExecutor) + private static final Identifier TEST_IDENTIFIER = Identifier.create("default", "test_table"); + + @RegisterExtension + protected static final MiniClusterWithClientExtension MINI_CLUSTER_EXTENSION = + new MiniClusterWithClientExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(TEST_PARALLELISM) + .build()); + + // Static supplier for test executor (used by inner test classes) + private static volatile Supplier<ExpireSnapshotsExecutor> executorSupplier; + + @Override + protected int doExpire(ExpireConfig config) { + // 1. Create planner and generate plan + SnapshotDeletion snapshotDeletion = store.newSnapshotDeletion(); + ExpireSnapshotsPlanner planner = + new ExpireSnapshotsPlanner( + snapshotManager, + new ConsumerManager( + fileIO, new Path(tempDir.toUri()), snapshotManager.branch()), + snapshotDeletion, + store.newTagManager()); + ExpireSnapshotsPlan plan = planner.plan(config); + + if (plan.isEmpty()) { + return 0; + } + + // 2. Set up executor supplier for test subclasses + executorSupplier = + () -> + new ExpireSnapshotsExecutor( + snapshotManager, snapshotDeletion, changelogManager); + + try { + // 3. Create Flink execution environment + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRuntimeMode(RuntimeExecutionMode.BATCH); + env.setParallelism(TEST_PARALLELISM); + + // 4. Build worker phase: partition tasks by snapshot range + List<List<SnapshotExpireTask>> partitionedGroups = + plan.partitionTasksBySnapshotRange(TEST_PARALLELISM); + + DataStreamSource<List<SnapshotExpireTask>> source = + env.fromCollection(partitionedGroups).setParallelism(1); + + // 5. Apply test worker function (extends production class, overrides initExecutor) + DataStream<DeletionReport> reports = + source.rebalance() + .flatMap( + new TestExpireFunction( + Collections.emptyMap(), + TEST_IDENTIFIER, + plan.protectionSet().taggedSnapshots())) + .returns(new JavaTypeInfo<>(DeletionReport.class)) + .setParallelism(TEST_PARALLELISM) + .name("RangePartitionedExpire"); + + // 6. Apply test sink (extends SnapshotExpireSink, overrides initExecutor) + reports.sinkTo( + new TestExpireSink( + Collections.emptyMap(), + TEST_IDENTIFIER, + plan.endExclusiveId(), + plan.protectionSet().manifestSkippingSet(), + plan.manifestTasks(), + plan.snapshotFileTasks())) + .setParallelism(1) + .name("SnapshotExpireSink"); + + // 7. Execute Flink job + env.execute("ExpireSnapshotsFlinkTest"); + + return plan.snapshotFileTasks().size(); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + executorSupplier = null; + } + } + + // ==================== Test Subclasses ==================== + + /** + * Test subclass of {@link RangePartitionedExpireFunction} that overrides {@code initExecutor} + * to use the test executor supplier, completely bypassing catalog access.
+ */ + private static class TestExpireFunction extends RangePartitionedExpireFunction { + + private static final long serialVersionUID = 1L; + + public TestExpireFunction( + Map catalogConfig, + Identifier identifier, + List taggedSnapshots) { + super(catalogConfig, identifier, taggedSnapshots); + } + + @Override + protected ExpireSnapshotsExecutor initExecutor() { + // Use the static executor supplier from test setup, bypassing catalog + return executorSupplier.get(); + } + } + + /** + * Test subclass of {@link SnapshotExpireSink} that overrides {@code initExecutor} to use the + * test executor supplier, completely bypassing catalog access. + */ + private static class TestExpireSink extends SnapshotExpireSink { + + private static final long serialVersionUID = 1L; + + public TestExpireSink( + Map catalogConfig, + Identifier identifier, + long endExclusiveId, + Set manifestSkippingSet, + List manifestTasks, + List snapshotFileTasks) { + super( + catalogConfig, + identifier, + endExclusiveId, + manifestSkippingSet, + manifestTasks, + snapshotFileTasks); + } + + @Override + protected ExpireSnapshotsExecutor initExecutor() { + // Use the static executor supplier from test setup, bypassing catalog + return executorSupplier.get(); + } + } + + // ==================== Disabled Tests ==================== + // The following tests are disabled because they are not suitable for parallel Flink execution. + + @Test + @Disabled("Tests SnapshotDeletion directly, not the full expire flow") + @Override + public void testExpireExtraFiles() {} + + @Test + @Disabled("Tests SnapshotDeletion directly, not the full expire flow") + @Override + public void testExpireExtraFilesWithExternalPath() {} + + @Test + @Disabled( + "Tests concurrent expire scenario which has different semantics in Flink parallel mode") + @Override + public void testExpireEmptySnapshot() {} +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedureITCase.java index bfec37fb0c7c..5d5851ff6283 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedureITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedureITCase.java @@ -28,12 +28,13 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.params.provider.CsvSource; import java.io.IOException; import java.sql.Timestamp; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** IT Case for {@link ExpireSnapshotsProcedure}. 
*/ public class ExpireSnapshotsProcedureITCase extends CatalogITCaseBase { @@ -81,8 +82,11 @@ public void testExpireSnapshotsProcedure() throws Exception { } @ParameterizedTest - @ValueSource(booleans = {false, true}) - public void testExpireSnapshotsAction(boolean forceStartFlinkJob) throws Exception { + @CsvSource( + value = {"false,", "true,", "true,1", "true,2", "false,2"}, + nullValues = "") + public void testExpireSnapshotsAction(boolean forceStartFlinkJob, Integer parallelism) + throws Exception { sql( "CREATE TABLE word_count ( word STRING PRIMARY KEY NOT ENFORCED, cnt INT)" + " WITH ( 'num-sorted-run.compaction-trigger' = '9999'," @@ -93,6 +97,17 @@ public void testExpireSnapshotsAction(boolean forceStartFlinkJob) throws Excepti streamExecutionEnvironmentBuilder().streamingMode().build(); SnapshotManager snapshotManager = table.snapshotManager(); + // parallelism > 1 without forceStartFlinkJob should throw exception + if (!forceStartFlinkJob && parallelism != null && parallelism > 1) { + assertThatThrownBy( + () -> + createExpireAction(forceStartFlinkJob, parallelism) + .withStreamExecutionEnvironment(env) + .run()) + .isInstanceOf(IllegalArgumentException.class); + return; + } + // initially prepare 6 snapshots, expected snapshots (1, 2, 3, 4, 5, 6) for (int i = 0; i < 6; ++i) { sql("INSERT INTO word_count VALUES ('" + String.valueOf(i) + "', " + i + ")"); @@ -100,77 +115,38 @@ public void testExpireSnapshotsAction(boolean forceStartFlinkJob) throws Excepti checkSnapshots(snapshotManager, 1, 6); // retain_max => 5, expected snapshots (2, 3, 4, 5, 6) - createAction( - ExpireSnapshotsAction.class, - "expire_snapshots", - "--warehouse", - path, - "--database", - "default", - "--table", - "word_count", - "--retain_max", - "5", - "--force_start_flink_job", - Boolean.toString(forceStartFlinkJob)) + createExpireAction(forceStartFlinkJob, parallelism, "--retain_max", "5") .withStreamExecutionEnvironment(env) .run(); checkSnapshots(snapshotManager, 2, 6); // older_than => timestamp of snapshot 6, max_deletes => 1, expected snapshots (3, 4, 5, 6) Timestamp ts6 = new Timestamp(snapshotManager.latestSnapshot().timeMillis()); - createAction( - ExpireSnapshotsAction.class, - "expire_snapshots", - "--warehouse", - path, - "--database", - "default", - "--table", - "word_count", + createExpireAction( + forceStartFlinkJob, + parallelism, "--older_than", ts6.toString(), "--max_deletes", - "1", - "--force_start_flink_job", - Boolean.toString(forceStartFlinkJob)) + "1") .withStreamExecutionEnvironment(env) .run(); checkSnapshots(snapshotManager, 3, 6); - createAction( - ExpireSnapshotsAction.class, - "expire_snapshots", - "--warehouse", - path, - "--database", - "default", - "--table", - "word_count", + // older_than => timestamp of snapshot 6, retain_min => 3, expected snapshots (4, 5, 6) + createExpireAction( + forceStartFlinkJob, + parallelism, "--older_than", ts6.toString(), "--retain_min", - "3", - "--force_start_flink_job", - Boolean.toString(forceStartFlinkJob)) + "3") .withStreamExecutionEnvironment(env) .run(); checkSnapshots(snapshotManager, 4, 6); // older_than => timestamp of snapshot 6, expected snapshots (6) - createAction( - ExpireSnapshotsAction.class, - "expire_snapshots", - "--warehouse", - path, - "--database", - "default", - "--table", - "word_count", - "--older_than", - ts6.toString(), - "--force_start_flink_job", - Boolean.toString(forceStartFlinkJob)) + createExpireAction(forceStartFlinkJob, parallelism, "--older_than", ts6.toString()) .withStreamExecutionEnvironment(env) 
.run(); checkSnapshots(snapshotManager, 6, 6); @@ -217,4 +193,26 @@ private T createAction(Class clazz, String... args) { .map(clazz::cast) .orElseThrow(() -> new RuntimeException("Failed to create action")); } + + private ExpireSnapshotsAction createExpireAction( + boolean forceStartFlinkJob, Integer parallelism, String... extraArgs) { + java.util.List args = new java.util.ArrayList<>(); + java.util.Collections.addAll( + args, + "expire_snapshots", + "--warehouse", + path, + "--database", + "default", + "--table", + "word_count", + "--force_start_flink_job", + Boolean.toString(forceStartFlinkJob)); + if (parallelism != null) { + args.add("--parallelism"); + args.add(String.valueOf(parallelism)); + } + java.util.Collections.addAll(args, extraArgs); + return createAction(ExpireSnapshotsAction.class, args.toArray(new String[0])); + } }
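Finally, the `doExpire` hook added to `ExpireSnapshotsTest` is the seam that makes the whole suite reusable: any alternative expire implementation inherits every test case by overriding one method, exactly as `ExpireSnapshotsActionITCase` does. A minimal sketch of such a subclass (hypothetical, purely illustrative):

```java
// Hypothetical subclass living next to ExpireSnapshotsTest: every inherited test
// case now runs through this override instead of store.newExpire(config).expire().
class InstrumentedExpireTest extends ExpireSnapshotsTest {

    @Override
    protected int doExpire(ExpireConfig config) {
        int expired = super.doExpire(config); // delegate to the serial implementation
        System.out.println("expired " + expired + " snapshots");
        return expired;
    }
}
```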