Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.hudi.client.timeline.versioning.v1;

import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.client.timeline.HoodieTimelineArchiver;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.client.utils.ArchivalMetrics;
Expand Down Expand Up @@ -53,6 +54,7 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
Expand Down Expand Up @@ -271,7 +273,31 @@ private Stream<HoodieInstant> getCommitInstantsToArchive() throws IOException {
Option<HoodieInstant> oldestInstantToRetainForClustering =
ClusteringUtils.getEarliestInstantToRetainForClustering(table.getActiveTimeline(), table.getMetaClient(), config.getCleanerPolicy());

// If enabled, block archival based on ECTR from the last completed clean to ensure we don't archive
// commits that have data files that haven't been cleaned yet.
Option<HoodieInstant> oldestInstantToRetainForClean = Option.empty();
if (config.shouldBlockArchivalOnCleanECTR()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does TimelineArchiverV2 need this as well, or is it properly handled?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Option<HoodieInstant> lastCleanInstant = table.getCleanTimeline().filterCompletedInstants().lastInstant();
Comment thread
nsivabalan marked this conversation as resolved.
if (lastCleanInstant.isPresent()) {
try {
HoodieCleanMetadata cleanMetadata =
table.getActiveTimeline().readCleanMetadata(lastCleanInstant.get());
if (cleanMetadata.getEarliestCommitToRetain() != null
&& !cleanMetadata.getEarliestCommitToRetain().trim().isEmpty()) {
Comment thread
nsivabalan marked this conversation as resolved.
oldestInstantToRetainForClean = commitTimeline.findInstantsAfterOrEquals(
cleanMetadata.getEarliestCommitToRetain()).firstInstant();
log.info("Blocking archival based on earliest commit to retain {} from last clean {}. Oldest to retain is {}",
cleanMetadata.getEarliestCommitToRetain(), lastCleanInstant.get().requestedTime(), oldestInstantToRetainForClean.map(instant -> instant).orElse(null));
}
Comment thread
nsivabalan marked this conversation as resolved.
} catch (IOException e) {
Comment thread
nsivabalan marked this conversation as resolved.
Comment thread
nsivabalan marked this conversation as resolved.
log.warn("Failed to read clean metadata for {}", lastCleanInstant.get(), e);
throw new HoodieIOException("Failed to read clean metadata for " + lastCleanInstant.get(), e);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Failure message contradicts runtime behavior.

Line 292 and Line 293 say “skipping ECTR check,” but the code throws immediately, which aborts archival. Please align the message with actual behavior to avoid operator confusion.

Suggested text fix
-            log.warn("Failed to read clean metadata for {}, skipping ECTR check", lastCleanInstant.get(), e);
-            throw new HoodieIOException("Failed to read clean metadata for " + lastCleanInstant.get() + ", skipping ECTR check", e);
+            log.warn("Failed to read clean metadata for {}, aborting archival due to ECTR guard", lastCleanInstant.get(), e);
+            throw new HoodieIOException("Failed to read clean metadata for " + lastCleanInstant.get()
+                + ", aborting archival due to ECTR guard", e);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
}
} catch (IOException e) {
log.warn("Failed to read clean metadata for {}, aborting archival due to ECTR guard", lastCleanInstant.get(), e);
throw new HoodieIOException("Failed to read clean metadata for " + lastCleanInstant.get()
", aborting archival due to ECTR guard", e);
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java`
around lines 291 - 294, The catch block in TimelineArchiverV1 currently logs and
throws with a message that says "skipping ECTR check" which is inaccurate
because the code immediately throws and aborts archival; update both the
log.warn call and the HoodieIOException message in the catch(IOException e)
handler so the text reflects that the archival is being aborted (e.g., "aborting
archival due to failure reading clean metadata for <instant>") instead of saying
it will skip the ECTR check, leaving the exception chaining intact.

CodeRabbit (original) (source:comment#3055869924)

}
Comment thread
nsivabalan marked this conversation as resolved.
}

// Actually do the commits
Option<HoodieInstant> finalOldestInstantToRetainForClean = oldestInstantToRetainForClean;
Stream<HoodieInstant> instantToArchiveStream = commitTimeline.getInstantsAsStream()
.filter(s -> {
if (config.shouldArchiveBeyondSavepoint()) {
Comment thread
nsivabalan marked this conversation as resolved.
Expand All @@ -297,6 +323,10 @@ private Stream<HoodieInstant> getCommitInstantsToArchive() throws IOException {
oldestInstantToRetainForClustering.map(instantToRetain ->
compareTimestamps(s.requestedTime(), LESSER_THAN, instantToRetain.requestedTime()))
.orElse(true)
).filter(s ->
Comment thread
nsivabalan marked this conversation as resolved.
finalOldestInstantToRetainForClean.map(instantToRetain ->
compareTimestamps(s.requestedTime(), LESSER_THAN, instantToRetain.requestedTime()))
.orElse(true)
);
return instantToArchiveStream.limit(commitTimeline.countInstants() - minInstantsToKeep);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,17 @@ public class HoodieArchivalConfig extends HoodieConfig {
.markAdvanced()
.withDocumentation("Number of timeline manifest versions to retain.");

public static final ConfigProperty<Boolean> BLOCK_ARCHIVAL_ON_LATEST_CLEAN_ECTR = ConfigProperty
.key("hoodie.archive.block.on.latest.clean.ectr")
.defaultValue(false)
.markAdvanced()
.sinceVersion("1.2.0")
.withDocumentation("If enabled, archival will not archive commits beyond the Earliest Commit To Retain (ECTR) "
+ "from the last completed clean. ECTR represents the oldest commit whose data files are still needed by "
+ "the table and have not yet been cleaned up. Blocking archival at this point ensures that timeline metadata "
+ "is not removed for commits whose data files still exist on storage, preventing inconsistencies between "
+ "the timeline and the actual data.");

/**
* @deprecated Use {@link #MAX_COMMITS_TO_KEEP} and its methods instead
*/
Expand Down Expand Up @@ -205,6 +216,11 @@ public Builder withArchiveBeyondSavepoint(boolean archiveBeyondSavepoint) {
return this;
}

public Builder withBlockArchivalOnCleanECTR(boolean blockArchivalOnCleanECTR) {
archivalConfig.setValue(BLOCK_ARCHIVAL_ON_LATEST_CLEAN_ECTR, String.valueOf(blockArchivalOnCleanECTR));
return this;
}

public HoodieArchivalConfig build() {
archivalConfig.setDefaults(HoodieArchivalConfig.class.getName());
return archivalConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1975,6 +1975,10 @@ public int getCommitArchivalBatchSize() {
return getInt(HoodieArchivalConfig.COMMITS_ARCHIVAL_BATCH_SIZE);
}

public boolean shouldBlockArchivalOnCleanECTR() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
public boolean shouldBlockArchivalOnCleanECTR() {
public boolean shouldBlockArchivalOnCleanEarliestCommitToRetain() {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its very lengthy name. by now, ECTR is widely used in hudi (similar to cow and mor), so wanted to keep it that way

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nsivabalan then it should use Camel case, i.e., Ectr.

return getBoolean(HoodieArchivalConfig.BLOCK_ARCHIVAL_ON_LATEST_CLEAN_ECTR);
}

public Boolean shouldCleanBootstrapBaseFile() {
return getBoolean(HoodieCleanConfig.CLEANER_BOOTSTRAP_BASE_FILE_ENABLE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,21 @@

package org.apache.hudi.io;

import org.apache.hudi.avro.model.HoodieActionInstant;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.client.timeline.TimelineArchivers;
import org.apache.hudi.client.WriteClientTestUtils;
import org.apache.hudi.client.timeline.versioning.v1.TimelineArchiverV1;
import org.apache.hudi.client.timeline.versioning.v2.LSMTimelineWriter;
import org.apache.hudi.client.timeline.versioning.v2.TimelineArchiverV2;
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.client.utils.ArchivalMetrics;
import org.apache.hudi.common.HoodieCleanStat;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieCommitMetadata;
Expand All @@ -47,6 +52,7 @@
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.LSMTimeline;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.table.timeline.versioning.clean.CleanMetadataV2MigrationHandler;
import org.apache.hudi.common.table.timeline.versioning.v2.InstantComparatorV2;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.FileCreateUtilsLegacy;
Expand Down Expand Up @@ -103,6 +109,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -2189,4 +2196,227 @@ public void testArchivalMetricsWithMixedActionTypes() throws Exception {
// Verify archival status is success
assertEquals(1L, metrics.get(ArchivalMetrics.ARCHIVAL_STATUS), "Archival should succeed");
}

private void initTableToTestECTRBlock() throws IOException {
HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE;
initPath();
initSparkContexts();
initTimelineService();
Properties properties = new Properties();
properties.setProperty(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "6");
properties.setProperty(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key(), "false");
initMetaClient(properties);
storage = metaClient.getStorage();
metaClient.getStorage().createDirectory(new StoragePath(basePath));
metaClient = HoodieTestUtils.init(storageConf, basePath, tableType, properties);
}

/**
Comment thread
nsivabalan marked this conversation as resolved.
* Tests archival behavior with ECTR blocking enabled vs disabled.
* When enabled: commits >= ECTR from last clean are not archived.
* When disabled: archival proceeds normally ignoring ECTR (backward compatible).
* Also validates that archival makes progress when ECTR is later than the archival window.
*/
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testArchivalWithECTRBlocking(boolean blockArchivalOnCleanECTR) throws Exception {
initTableToTestECTRBlock();

HoodieWriteConfig writeConfig = buildECTRTestConfig(2, 3, blockArchivalOnCleanECTR);
HoodieTestTable testTable = HoodieTestTable.of(metaClient);

// Given: 5 commits and a clean commit with ECTR pointing to commit 00000003
for (int i = 1; i <= 5; i++) {
testTable.addCommit(String.format("%08d", i));
}
addCleanCommitWithECTR(testTable, "00000006", "00000003", "00000005");
metaClient = HoodieTableMetaClient.reload(metaClient);

// When: trigger archival
HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
TimelineArchiverV1 archiver = new TimelineArchiverV1(writeConfig, table);
archiver.archiveIfRequired(context);

// Then: verify archival behavior
metaClient = HoodieTableMetaClient.reload(metaClient);
List<String> activeCommitTimes = getActiveCommitTimes();

if (blockArchivalOnCleanECTR) {
// Commits >= ECTR should not be archived
assertTrue(activeCommitTimes.contains("00000003"), "Commit 00000003 (ECTR) should not be archived");
assertTrue(activeCommitTimes.contains("00000004"), "Commit 00000004 (after ECTR) should not be archived");
assertTrue(activeCommitTimes.contains("00000005"), "Commit 00000005 (after ECTR) should not be archived");
} else {
// ECTR is ignored; archival proceeds based on min/max archival commits only
assertFalse(activeCommitTimes.contains("00000003"),
"Commit 00000003 (ECTR) should be archived when ECTR blocking is disabled");
assertTrue(activeCommitTimes.contains("00000005"),
"Commit 00000005 should be retained (within min commits to keep)");
}

if (blockArchivalOnCleanECTR) {
// Additional step: validate archival makes progress when ECTR is later than the archival window.
// Add more commits and a new clean with ECTR at 00000008, so commits before ECTR can be archived.
for (int i = 7; i <= 10; i++) {
testTable.addCommit(String.format("%08d", i));
}
addCleanCommitWithECTR(testTable, "00000011", "00000008", "00000010");
metaClient = HoodieTableMetaClient.reload(metaClient);

table = HoodieSparkTable.create(writeConfig, context, metaClient);
archiver = new TimelineArchiverV1(writeConfig, table);
archiver.archiveIfRequired(context);

metaClient = HoodieTableMetaClient.reload(metaClient);
activeCommitTimes = getActiveCommitTimes();

// Commits before ECTR should be archived
for (int i = 1; i <= 7; i++) {
assertFalse(activeCommitTimes.contains(String.format("%08d", i)),
"Commit " + String.format("%08d", i) + " (before ECTR) should be archived");
}
// Commits >= ECTR should be retained
assertTrue(activeCommitTimes.contains("00000008"), "Commit 00000008 (ECTR) should not be archived");
assertTrue(activeCommitTimes.contains("00000009"), "Commit 00000009 (after ECTR) should not be archived");
assertTrue(activeCommitTimes.contains("00000010"), "Commit 00000010 (after ECTR) should not be archived");
assertEquals(3, activeCommitTimes.size(), "Exactly 3 commits (00000008-00000010) should remain active");
}
}

/**
* Tests graceful handling when clean metadata is missing or has empty ECTR.
* Archival should continue normally in both cases.
*/
@Test
public void testArchivalContinuesWhenECTRIsAbsent() throws Exception {
initTableToTestECTRBlock();

HoodieWriteConfig writeConfig = buildECTRTestConfig(2, 3, true);
HoodieTestTable testTable = HoodieTestTable.of(metaClient);

// Step 1: No clean commit exists — archival should proceed without error
for (int i = 1; i <= 6; i++) {
testTable.addCommit(String.format("%08d", i));
}
metaClient = HoodieTableMetaClient.reload(metaClient);

HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
TimelineArchiverV1 archiver = new TimelineArchiverV1(writeConfig, table);

TimelineArchiverV1 finalArchiver = archiver;
assertDoesNotThrow(() -> finalArchiver.archiveIfRequired(context),
"Archival should continue gracefully when clean metadata is missing");

metaClient = HoodieTableMetaClient.reload(metaClient);
int commitsAfterFirstArchival = metaClient.getActiveTimeline().getCommitsTimeline()
.filterCompletedInstants().countInstants();
assertTrue(commitsAfterFirstArchival <= 3, "Archival should proceed when clean metadata is missing");

// Step 2: Clean commit exists but with empty ECTR — archival should still proceed
for (int i = 7; i <= 12; i++) {
testTable.addCommit(String.format("%08d", i));
}
addCleanCommitWithECTR(testTable, "00000013", "", "00000012");
metaClient = HoodieTableMetaClient.reload(metaClient);

table = HoodieSparkTable.create(writeConfig, context, metaClient);
archiver = new TimelineArchiverV1(writeConfig, table);

TimelineArchiverV1 finalArchiver1 = archiver;
assertDoesNotThrow(() -> finalArchiver1.archiveIfRequired(context),
"Archival should handle empty ECTR gracefully");

metaClient = HoodieTableMetaClient.reload(metaClient);
int commitsAfterSecondArchival = metaClient.getActiveTimeline().getCommitsTimeline()
.filterCompletedInstants().countInstants();
assertTrue(commitsAfterSecondArchival <= 3, "Archival should proceed normally with empty ECTR");
}

private HoodieWriteConfig buildECTRTestConfig(int minCommits, int maxCommits, boolean blockArchivalOnCleanECTR) {
return HoodieWriteConfig.newBuilder()
.withPath(basePath)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2)
.withArchivalConfig(HoodieArchivalConfig.newBuilder()
.archiveCommitsWith(minCommits, maxCommits)
.withBlockArchivalOnCleanECTR(blockArchivalOnCleanECTR)
.build())
.withCleanConfig(HoodieCleanConfig.newBuilder()
.retainCommits(1)
.build())
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withRemoteServerPort(timelineServicePort)
.build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
.enable(false)
.build())
.forTable("test-trip-table")
.build();
}

private void addCleanCommitWithECTR(HoodieTestTable testTable, String cleanInstant, String ectr, String lastCompleted) throws Exception {
List<HoodieCleanStat> cleanStatsList = new ArrayList<>();
cleanStatsList.add(new HoodieCleanStat(
HoodieCleaningPolicy.KEEP_LATEST_COMMITS,
"p1",
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
ectr,
lastCompleted
));

HoodieCleanMetadata cleanMetadata =
CleanerUtils.convertCleanMetadata(cleanInstant, Option.of(0L), cleanStatsList, Collections.emptyMap());
HoodieCleanerPlan cleanerPlan =
new HoodieCleanerPlan(
new HoodieActionInstant(cleanInstant, CLEAN_ACTION, ""),
"", "", new HashMap<>(), CleanMetadataV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>(), Collections.emptyMap());

testTable.addClean(cleanInstant, cleanerPlan, cleanMetadata);
}

private List<String> getActiveCommitTimes() {
return metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().getInstants().stream()
.map(HoodieInstant::requestedTime)
.collect(Collectors.toList());
}

/**
* Tests that TimelineArchiverV2 (LSM-based timeline, v9 tables) does NOT block archival on ECTR.
* ECTR blocking is only for v6 tables using TimelineArchiverV1.
*/
@Test
public void testArchivalBlocksOnCleanECTRWithTimelineArchiverV2AndVersion9() throws Exception {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you modify testArchiveTableWithCleanCommits so to keep on test around archival + clean on table version 9?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is just 1 test where we validate that v9 does not block on ECTR.
do not want to couple w/ other tests as we are not looking to expand any more combinations for v9.

init();

HoodieWriteConfig writeConfig = buildECTRTestConfig(2, 3, true);
HoodieTestTable testTable = HoodieTestTable.of(metaClient);

// Given: 5 commits and a clean commit with ECTR at 00000003
for (int i = 1; i <= 5; i++) {
testTable.addCommit(String.format("%08d", i));
}
addCleanCommitWithECTR(testTable, "00000006", "00000003", "00000005");
metaClient = HoodieTableMetaClient.reload(metaClient);

assertEquals(HoodieTableVersion.NINE, metaClient.getTableConfig().getTableVersion(),
"Table should be version 9");

// When: trigger archival using TimelineArchiverV2
HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
TimelineArchiverV2 archiver = new TimelineArchiverV2(writeConfig, table);
archiver.archiveIfRequired(context);

// Then: TimelineArchiverV2 should NOT respect ECTR — commit 00000003 gets archived
metaClient = HoodieTableMetaClient.reload(metaClient);
List<String> activeCommitTimes = getActiveCommitTimes();

assertFalse(activeCommitTimes.contains("00000003"),
"TimelineArchiverV2: Commit 00000003 (ECTR) should be archived");
assertTrue(activeCommitTimes.contains("00000004"),
"TimelineArchiverV2: Commit 00000004 (after ECTR) should not be archived");
assertTrue(activeCommitTimes.contains("00000005"),
"TimelineArchiverV2: Commit 00000005 (after ECTR) should not be archived");
}
}
Loading