From 9b055e40f28ed57544bb061cf3c758bfad875702 Mon Sep 17 00:00:00 2001 From: tiennguyen-onehouse Date: Tue, 5 May 2026 16:29:02 -0700 Subject: [PATCH] [ENG-41301] Fix V9 batcher dropping all batches when savepoint shares instant time with deltacommit MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In Hudi V2 / table-version-9 layout a savepoint pinned to an instant time T produces five files at the same T: <T>.deltacommit.inflight <T>.deltacommit.requested <T>.savepoint.inflight <T>_<completion>.deltacommit <T>_<completion>.savepoint After lex-sort, <T>.savepoint.inflight lands at position 3 of the same-T deltacommit's group, breaking the (inflight, requested, completed) triplet check in createBatches: the third file is a savepoint inflight, so states become {inflight, requested, inflight} instead of the expected superset of {inflight, requested, completed}. With the default upload strategy this sets shouldStopIteration=true at the very first group, returning zero batches and silently halting upload of the entire active timeline. The extractor logs only the INFO "Could not create batches with completed commits" each cycle, so the table never advances. Fix: pre-extract complete savepoint pairs (one inflight + one completed at the same T) into their own 2-file batches before the triplet loop runs, restoring the deltacommit/commit triplet to a clean three-file group at that T. Partial savepoints (single file) stay in the main flow so the existing single-savepoint handling can still process them. Savepoint batches are appended after commit batches; savepoints carry no write metadata so this ordering is benign. Reproduction: any V9 MoR table that runs daily savepoints (e.g. the Concentric perf-test workload). After this fix LakeView resumes uploading post-savepoint instants — including any compaction the customer triggers once log files accumulate. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../ActiveTimelineInstantBatcher.java | 57 +++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/ActiveTimelineInstantBatcher.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/ActiveTimelineInstantBatcher.java index 41b8e4e..5142a96 100644 --- a/lakeview/src/main/java/ai/onehouse/metadata_extractor/ActiveTimelineInstantBatcher.java +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/ActiveTimelineInstantBatcher.java @@ -17,7 +17,9 @@ import java.util.Arrays; import java.util.Comparator; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import javax.annotation.Nonnull; @@ -59,6 +61,16 @@ public Pair<String, List<List<File>>> createBatches( sortedInstants = sortAndFilterInstants(instants); } + // V9/V2 layout: a savepoint pinned to an instant time T produces files + // <T>.savepoint.inflight + <T>_<completion>.savepoint that lex-sort to + // positions 3 and 5 of the same-T deltacommit's 5-file group, breaking the + // (inflight, requested, completed) triplet check. Peel complete savepoint + // pairs into their own 2-file batches before triplet matching so the + // deltacommit/commit at the same T can group cleanly. + Pair<List<File>, List<List<File>>> partitioned = extractSavepointPairs(sortedInstants); + sortedInstants = partitioned.getLeft(); + List<List<File>> savepointBatches = partitioned.getRight(); + List<List<File>> batches = new ArrayList<>(); List<File> currentBatch = new ArrayList<>(); String firstIncompleteCheckpoint = checkpoint.getFirstIncompleteCommitFile(); @@ -180,9 +192,54 @@ public Pair<String, List<List<File>>> createBatches( batches.add(currentBatch); } + // Append complete savepoint pairs after commit batches. Savepoints carry no + // write metadata so order vs commits is benign; placing them last keeps + // commit ordering deterministic. 
+ batches.addAll(savepointBatches); + return Pair.of(firstIncompleteCheckpoint, batches); } + /** + * Splits {@code sortedInstants} into (a) non-savepoint files retained in their original sort + * order and (b) zero or more 2-file batches each containing one fully-paired savepoint + * ({@code <T>.savepoint.inflight} + {@code <T>_<completion>.savepoint}). Partial savepoint + * files (only inflight or only completed) stay in the non-savepoint list so the existing + * single-savepoint code path can still process them. + */ + private Pair<List<File>, List<List<File>>> extractSavepointPairs(List<File> sortedInstants) { + Map<String, List<File>> savepointFilesByTimestamp = new LinkedHashMap<>(); + List<File> nonSavepoint = new ArrayList<>(); + for (File f : sortedInstants) { + ActiveTimelineInstant inst = getActiveTimeLineInstant(f.getFilename()); + if (SAVEPOINT_ACTION.equals(inst.getAction())) { + savepointFilesByTimestamp + .computeIfAbsent(inst.getTimestamp(), k -> new ArrayList<>()) + .add(f); + } else { + nonSavepoint.add(f); + } + } + List<List<File>> savepointBatches = new ArrayList<>(); + for (List<File> spFiles : savepointFilesByTimestamp.values()) { + boolean hasInflight = + spFiles.stream() + .anyMatch( + f -> "inflight".equals(getActiveTimeLineInstant(f.getFilename()).getState())); + boolean hasCompleted = + spFiles.stream() + .anyMatch( + f -> "completed".equals(getActiveTimeLineInstant(f.getFilename()).getState())); + if (spFiles.size() == 2 && hasInflight && hasCompleted) { + savepointBatches.add(spFiles); + } else { + nonSavepoint.addAll(spFiles); + } + } + nonSavepoint.sort(getFileComparator()); + return Pair.of(nonSavepoint, savepointBatches); + } + private static String getFirstIncompleteCheckpoint(String numericString) { BigInteger number = new BigInteger(numericString); BigInteger decrementedNumber = number.subtract(BigInteger.ONE);