Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
Expand Down Expand Up @@ -59,6 +61,16 @@ public Pair<String, List<List<File>>> createBatches(
sortedInstants = sortAndFilterInstants(instants);
}

// V9/V2 layout: a savepoint pinned to an instant time T produces files
// <T>.savepoint.inflight + <T>_<completionTs>.savepoint that lex-sort to
// positions 3 and 5 of the same-T deltacommit's 5-file group, breaking the
// (inflight, requested, completed) triplet check. Peel complete savepoint
// pairs into their own 2-file batches before triplet matching so the
// deltacommit/commit at the same T can group cleanly.
Pair<List<File>, List<List<File>>> partitioned = extractSavepointPairs(sortedInstants);
sortedInstants = partitioned.getLeft();
List<List<File>> savepointBatches = partitioned.getRight();

List<List<File>> batches = new ArrayList<>();
List<File> currentBatch = new ArrayList<>();
String firstIncompleteCheckpoint = checkpoint.getFirstIncompleteCommitFile();
Expand Down Expand Up @@ -180,9 +192,54 @@ public Pair<String, List<List<File>>> createBatches(
batches.add(currentBatch);
}

// Append complete savepoint pairs after commit batches. Savepoints carry no
// write metadata so order vs commits is benign; placing them last keeps
// commit ordering deterministic.
batches.addAll(savepointBatches);

return Pair.of(firstIncompleteCheckpoint, batches);
}

/**
* Splits {@code sortedInstants} into (a) non-savepoint files retained in their original sort
* order and (b) zero or more 2-file batches each containing one fully-paired savepoint
* ({@code <ts>.savepoint.inflight} + {@code <ts>_<completionTs>.savepoint}). Partial savepoint
* files (only inflight or only completed) stay in the non-savepoint list so the existing
* single-savepoint code path can still process them.
*/
private Pair<List<File>, List<List<File>>> extractSavepointPairs(List<File> sortedInstants) {
Map<String, List<File>> savepointFilesByTimestamp = new LinkedHashMap<>();
List<File> nonSavepoint = new ArrayList<>();
for (File f : sortedInstants) {
ActiveTimelineInstant inst = getActiveTimeLineInstant(f.getFilename());
if (SAVEPOINT_ACTION.equals(inst.getAction())) {
savepointFilesByTimestamp
.computeIfAbsent(inst.getTimestamp(), k -> new ArrayList<>())
.add(f);
} else {
nonSavepoint.add(f);
}
}
List<List<File>> savepointBatches = new ArrayList<>();
for (List<File> spFiles : savepointFilesByTimestamp.values()) {
boolean hasInflight =
spFiles.stream()
.anyMatch(
f -> "inflight".equals(getActiveTimeLineInstant(f.getFilename()).getState()));
boolean hasCompleted =
spFiles.stream()
.anyMatch(
f -> "completed".equals(getActiveTimeLineInstant(f.getFilename()).getState()));
if (spFiles.size() == 2 && hasInflight && hasCompleted) {
savepointBatches.add(spFiles);
} else {
nonSavepoint.addAll(spFiles);
}
}
nonSavepoint.sort(getFileComparator());
return Pair.of(nonSavepoint, savepointBatches);
}

private static String getFirstIncompleteCheckpoint(String numericString) {
BigInteger number = new BigInteger(numericString);
BigInteger decrementedNumber = number.subtract(BigInteger.ONE);
Expand Down
Loading