Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,16 @@ protected boolean cleanupBackupVersion(Store store, String clusterName) {
return false;
}

long minRetentionThreshold = store.getLatestVersionPromoteToCurrentTimestamp() + minBackupVersionCleanupDelay;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add more details in the PR description on exactly what scenario the patch is trying to fix and add the corresponding unit test for it? e.g. is the scenario that we are trying to solve:
t1: [(v1, current), (v2, failed), (v3, future)]
t2: [(v1, backup), (v2, failed), (v3, current)]

At t1 will be allowed to delete both v1 and v2 regardless of the minimum retention since getLatestVersionPromoteToCurrentTimestamp?

Regardless, I think this approach of only relying on getLatestVersionPromoteToCurrentTimestamp is error prone given the various scenarios a store's version list can get into with deferred version swap. I'd recommend implementing a cleaner solution like to what we discussed where with current version counter/generationId as a version config to help the cleanup task cherry pick the right version to delete (or can ask ClaudeCode to see if it has better ideas).

long defaultRetentionThreshold =
store.getLatestVersionPromoteToCurrentTimestamp() + defaultBackupVersionRetentionMs;
boolean pastDefaultRetention = time.getMilliseconds() > defaultRetentionThreshold;
boolean pastMinRetention = time.getMilliseconds() > minRetentionThreshold;
// We should always wait min retention before any deletion.
if (!pastMinRetention) {
return false;
}

// First, consider any versions that can be deleted (invalid status: error or killed) and are not in use
List<Version> readyToBeRemovedVersions =
versions.stream().filter(v -> VersionStatus.canDelete(v.getStatus())).collect(Collectors.toList());
Expand All @@ -310,16 +320,8 @@ protected boolean cleanupBackupVersion(Store store, String clusterName) {
if (readyToBeRemovedVersions.isEmpty()) {
int repushSourceVersion = store.getVersionOrThrow(currentVersion).getRepushSourceVersion();
boolean isCurrentVersionRepushed = repushSourceVersion > NON_EXISTING_VERSION;
long minRetentionThreshold = store.getLatestVersionPromoteToCurrentTimestamp() + minBackupVersionCleanupDelay;
long defaultRetentionThreshold =
store.getLatestVersionPromoteToCurrentTimestamp() + defaultBackupVersionRetentionMs;
boolean pastDefaultRetention = time.getMilliseconds() > defaultRetentionThreshold;
boolean pastMinRetention = time.getMilliseconds() > minRetentionThreshold;
HashSet<Integer> repushChainVersions = new HashSet<>(); // all versions repushed into the current version

if (!pastMinRetention) {
return false;
}
readyToBeRemovedVersions = versions.stream()
.sorted((v1, v2) -> Integer.compare(v2.getNumber(), v1.getNumber())) // sort in descending order
.filter(v -> {
Expand Down
Loading