diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/ActiveTimelineInstantBatcher.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/ActiveTimelineInstantBatcher.java
index 41b8e4e..5142a96 100644
--- a/lakeview/src/main/java/ai/onehouse/metadata_extractor/ActiveTimelineInstantBatcher.java
+++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/ActiveTimelineInstantBatcher.java
@@ -17,7 +17,9 @@
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
@@ -59,6 +61,16 @@ public Pair<String, List<List<File>>> createBatches(
       sortedInstants = sortAndFilterInstants(instants);
     }
 
+    // V9/V2 layout: a savepoint pinned to an instant time T produces files
+    // <T>.savepoint.inflight + <T>_<completion>.savepoint that lex-sort to
+    // positions 3 and 5 of the same-T deltacommit's 5-file group, breaking the
+    // (inflight, requested, completed) triplet check. Peel complete savepoint
+    // pairs into their own 2-file batches before triplet matching so the
+    // deltacommit/commit at the same T can group cleanly.
+    Pair<List<File>, List<List<File>>> partitioned = extractSavepointPairs(sortedInstants);
+    sortedInstants = partitioned.getLeft();
+    List<List<File>> savepointBatches = partitioned.getRight();
+
     List<List<File>> batches = new ArrayList<>();
     List<File> currentBatch = new ArrayList<>();
     String firstIncompleteCheckpoint = checkpoint.getFirstIncompleteCommitFile();
@@ -180,9 +192,54 @@ public Pair<String, List<List<File>>> createBatches(
       batches.add(currentBatch);
     }
 
+    // Append complete savepoint pairs after commit batches. Savepoints carry no
+    // write metadata so order vs commits is benign; placing them last keeps
+    // commit ordering deterministic.
+    batches.addAll(savepointBatches);
+
     return Pair.of(firstIncompleteCheckpoint, batches);
   }
 
+  /**
+   * Splits {@code sortedInstants} into (a) non-savepoint files retained in their original sort
+   * order and (b) zero or more 2-file batches each containing one fully-paired savepoint
+   * ({@code <instant>.savepoint.inflight} + {@code <instant>_<completion>.savepoint}). Partial savepoint
+   * files (only inflight or only completed) stay in the non-savepoint list so the existing
+   * single-savepoint code path can still process them.
+   */
+  private Pair<List<File>, List<List<File>>> extractSavepointPairs(List<File> sortedInstants) {
+    Map<String, List<File>> savepointFilesByTimestamp = new LinkedHashMap<>();
+    List<File> nonSavepoint = new ArrayList<>();
+    for (File f : sortedInstants) {
+      ActiveTimelineInstant inst = getActiveTimeLineInstant(f.getFilename());
+      if (SAVEPOINT_ACTION.equals(inst.getAction())) {
+        savepointFilesByTimestamp
+            .computeIfAbsent(inst.getTimestamp(), k -> new ArrayList<>())
+            .add(f);
+      } else {
+        nonSavepoint.add(f);
+      }
+    }
+    List<List<File>> savepointBatches = new ArrayList<>();
+    for (List<File> spFiles : savepointFilesByTimestamp.values()) {
+      boolean hasInflight =
+          spFiles.stream()
+              .anyMatch(
+                  f -> "inflight".equals(getActiveTimeLineInstant(f.getFilename()).getState()));
+      boolean hasCompleted =
+          spFiles.stream()
+              .anyMatch(
+                  f -> "completed".equals(getActiveTimeLineInstant(f.getFilename()).getState()));
+      if (spFiles.size() == 2 && hasInflight && hasCompleted) {
+        savepointBatches.add(spFiles);
+      } else {
+        nonSavepoint.addAll(spFiles);
+      }
+    }
+    nonSavepoint.sort(getFileComparator());
+    return Pair.of(nonSavepoint, savepointBatches);
+  }
+
   private static String getFirstIncompleteCheckpoint(String numericString) {
     BigInteger number = new BigInteger(numericString);
     BigInteger decrementedNumber = number.subtract(BigInteger.ONE);