1 change: 1 addition & 0 deletions docs/content/maintenance/manage-snapshots.md
@@ -263,6 +263,7 @@ Run the following command:
--max_deletes <max-deletes> \
--retain_max <retain-max> \
--retain_min <retain-min> \
[--parallelism <parallelism>] \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
```

@@ -132,10 +132,29 @@ public void cleanEmptyDirectories() {
if (!cleanEmptyDirectories || deletionBuckets.isEmpty()) {
return;
}
doCleanEmptyDirectories(deletionBuckets);
deletionBuckets.clear();
}

/**
* Try to delete data directories that may be empty after data file deletion.
*
* <p>This method uses externally provided buckets instead of the internal {@code
* deletionBuckets}. It is used in parallel expire mode, where buckets are aggregated from
* multiple workers.
*
* @param aggregatedBuckets merged deletion buckets from all workers
*/
public void cleanEmptyDirectories(Map<BinaryRow, Set<Integer>> aggregatedBuckets) {
if (!cleanEmptyDirectories || aggregatedBuckets.isEmpty()) {
return;
}
doCleanEmptyDirectories(aggregatedBuckets);
}
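
For context, a minimal sketch, not part of this change, of how a coordinator could merge the per-worker buckets carried by the new `DeletionReport` (added later in this PR) before handing them to this overload; the class and method names below are illustrative assumptions.

```java
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.operation.expire.DeletionReport;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Illustrative coordinator-side helper, not part of this change. */
final class BucketAggregationSketch {

    /** Merges the deletion buckets reported by all expire workers into a single map. */
    static Map<BinaryRow, Set<Integer>> aggregate(List<DeletionReport> reports) {
        Map<BinaryRow, Set<Integer>> aggregated = new HashMap<>();
        for (DeletionReport report : reports) {
            report.deletionBuckets()
                    .forEach(
                            (partition, buckets) ->
                                    aggregated
                                            .computeIfAbsent(partition, k -> new HashSet<>())
                                            .addAll(buckets));
        }
        // The merged map is what cleanEmptyDirectories(aggregatedBuckets) expects:
        // one cleanup pass covering the deletions performed by every worker.
        return aggregated;
    }
}
```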

private void doCleanEmptyDirectories(Map<BinaryRow, Set<Integer>> buckets) {
// All directory paths are deduplicated and sorted by hierarchy level
Map<Integer, Set<Path>> deduplicate = new HashMap<>();
for (Map.Entry<BinaryRow, Set<Integer>> entry : deletionBuckets.entrySet()) {
for (Map.Entry<BinaryRow, Set<Integer>> entry : buckets.entrySet()) {
List<Path> toDeleteEmptyDirectory = new ArrayList<>();
// try to delete bucket directories
for (Integer bucket : entry.getValue()) {
@@ -162,8 +181,6 @@ public void cleanEmptyDirectories() {
for (int hierarchy = deduplicate.size() - 1; hierarchy >= 0; hierarchy--) {
deduplicate.get(hierarchy).forEach(this::tryDeleteEmptyDirectory);
}

deletionBuckets.clear();
}

protected void recordDeletionBuckets(ExpireFileEntry entry) {
@@ -172,6 +189,23 @@ protected void recordDeletionBuckets(ExpireFileEntry entry) {
.add(entry.bucket());
}

/**
* Get and clear the deletion buckets.
*
* <p>This method is used in parallel expire to collect buckets for a single task without
* accumulating results from previous tasks processed by the same worker.
*
* @return a copy of the deletion buckets; the internal state is cleared after this call
*/
public Map<BinaryRow, Set<Integer>> drainDeletionBuckets() {
Map<BinaryRow, Set<Integer>> result = new HashMap<>();
for (Map.Entry<BinaryRow, Set<Integer>> entry : deletionBuckets.entrySet()) {
result.put(entry.getKey(), new HashSet<>(entry.getValue()));
}
deletionBuckets.clear();
return result;
}
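
To illustrate the intended call pattern on the worker side, here is a hedged sketch; the helper class, method, and parameters are assumptions for illustration, not code from this change. The idea is that a worker drains the buckets right after expiring one snapshot and attaches them to that snapshot's report instead of cleaning directories locally.

```java
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.operation.expire.DeletionReport;

import java.util.Map;
import java.util.Set;

/** Illustrative worker-side helper, not part of this change. */
final class ExpireTaskSketch {

    /**
     * Builds the report for one snapshot expiration task. {@code drainedBuckets} is assumed
     * to be the result of drainDeletionBuckets(), called right after this snapshot's files
     * were deleted, so it contains only the buckets touched by this task.
     */
    static DeletionReport reportFor(
            long snapshotId,
            boolean snapshotStillExists,
            Map<BinaryRow, Set<Integer>> drainedBuckets) {
        if (!snapshotStillExists) {
            // Another worker, or an earlier run, already removed this snapshot.
            return DeletionReport.skipped(snapshotId);
        }
        DeletionReport report = new DeletionReport(snapshotId);
        // Simplification for the sketch: buckets are only recorded when files were deleted.
        report.setDataFilesDeleted(!drainedBuckets.isEmpty());
        report.setDeletionBuckets(drainedBuckets);
        return report;
    }
}
```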

public void cleanUnusedDataFiles(String manifestList, Predicate<ExpireFileEntry> skipper) {
// try read manifests
List<ManifestFileMeta> manifests = tryReadManifestList(manifestList);
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.operation.expire;

import org.apache.paimon.data.BinaryRow;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Report of a single snapshot expiration task. */
public class DeletionReport implements Serializable {

private static final long serialVersionUID = 1L;

private final long snapshotId;

/** Whether this task was skipped (e.g., snapshot already deleted). */
private boolean skipped;

/** Whether data files were deleted. */
private boolean dataFilesDeleted;

/** Whether changelog data files were deleted. */
private boolean changelogDeleted;

/** Whether manifest files were deleted. */
private boolean manifestsDeleted;

/** Whether the snapshot metadata file was deleted. */
private boolean snapshotDeleted;

/** Buckets that had files deleted (for empty directory cleanup in parallel phase). */
private Map<BinaryRow, Set<Integer>> deletionBuckets;

public DeletionReport(long snapshotId) {
this.snapshotId = snapshotId;
this.skipped = false;
this.dataFilesDeleted = false;
this.changelogDeleted = false;
this.manifestsDeleted = false;
this.snapshotDeleted = false;
this.deletionBuckets = new HashMap<>();
}

/**
* Create a skipped report for a snapshot that was already deleted.
*
* @param snapshotId the snapshot ID
* @return a skipped deletion report
*/
public static DeletionReport skipped(long snapshotId) {
DeletionReport report = new DeletionReport(snapshotId);
report.skipped = true;
return report;
}

public long snapshotId() {
return snapshotId;
}

public boolean isSkipped() {
return skipped;
}

public void setDataFilesDeleted(boolean dataFilesDeleted) {
this.dataFilesDeleted = dataFilesDeleted;
}

public void setChangelogDeleted(boolean changelogDeleted) {
this.changelogDeleted = changelogDeleted;
}

public void setManifestsDeleted(boolean manifestsDeleted) {
this.manifestsDeleted = manifestsDeleted;
}

public void setSnapshotDeleted(boolean snapshotDeleted) {
this.snapshotDeleted = snapshotDeleted;
}

public void setDeletionBuckets(Map<BinaryRow, Set<Integer>> deletionBuckets) {
this.deletionBuckets = deletionBuckets;
}

public Map<BinaryRow, Set<Integer>> deletionBuckets() {
return deletionBuckets;
}

@Override
public String toString() {
return "DeletionReport{"
+ "snapshotId="
+ snapshotId
+ ", skipped="
+ skipped
+ ", dataFilesDeleted="
+ dataFilesDeleted
+ ", changelogDeleted="
+ changelogDeleted
+ ", manifestsDeleted="
+ manifestsDeleted
+ ", snapshotDeleted="
+ snapshotDeleted
+ ", deletionBucketsCount="
+ deletionBuckets.size()
+ '}';
}
}
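
A short usage example follows, a sketch with made-up snapshot IDs and flags rather than anything from this PR, showing how a driver might summarize a batch of reports using the accessors above.

```java
import org.apache.paimon.operation.expire.DeletionReport;

import java.util.Arrays;
import java.util.List;

/** Illustrative usage of DeletionReport; the values are made up. */
public class DeletionReportExample {

    public static void main(String[] args) {
        DeletionReport done = new DeletionReport(17L);
        done.setDataFilesDeleted(true);
        done.setManifestsDeleted(true);
        done.setSnapshotDeleted(true);

        // A snapshot that had already been removed before this task ran.
        DeletionReport alreadyGone = DeletionReport.skipped(18L);

        List<DeletionReport> reports = Arrays.asList(done, alreadyGone);
        long skipped = reports.stream().filter(DeletionReport::isSkipped).count();
        System.out.println("expired " + (reports.size() - skipped) + ", skipped " + skipped);
        reports.forEach(System.out::println); // relies on the toString() defined above
    }
}
```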