Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
Expand Down Expand Up @@ -271,7 +272,31 @@ private Stream<HoodieInstant> getCommitInstantsToArchive() throws IOException {
Option<HoodieInstant> oldestInstantToRetainForClustering =
ClusteringUtils.getEarliestInstantToRetainForClustering(table.getActiveTimeline(), table.getMetaClient(), config.getCleanerPolicy());

// If enabled, block archival based on ECTR from the last completed clean to ensure we don't archive
// commits that have data files that haven't been cleaned yet.
Option<HoodieInstant> oldestInstantToRetainForClean = Option.empty();
if (config.shouldBlockArchivalOnCleanECTR()) {
Option<HoodieInstant> lastCleanInstant = table.getCleanTimeline().filterCompletedInstants().lastInstant();
if (lastCleanInstant.isPresent()) {
try {
org.apache.hudi.avro.model.HoodieCleanMetadata cleanMetadata =
table.getActiveTimeline().readCleanMetadata(lastCleanInstant.get());
if (cleanMetadata.getEarliestCommitToRetain() != null
&& !cleanMetadata.getEarliestCommitToRetain().trim().isEmpty()) {
oldestInstantToRetainForClean = commitTimeline.findInstantsAfterOrEquals(
cleanMetadata.getEarliestCommitToRetain()).firstInstant();
log.info("Blocking archival based on ECTR {} from last clean {}",
cleanMetadata.getEarliestCommitToRetain(), lastCleanInstant.get().requestedTime());
}
} catch (IOException e) {
log.warn("Failed to read clean metadata for {}", lastCleanInstant.get(), e);
throw new HoodieIOException("Failed to read clean metadata for " + lastCleanInstant.get(), e);
}
}
}

// Actually do the commits
Option<HoodieInstant> finalOldestInstantToRetainForClean = oldestInstantToRetainForClean;
Stream<HoodieInstant> instantToArchiveStream = commitTimeline.getInstantsAsStream()
.filter(s -> {
if (config.shouldArchiveBeyondSavepoint()) {
Expand All @@ -297,6 +322,10 @@ private Stream<HoodieInstant> getCommitInstantsToArchive() throws IOException {
oldestInstantToRetainForClustering.map(instantToRetain ->
compareTimestamps(s.requestedTime(), LESSER_THAN, instantToRetain.requestedTime()))
.orElse(true)
).filter(s ->
finalOldestInstantToRetainForClean.map(instantToRetain ->
compareTimestamps(s.requestedTime(), LESSER_THAN, instantToRetain.requestedTime()))
.orElse(true)
);
return instantToArchiveStream.limit(commitTimeline.countInstants() - minInstantsToKeep);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,13 @@ public class HoodieArchivalConfig extends HoodieConfig {
.markAdvanced()
.withDocumentation("Number of timeline manifest versions to retain.");

public static final ConfigProperty<Boolean> BLOCK_ARCHIVAL_ON_LATEST_CLEAN_ECTR = ConfigProperty
.key("hoodie.archive.block.on.latest.clean.ectr")
.defaultValue(false)
.markAdvanced()
.sinceVersion("1.2.0")
.withDocumentation("If enabled, archival will block on latest ECTR from last known clean");

/**
* @deprecated Use {@link #MAX_COMMITS_TO_KEEP} and its methods instead
*/
Expand Down Expand Up @@ -205,6 +212,11 @@ public Builder withArchiveBeyondSavepoint(boolean archiveBeyondSavepoint) {
return this;
}

public Builder withBlockArchivalOnCleanECTR(boolean blockArchivalOnCleanECTR) {
archivalConfig.setValue(BLOCK_ARCHIVAL_ON_LATEST_CLEAN_ECTR, String.valueOf(blockArchivalOnCleanECTR));
return this;
}

public HoodieArchivalConfig build() {
archivalConfig.setDefaults(HoodieArchivalConfig.class.getName());
return archivalConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1975,6 +1975,10 @@ public int getCommitArchivalBatchSize() {
return getInt(HoodieArchivalConfig.COMMITS_ARCHIVAL_BATCH_SIZE);
}

public boolean shouldBlockArchivalOnCleanECTR() {
return getBoolean(HoodieArchivalConfig.BLOCK_ARCHIVAL_ON_LATEST_CLEAN_ECTR);
}

public Boolean shouldCleanBootstrapBaseFile() {
return getBoolean(HoodieCleanConfig.CLEANER_BOOTSTRAP_BASE_FILE_ENABLE);
}
Expand Down
Loading