@@ -2098,9 +2098,25 @@ public static Set<String> getValidInstantTimestamps(HoodieTableMetaClient dataMe
// For any rollbacks and restores, we cannot neglect the instants that they are rolling back.
// The rollback instant should be more recent than the start of the timeline for it to have rolled back any
// instant which we have a log block for.
//
// Only read rollback metadata for rollbacks newer than the latest MDT compaction.
// After compaction, rolled-back log blocks are already merged into base files, so pre-compaction
// rollback timestamps are no longer needed for log block filtering. This avoids sequential storage
// reads for old rollback instants that can cause long latency during metadata table reading.
final String earliestInstantTime = validInstantTimestamps.isEmpty() ? SOLO_COMMIT_TIMESTAMP : Collections.min(validInstantTimestamps);
final String latestMdtCompactionTime = metadataMetaClient.getActiveTimeline()
.getCommitTimeline()
Contributor

🤖 The existing HoodieBackedTableMetadata.getLatestCompactionTime() (line 808) uses getCommitAndReplaceTimeline() which also includes REPLACE_COMMIT_ACTION / CLUSTERING_ACTION, while this new code uses only getCommitTimeline() (just COMMIT_ACTION). Was this intentional? It's safe today since MDT only emits compaction commits as COMMIT_ACTION, but the inconsistency is a small future-proofing risk if MDT ever gains clustering/replace semantics — and reusing/sharing the existing helper would also avoid the duplicated lookup logic.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

Member

@voonhous voonhous May 15, 2026

Building on top of this: if anything else ever writes a COMMIT_ACTION to MDT, this would silently treat that timestamp as a "compaction." It's worth being defensive; consider filtering explicitly on the compaction action.

IIRC, COMMIT_ACTION writes to MDT are exclusively generated by compaction, so this is safe for now.

The only problem that may arise is if the API contract changes in the future and this becomes a regression.

As of now, I don't think this should be a blocker; I just want to highlight it.

Feel free to create an issue to track this, if you think the cleanup is worth the effort, so that new contributors can pick it up.
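
To make the concern in this thread concrete, here is a minimal, self-contained sketch of how a commit-only lookup and a commit-plus-replace lookup could diverge. It does not use Hudi classes: `Instant`, `latestTime`, and `sampleTimeline` are hypothetical stand-ins, the action strings are only illustrative labels, and the replace commit in the sample timeline is invented for the example (per the comments above, MDT does not emit one today).

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.Set;

public class CompactionLookupSketch {
    // Hypothetical stand-in for a completed timeline instant; this is not Hudi's HoodieInstant.
    static final class Instant {
        final String action;
        final String requestedTime;

        Instant(String action, String requestedTime) {
            this.action = action;
            this.requestedTime = requestedTime;
        }
    }

    // Latest requested time among completed instants whose action is in the allowed set.
    // Assumes fixed-width timestamps, so lexicographic order matches time order.
    static Optional<String> latestTime(List<Instant> completed, Set<String> allowedActions) {
        return completed.stream()
            .filter(i -> allowedActions.contains(i.action))
            .map(i -> i.requestedTime)
            .max(Comparator.naturalOrder());
    }

    // Invented timeline: one delta commit, one commit, and a later replace commit.
    static List<Instant> sampleTimeline() {
        return Arrays.asList(
            new Instant("deltacommit", "20260101020101000"),
            new Instant("commit", "20260401010101000"),
            new Instant("replacecommit", "20260501010101000"));
    }

    public static void main(String[] args) {
        List<Instant> timeline = sampleTimeline();
        // Commit-only view: the replace commit is ignored.
        System.out.println(latestTime(timeline, Set.of("commit")).orElse("none"));
        // Commit-and-replace view: the later replace commit wins.
        System.out.println(latestTime(timeline, Set.of("commit", "replacecommit")).orElse("none"));
    }
}
```

With this invented timeline the two lookups return different timestamps, which is the inconsistency the comments describe. Sharing one helper, or filtering on an explicit action set, would keep both call sites in agreement if the set of MDT actions ever changes.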

.filterCompletedInstants()
.lastInstant()
.map(HoodieInstant::requestedTime)
.orElse(SOLO_COMMIT_TIMESTAMP);
// Only read rollback metadata for rollbacks newer than the later of:
// (a) the earliest completed instant (original filter), and
// (b) the latest MDT compaction instant (new optimization)
final String rollbackFilterThreshold = compareTimestamps(latestMdtCompactionTime,
GREATER_THAN, earliestInstantTime) ? latestMdtCompactionTime : earliestInstantTime;
datasetTimeline.getRollbackAndRestoreTimeline().filterCompletedInstants().getInstantsAsStream()
.filter(instant -> compareTimestamps(instant.requestedTime(), GREATER_THAN, earliestInstantTime))
.filter(instant -> compareTimestamps(instant.requestedTime(), GREATER_THAN, rollbackFilterThreshold))
.forEach(instant -> validInstantTimestamps.addAll(getRollbackedCommits(instant, datasetTimeline, dataMetaClient.getInstantGenerator())));

// add restore and rollback instants from MDT.
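
The threshold selection in this hunk can be illustrated with a small standalone sketch. It is not the Hudi implementation: it assumes plain lexicographic comparison of timestamp strings in place of `compareTimestamps`, uses a string of zeros as a stand-in for `SOLO_COMMIT_TIMESTAMP`, and assumes the earliest valid instant is the first commit from the test in this PR. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class RollbackThresholdSketch {
    // Assumed placeholder that sorts before every real timestamp (stand-in for SOLO_COMMIT_TIMESTAMP).
    static final String NO_COMPACTION = "00000000000000";

    // The later of the latest MDT compaction time and the earliest valid instant time.
    static String rollbackFilterThreshold(String latestMdtCompactionTime, String earliestInstantTime) {
        return latestMdtCompactionTime.compareTo(earliestInstantTime) > 0
            ? latestMdtCompactionTime
            : earliestInstantTime;
    }

    // Keep only rollback instants strictly newer than the threshold.
    static List<String> rollbacksToRead(List<String> rollbackTimes, String threshold) {
        return rollbackTimes.stream()
            .filter(t -> t.compareTo(threshold) > 0)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Rollback times taken from the test added in this PR.
        List<String> rollbacks = Arrays.asList(
            "20260202010101000", "20260302010101000", "20260502010101000");
        String earliest = "20260101010101000"; // assumed earliest valid instant

        // No MDT compaction: the threshold falls back to the earliest instant.
        String before = rollbackFilterThreshold(NO_COMPACTION, earliest);
        System.out.println(rollbacksToRead(rollbacks, before));

        // MDT compaction at 20260401010101000: older rollbacks are skipped.
        String after = rollbackFilterThreshold("20260401010101000", earliest);
        System.out.println(rollbacksToRead(rollbacks, after));
    }
}
```

Under these assumptions, all three rollbacks pass the filter when there is no compaction, and only the rollback at `20260502010101000` passes once the compaction time becomes the threshold, so rollback metadata would be read for a single instant instead of three.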
@@ -19,6 +19,9 @@

package org.apache.hudi.metadata;

import org.apache.hudi.avro.model.HoodieInstantInfo;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
@@ -28,6 +31,7 @@
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaField;
@@ -36,10 +40,13 @@
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.FileCreateUtilsLegacy;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
@@ -61,8 +68,10 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
@@ -869,4 +878,89 @@ private static Stream<Arguments> mapKeyNoSeparatorToFileGroupIndexTestCases() {
)
);
}

/**
* Tests getValidInstantTimestamps rollback handling:
* - Without MDT compaction, all rollback metadata is read (rolled-back commits appear in valid timestamps).
* - With MDT compaction, only post-compaction rollback metadata is read (pre-compaction rollbacks are skipped
* because those log blocks are already merged into base files).
*/
@Test
void testGetValidInstantTimestampsSkipsPreCompactionRollbacks() throws Exception {
HoodieTestTable testTable = HoodieTestTable.of(metaClient);

String commit1 = "20260101010101000";
String commit2 = "20260201010101000";
String commit3 = "20260301010101000";
String commit4 = "20260501010101000";
String commit5 = "20260601010101000";
testTable.addCommit(commit1);
testTable.addCommit(commit2);
testTable.addCommit(commit3);
testTable.addCommit(commit4);
testTable.addCommit(commit5);

// Rollbacks before MDT compaction time
addCompletedRollback(testTable, "20260202010101000", commit2);
addCompletedRollback(testTable, "20260302010101000", commit3);
// Rollback after MDT compaction time
addCompletedRollback(testTable, "20260502010101000", commit4);

// Delete rolled-back commit instants from the timeline to simulate real rollback behavior.
// In a real system, the commit instant file is removed when a rollback completes, so the
// only way these timestamps appear in validInstantTimestamps is via rollback metadata reading.
metaClient = HoodieTableMetaClient.reload(metaClient);
for (String rolledBack : Arrays.asList(commit2, commit3, commit4)) {
HoodieInstant completedCommit = metaClient.getInstantGenerator()
.createNewInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, rolledBack);
metaClient.getActiveTimeline().deleteInstantFileIfExists(completedCommit);
}

// Create MDT metaClient with NO compaction initially (only delta commits)
String mdtBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath);
HoodieTestUtils.init(mdtBasePath, HoodieTableType.MERGE_ON_READ);
HoodieTableMetaClient mdtMetaClient = HoodieTableMetaClient.builder()
.setBasePath(mdtBasePath)
.build();
HoodieTestTable mdtTestTable = HoodieTestTable.of(mdtMetaClient);
mdtTestTable.addDeltaCommit("20260101020101000");

metaClient = HoodieTableMetaClient.reload(metaClient);
mdtMetaClient = HoodieTableMetaClient.reload(mdtMetaClient);
Contributor

🤖 nit: the comment is cut off mid-sentence — "rolled-back commits appear" appears to be missing the end of the thought. Could you complete it, e.g. "rolled-back commits appear in the valid timestamps"?

- AI-generated; verify before applying. React 👍/👎 to flag quality.


// Without MDT compaction, all rollback metadata is read — rolled-back commits appear
yihua marked this conversation as resolved.
Set<String> validTimestamps = HoodieTableMetadataUtil.getValidInstantTimestamps(metaClient, mdtMetaClient);
assertTrue(validTimestamps.contains(commit1), "commit1 should be in valid timestamps");
assertTrue(validTimestamps.contains(commit2), "commit2 should be in valid timestamps (from rollback metadata read)");
assertTrue(validTimestamps.contains(commit3), "commit3 should be in valid timestamps (from rollback metadata read)");
assertTrue(validTimestamps.contains(commit4), "commit4 should be in valid timestamps (from rollback metadata read)");
assertTrue(validTimestamps.contains(commit5), "commit5 should be in valid timestamps");

// Now add a compaction commit to MDT at a time between rollback2 and rollback3
mdtTestTable.addCommit("20260401010101000");
mdtTestTable.addDeltaCommit("20260501020101000");

metaClient = HoodieTableMetaClient.reload(metaClient);
mdtMetaClient = HoodieTableMetaClient.reload(mdtMetaClient);

// With MDT compaction, only post-compaction rollback (commit4) metadata is read;
// pre-compaction rollbacks for commit2 and commit3 are skipped
validTimestamps = HoodieTableMetadataUtil.getValidInstantTimestamps(metaClient, mdtMetaClient);
assertTrue(validTimestamps.contains(commit1), "commit1 should be in valid timestamps");
assertTrue(validTimestamps.contains(commit5), "commit5 should be in valid timestamps");
assertTrue(validTimestamps.contains(commit4), "commit4 should be in valid timestamps (from post-compaction rollback)");
assertFalse(validTimestamps.contains(commit2), "commit2 should NOT be in valid timestamps (pre-compaction rollback skipped)");
assertFalse(validTimestamps.contains(commit3), "commit3 should NOT be in valid timestamps (pre-compaction rollback skipped)");
}

private void addCompletedRollback(HoodieTestTable testTable, String rollbackTime, String rolledBackCommit) throws Exception {
Contributor

🤖 nit: emptyPartitionFiles reads as though the map is empty, but it actually contains a partition1 entry. Something like partitionFiles or partitionToFiles might be less surprising.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

Map<String, List<String>> emptyPartitionFiles = new HashMap<>();
emptyPartitionFiles.put("partition1", Collections.emptyList());
HoodieRollbackMetadata rollbackMeta = testTable.getRollbackMetadata(rolledBackCommit, emptyPartitionFiles, false);
HoodieRollbackPlan rollbackPlan = new HoodieRollbackPlan();
rollbackPlan.setInstantToRollback(new HoodieInstantInfo(rolledBackCommit, HoodieTimeline.COMMIT_ACTION));
rollbackPlan.setRollbackRequests(Collections.emptyList());
testTable.addRollback(rollbackTime, rollbackMeta, rollbackPlan);
testTable.addRollbackCompleted(rollbackTime, rollbackMeta, false);
}
}