diff --git a/lakeview-sync-tool/src/main/java/ai/onehouse/lakeview/sync/LakeviewSyncTool.java b/lakeview-sync-tool/src/main/java/ai/onehouse/lakeview/sync/LakeviewSyncTool.java index 1c9cb0d5..146285e3 100644 --- a/lakeview-sync-tool/src/main/java/ai/onehouse/lakeview/sync/LakeviewSyncTool.java +++ b/lakeview-sync-tool/src/main/java/ai/onehouse/lakeview/sync/LakeviewSyncTool.java @@ -14,6 +14,7 @@ import ai.onehouse.config.models.configv1.ParserConfig; import ai.onehouse.metadata_extractor.ActiveTimelineInstantBatcher; import ai.onehouse.metadata_extractor.HoodiePropertiesReader; +import ai.onehouse.metadata_extractor.LSMTimelineManifestReader; import ai.onehouse.metadata_extractor.TableDiscoveryAndUploadJob; import ai.onehouse.metadata_extractor.TableDiscoveryService; import ai.onehouse.metadata_extractor.TableMetadataUploaderService; @@ -259,9 +260,11 @@ private TableDiscoveryAndUploadJob getTableDiscoveryAndUploadJob(@Nonnull Config lakeViewExtractorMetrics); PresignedUrlFileUploader presignedUrlFileUploader = new PresignedUrlFileUploader(asyncStorageClient, asyncHttpClientWithRetry, lakeViewExtractorMetrics); + LSMTimelineManifestReader lsmTimelineManifestReader = new LSMTimelineManifestReader(asyncStorageClient, + storageUtils); TimelineCommitInstantsUploader timelineCommitInstantsUploader = new TimelineCommitInstantsUploader(asyncStorageClient, presignedUrlFileUploader, onehouseApiClient, storageUtils, executorService, new ActiveTimelineInstantBatcher(config), - lakeViewExtractorMetrics, config); + lakeViewExtractorMetrics, config, lsmTimelineManifestReader); TableMetadataUploaderService tableMetadataUploaderService = new TableMetadataUploaderService(hoodiePropertiesReader, onehouseApiClient, timelineCommitInstantsUploader, lakeViewExtractorMetrics, executorService); diff --git a/lakeview/src/main/java/ai/onehouse/constants/MetadataExtractorConstants.java b/lakeview/src/main/java/ai/onehouse/constants/MetadataExtractorConstants.java index f606bb81..4877f87e 100644 --- a/lakeview/src/main/java/ai/onehouse/constants/MetadataExtractorConstants.java +++ b/lakeview/src/main/java/ai/onehouse/constants/MetadataExtractorConstants.java @@ -17,6 +17,16 @@ private MetadataExtractorConstants() {} public static final String HOODIE_PROPERTIES_FILE = "hoodie.properties"; public static final String HOODIE_TABLE_NAME_KEY = "hoodie.table.name"; public static final String HOODIE_TABLE_TYPE_KEY = "hoodie.table.type"; + public static final String HOODIE_TABLE_VERSION_KEY = "hoodie.table.version"; + public static final String HOODIE_TIMELINE_LAYOUT_VERSION_KEY = + "hoodie.timeline.layout.version"; + public static final String TIMELINE_FOLDER_NAME = "timeline"; + public static final String HISTORY_FOLDER_NAME = "history"; + public static final int TIMELINE_LAYOUT_VERSION_V1 = 1; + public static final int TIMELINE_LAYOUT_VERSION_V2 = 2; + public static final int HOODIE_TABLE_VERSION_DEFAULT = 6; + public static final int TIMELINE_LAYOUT_VERSION_DEFAULT = TIMELINE_LAYOUT_VERSION_V1; + public static final String MANIFEST_FILE_PREFIX = "manifest_"; // The default number of instants in one archived commit metadata file is 10 // so we want to ingest 10x active instants than archived instants in one batch @@ -39,10 +49,15 @@ private MetadataExtractorConstants() {} // Default batch size will be 5 MB public static final int DEFAULT_FILE_UPLOAD_STREAM_BATCH_SIZE = Integer.parseInt(System.getenv().getOrDefault("FILE_UPLOAD_STREAM_BATCH_SIZE", "5242880")); + public static final String VERSION_MARKER_FILE = 
"_version_"; public static final Pattern ARCHIVED_COMMIT_INSTANT_PATTERN = Pattern.compile("\\.commits_\\.archive\\.\\d+_\\d+-\\d+-\\d+"); + public static final Pattern ARCHIVED_COMMIT_INSTANT_PATTERN_V2 = + Pattern.compile("\\d+_\\d+_\\d+\\.parquet|manifest_\\d+|" + VERSION_MARKER_FILE); public static final Pattern ACTIVE_COMMIT_INSTANT_PATTERN = - Pattern.compile("\\d+(\\.[a-z]{1,20}){1,2}"); + Pattern.compile("\\d+(_\\d+)?(\\.[a-z]{1,20}){1,2}"); + public static final Pattern V1_ARCHIVED_NUMERIC_PATTERN = + Pattern.compile("\\.archive\\.(\\d+)_"); public static final Checkpoint INITIAL_CHECKPOINT = Checkpoint.builder() .batchId(0) @@ -50,6 +65,7 @@ private MetadataExtractorConstants() {} .lastUploadedFile("") .firstIncompleteCommitFile("") .archivedCommitsProcessed(false) + .lastArchivedManifestVersion(0) .build(); // hardcoding last modified at to prevent this from causing issues with our checkpoint logic @@ -73,5 +89,7 @@ private MetadataExtractorConstants() {} "restore", "clean", "compaction", - "replacecommit"); + "replacecommit", + "clustering", + "logcompaction"); } diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/ActiveTimelineInstantBatcher.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/ActiveTimelineInstantBatcher.java index 5c716a94..41b8e4ec 100644 --- a/lakeview/src/main/java/ai/onehouse/metadata_extractor/ActiveTimelineInstantBatcher.java +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/ActiveTimelineInstantBatcher.java @@ -199,7 +199,10 @@ private List sortAndFilterInstants(List instants) { private List sortAndFilterInstants(List instants, Instant lastModifiedFilter) { return instants.stream() .filter(this::filterFile) - .collect(Collectors.groupingBy(file -> file.getFilename().split("\\.", 3)[0])) + .collect(Collectors.groupingBy(file -> { + String rawKey = file.getFilename().split("\\.", 3)[0]; + return rawKey.contains("_") ? rawKey.split("_")[0] : rawKey; + })) .values() .stream() .filter( @@ -264,6 +267,17 @@ static boolean areRelatedSavepointOrRollbackInstants( static ActiveTimelineInstant getActiveTimeLineInstant(String instant) { String[] parts = instant.split("\\.", 3); + // V9 completed instants embed completion time after an underscore in the leading token, + // e.g. "20260204053206256_20260204053210895". Split it out so callers see the request + // timestamp and the optional completion timestamp separately. + String timestamp = parts[0]; + String completionTime = null; + if (timestamp.contains("_")) { + String[] tsParts = timestamp.split("_", 2); + timestamp = tsParts[0]; + completionTime = tsParts[1]; + } + String action; String state; // For commit action, metadata file in inflight state is in the format of XYZ.inflight @@ -274,13 +288,21 @@ static ActiveTimelineInstant getActiveTimeLineInstant(String instant) { action = parts[1]; state = parts.length == 3 ? parts[2] : "completed"; } - return ActiveTimelineInstant.builder().timestamp(parts[0]).action(action).state(state).build(); + return ActiveTimelineInstant.builder() + .timestamp(timestamp) + .completionTime(completionTime) + .action(action) + .state(state) + .build(); } @Builder @Getter static class ActiveTimelineInstant { private final String timestamp; + // Only populated for V9 (table version >= 8) completed instants, which embed completion time + // alongside the request timestamp in the filename. Null for V1-V8 instants. 
+ private final String completionTime; private final String action; private final String state; } diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/HoodiePropertiesReader.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/HoodiePropertiesReader.java index 7690d13c..c367de11 100644 --- a/lakeview/src/main/java/ai/onehouse/metadata_extractor/HoodiePropertiesReader.java +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/HoodiePropertiesReader.java @@ -2,6 +2,10 @@ import static ai.onehouse.constants.MetadataExtractorConstants.HOODIE_TABLE_NAME_KEY; import static ai.onehouse.constants.MetadataExtractorConstants.HOODIE_TABLE_TYPE_KEY; +import static ai.onehouse.constants.MetadataExtractorConstants.HOODIE_TABLE_VERSION_DEFAULT; +import static ai.onehouse.constants.MetadataExtractorConstants.HOODIE_TABLE_VERSION_KEY; +import static ai.onehouse.constants.MetadataExtractorConstants.HOODIE_TIMELINE_LAYOUT_VERSION_KEY; +import static ai.onehouse.constants.MetadataExtractorConstants.TIMELINE_LAYOUT_VERSION_DEFAULT; import static ai.onehouse.metadata_extractor.MetadataExtractorUtils.getMetadataExtractorFailureReason; import com.google.inject.Inject; @@ -42,9 +46,18 @@ public CompletableFuture readHoodieProperties(String path) } catch (IOException e) { throw new RuntimeException("Failed to load properties file", e); } + int tableVersion = Integer.parseInt( + properties.getProperty( + HOODIE_TABLE_VERSION_KEY, String.valueOf(HOODIE_TABLE_VERSION_DEFAULT))); + int timelineLayoutVersion = Integer.parseInt( + properties.getProperty( + HOODIE_TIMELINE_LAYOUT_VERSION_KEY, + String.valueOf(TIMELINE_LAYOUT_VERSION_DEFAULT))); return ParsedHudiProperties.builder() .tableName(properties.getProperty(HOODIE_TABLE_NAME_KEY)) .tableType(TableType.valueOf(properties.getProperty(HOODIE_TABLE_TYPE_KEY))) + .tableVersion(tableVersion) + .timelineLayoutVersion(timelineLayoutVersion) .build(); }) .exceptionally( diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/LSMTimelineManifestReader.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/LSMTimelineManifestReader.java new file mode 100644 index 00000000..8eceb7e0 --- /dev/null +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/LSMTimelineManifestReader.java @@ -0,0 +1,155 @@ +package ai.onehouse.metadata_extractor; + +import static ai.onehouse.constants.MetadataExtractorConstants.MANIFEST_FILE_PREFIX; +import static ai.onehouse.constants.MetadataExtractorConstants.VERSION_MARKER_FILE; + +import ai.onehouse.RuntimeModule.TableMetadataUploadObjectStorageAsyncClient; +import ai.onehouse.storage.AsyncStorageClient; +import ai.onehouse.storage.StorageUtils; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Inject; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import javax.annotation.Nonnull; +import lombok.Value; +import lombok.extern.slf4j.Slf4j; + +/** + * Reads the Hudi LSM (V2) archived timeline manifest layout that lives under {@code + * .hoodie/timeline/history/}. The layout, written by Hudi 1.x, is: + * + *
+ *   _version_                                # text file containing the latest manifest version
+ *   manifest_N                               # JSON: { "files": [{ "fileName": "...", "fileLen": N }] }
+ *   {minInstant}_{maxInstant}_{level}.parquet
+ * </pre>
+ * + *
+ * <p>
The manifest is the source of truth for which parquet files are valid in the current + * snapshot. Compaction rewrites multiple low-level files into a single higher-level file, leaving + * orphaned data files on storage; only files referenced by the latest manifest should be treated + * as live. LakeView mirrors files to the backend rather than parsing them, so it only needs the + * filenames the manifest lists - no Hudi dependency required. + */ +@Slf4j +public class LSMTimelineManifestReader { + private final AsyncStorageClient asyncStorageClient; + private final StorageUtils storageUtils; + private final ObjectMapper mapper = new ObjectMapper(); + + @Inject + public LSMTimelineManifestReader( + @Nonnull @TableMetadataUploadObjectStorageAsyncClient AsyncStorageClient asyncStorageClient, + @Nonnull StorageUtils storageUtils) { + this.asyncStorageClient = asyncStorageClient; + this.storageUtils = storageUtils; + } + + /** + * Reads {@code _version_} and the corresponding {@code manifest_N} from the given history + * directory. Returns {@link ManifestSnapshot#empty()} when the {@code _version_} file does not + * exist (no archives have been written yet). + */ + public CompletableFuture readLatestManifest(String historyDirectoryUri) { + String versionFileUri = storageUtils.constructFileUri(historyDirectoryUri, VERSION_MARKER_FILE); + return asyncStorageClient + .readFileAsBytes(versionFileUri) + .exceptionally( + throwable -> { + // Only treat as empty when _version_ itself is missing (no archives written yet). + log.info( + "No V2 archived timeline _version_ file at {} (treating as empty). Reason: {}", + versionFileUri, + throwable.getMessage()); + return null; + }) + .thenCompose( + versionBytes -> { + if (versionBytes == null) { + return CompletableFuture.completedFuture(ManifestSnapshot.empty()); + } + int version = parseVersionFile(versionBytes); + // Manifest read failures propagate to the caller so the upload is retried + // on the next sync cycle rather than silently treating a readable _version_ + // as "no archives." + return readManifestForVersion(historyDirectoryUri, version) + .thenApply(files -> ManifestSnapshot.of(version, files)); + }); + } + + /** + * Reads {@code manifest_N} for the given version and returns the parquet filenames it + * references. Returns an empty list if the manifest cannot be read - callers treat this as + * "previous snapshot is gone, fall back to bootstrap" rather than a hard failure. + */ + public CompletableFuture> readManifestFileNames( + String historyDirectoryUri, int version) { + return readManifestForVersion(historyDirectoryUri, version) + .exceptionally( + throwable -> { + log.warn( + "Failed to read manifest_{} from {}; treating as missing. 
Reason: {}", + version, + historyDirectoryUri, + throwable.getMessage()); + return Collections.emptyList(); + }); + } + + private CompletableFuture> readManifestForVersion( + String historyDirectoryUri, int version) { + String manifestUri = + storageUtils.constructFileUri(historyDirectoryUri, MANIFEST_FILE_PREFIX + version); + return asyncStorageClient + .readFileAsBytes(manifestUri) + .thenApply( + bytes -> { + try { + return parseManifestFileNames(bytes); + } catch (IOException e) { + throw new UncheckedIOException( + "Failed to parse LSM manifest at " + manifestUri, e); + } + }); + } + + private static int parseVersionFile(byte[] bytes) { + String contents = new String(bytes, StandardCharsets.UTF_8).trim(); + return Integer.parseInt(contents); + } + + private List parseManifestFileNames(byte[] bytes) throws IOException { + JsonNode root = mapper.readTree(bytes); + JsonNode files = root.get("files"); + List result = new ArrayList<>(); + if (files != null && files.isArray()) { + for (JsonNode entry : files) { + JsonNode name = entry.get("fileName"); + if (name != null && !name.asText().isEmpty()) { + result.add(name.asText()); + } + } + } + return result; + } + + /** Snapshot of the latest LSM manifest: version number plus the parquet filenames it lists. */ + @Value(staticConstructor = "of") + public static class ManifestSnapshot { + int version; + @Nonnull List parquetFileNames; + + public static ManifestSnapshot empty() { + return ManifestSnapshot.of(0, Collections.emptyList()); + } + + public boolean isEmpty() { + return version == 0; + } + } +} diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableMetadataUploaderService.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableMetadataUploaderService.java index 6e9c2353..e66d4629 100644 --- a/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableMetadataUploaderService.java +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableMetadataUploaderService.java @@ -1,6 +1,7 @@ package ai.onehouse.metadata_extractor; import static ai.onehouse.constants.MetadataExtractorConstants.ARCHIVED_COMMIT_INSTANT_PATTERN; +import static ai.onehouse.constants.MetadataExtractorConstants.ARCHIVED_COMMIT_INSTANT_PATTERN_V2; import static ai.onehouse.constants.MetadataExtractorConstants.HOODIE_FOLDER_NAME; import static ai.onehouse.constants.MetadataExtractorConstants.HOODIE_PROPERTIES_FILE; import static ai.onehouse.constants.MetadataExtractorConstants.INITIAL_CHECKPOINT; @@ -22,6 +23,7 @@ import ai.onehouse.metadata_extractor.models.Checkpoint; import ai.onehouse.metadata_extractor.models.Table; import ai.onehouse.metrics.LakeViewExtractorMetrics; +import ai.onehouse.metadata_extractor.models.ParsedHudiProperties; import java.time.Instant; import java.util.ArrayList; import java.util.Collections; @@ -31,6 +33,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.function.Function; import java.util.stream.Collectors; @@ -49,6 +52,12 @@ public class TableMetadataUploaderService { private final LakeViewExtractorMetrics hudiMetadataExtractorMetrics; private final ExecutorService executorService; private final ObjectMapper mapper; + // Cache stores a few lightweight entries (one per table). No TTL needed since table versions + // do not change at runtime, and LakeView restarts on upgrades. 
Stores the in-flight future + // so concurrent table-batch processing dedupes reads (computeIfAbsent semantics) instead of + // each thread issuing its own hoodie.properties fetch. + private final Map> propertiesCache = + new ConcurrentHashMap<>(); @Inject public TableMetadataUploaderService( @@ -139,13 +148,36 @@ private CompletableFuture uploadInstantsInTableBatch(List tables // checkpoints found, continue from previous checkpoint String checkpointString = tableCheckpointMap.get(table.getTableId()).getCheckpoint(); + Checkpoint checkpoint = + StringUtils.isNotBlank(checkpointString) + ? mapper.readValue(checkpointString, Checkpoint.class) + : INITIAL_CHECKPOINT; processTablesFuture.add( - uploadNewInstantsSinceCheckpoint( - table.getTableId(), - table, - StringUtils.isNotBlank(checkpointString) - ? mapper.readValue(checkpointString, Checkpoint.class) - : INITIAL_CHECKPOINT)); + getOrLoadProperties(table) + .thenComposeAsync( + properties -> { + if (properties.getMetadataUploadFailureReasons() != null) { + log.error( + "Failed to read hoodie.properties for table: {} " + + "reason: {}. Skipping table to avoid " + + "mis-detecting the timeline layout version.", + table, + properties.getMetadataUploadFailureReasons()); + return CompletableFuture.completedFuture(false); + } + Table tableWithVersion = + table + .toBuilder() + .tableVersion(properties.getTableVersion()) + .timelineLayoutVersion( + properties.getTimelineLayoutVersion()) + .build(); + return uploadNewInstantsSinceCheckpoint( + tableWithVersion.getTableId(), + tableWithVersion, + checkpoint); + }, + executorService)); } catch (JsonProcessingException e) { log.error( "Error deserializing checkpoint value for table: {}, skipping table", @@ -202,6 +234,7 @@ private int getNumberOfMissingTables(List tableIdToProperties = new ConcurrentHashMap<>(); List< CompletableFuture< InitializeTableMetricsCheckpointRequest @@ -209,8 +242,7 @@ private int getNumberOfMissingTables(List(); for (Table table : tablesToInitialise) { initializeSingleTableMetricsCheckpointRequestFutureList.add( - hoodiePropertiesReader - .readHoodieProperties(getHoodiePropertiesFilePath(table)) + getOrLoadProperties(table) .thenApply( properties -> { if (properties == null || properties.getMetadataUploadFailureReasons() != null) { @@ -232,6 +264,7 @@ private int getNumberOfMissingTables(List uploadNewInstantsSinceCheckpoint( * this allows us to continue from the previous batch id */ Checkpoint activeTimelineCheckpoint = - ARCHIVED_COMMIT_INSTANT_PATTERN.matcher(checkpoint.getLastUploadedFile()).matches() + (ARCHIVED_COMMIT_INSTANT_PATTERN.matcher(checkpoint.getLastUploadedFile()).matches() + || ARCHIVED_COMMIT_INSTANT_PATTERN_V2.matcher(checkpoint.getLastUploadedFile()) + .matches()) ? resetCheckpoint(checkpoint) : checkpoint; return timelineCommitInstantsUploader @@ -398,6 +441,34 @@ private CompletableFuture uploadNewInstantsSinceCheckpoint( .thenApply(Objects::nonNull); } + /** + * Returns a cached or in-flight {@link ParsedHudiProperties} future for the table. Concurrent + * callers see the same future instead of issuing duplicate hoodie.properties reads. If the + * underlying read fails (parsed properties carry a non-null failure reason), the cache entry is + * evicted so the next sync attempt retries from scratch. + * + *
+ * <p>
Eviction is dispatched via {@code whenCompleteAsync} on {@link #executorService} so the + * removal never runs on the same call stack as {@link java.util.concurrent.ConcurrentHashMap + * #computeIfAbsent} - mutating the map from inside its own mapping function would trip the + * "Recursive update" guard. + */ + private CompletableFuture getOrLoadProperties(Table table) { + return propertiesCache.computeIfAbsent( + table.getTableId(), + id -> + hoodiePropertiesReader + .readHoodieProperties(getHoodiePropertiesFilePath(table)) + .whenCompleteAsync( + (result, throwable) -> { + if (throwable != null + || result == null + || result.getMetadataUploadFailureReasons() != null) { + propertiesCache.remove(id); + } + }, + executorService)); + } + private String getHoodiePropertiesFilePath(Table table) { String basePath = table.getAbsoluteTableUri(); return String.format( diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/TimelineCommitInstantsUploader.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/TimelineCommitInstantsUploader.java index 52465978..ca25128b 100644 --- a/lakeview/src/main/java/ai/onehouse/metadata_extractor/TimelineCommitInstantsUploader.java +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/TimelineCommitInstantsUploader.java @@ -2,12 +2,19 @@ import static ai.onehouse.constants.MetadataExtractorConstants.ACTIVE_COMMIT_INSTANT_PATTERN; import static ai.onehouse.constants.MetadataExtractorConstants.ARCHIVED_COMMIT_INSTANT_PATTERN; +import static ai.onehouse.constants.MetadataExtractorConstants.ARCHIVED_COMMIT_INSTANT_PATTERN_V2; import static ai.onehouse.constants.MetadataExtractorConstants.ARCHIVED_FOLDER_NAME; +import static ai.onehouse.constants.MetadataExtractorConstants.HISTORY_FOLDER_NAME; import static ai.onehouse.constants.MetadataExtractorConstants.HOODIE_FOLDER_NAME; import static ai.onehouse.constants.MetadataExtractorConstants.HOODIE_PROPERTIES_FILE; import static ai.onehouse.constants.MetadataExtractorConstants.HOODIE_PROPERTIES_FILE_OBJ; +import static ai.onehouse.constants.MetadataExtractorConstants.MANIFEST_FILE_PREFIX; import static ai.onehouse.constants.MetadataExtractorConstants.ROLLBACK_ACTION; import static ai.onehouse.constants.MetadataExtractorConstants.SAVEPOINT_ACTION; +import static ai.onehouse.constants.MetadataExtractorConstants.TIMELINE_FOLDER_NAME; +import static ai.onehouse.constants.MetadataExtractorConstants.TIMELINE_LAYOUT_VERSION_V2; +import static ai.onehouse.constants.MetadataExtractorConstants.V1_ARCHIVED_NUMERIC_PATTERN; +import static ai.onehouse.constants.MetadataExtractorConstants.VERSION_MARKER_FILE; import static ai.onehouse.metadata_extractor.ActiveTimelineInstantBatcher.areRelatedInstants; import static ai.onehouse.metadata_extractor.ActiveTimelineInstantBatcher.areRelatedSavepointOrRollbackInstants; import static ai.onehouse.metadata_extractor.ActiveTimelineInstantBatcher.getActiveTimeLineInstant; @@ -37,13 +44,15 @@ import ai.onehouse.RuntimeModule.TableMetadataUploadObjectStorageAsyncClient; import java.math.BigDecimal; +import java.time.Instant; import java.util.ArrayList; import java.util.Comparator; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.regex.Matcher; -import java.util.regex.Pattern; import java.util.stream.Collectors; import javax.annotation.Nonnull; @@ -65,6 +74,7 @@ public class TimelineCommitInstantsUploader { private final 
ActiveTimelineInstantBatcher activeTimelineInstantBatcher; private final LakeViewExtractorMetrics hudiMetadataExtractorMetrics; private final MetadataExtractorConfig extractorConfig; + private final LSMTimelineManifestReader lsmTimelineManifestReader; @Inject public TimelineCommitInstantsUploader( @@ -75,7 +85,8 @@ public TimelineCommitInstantsUploader( @Nonnull ExecutorService executorService, @Nonnull ActiveTimelineInstantBatcher activeTimelineInstantBatcher, @Nonnull LakeViewExtractorMetrics hudiMetadataExtractorMetrics, - @Nonnull Config config) { + @Nonnull Config config, + @Nonnull LSMTimelineManifestReader lsmTimelineManifestReader) { this.asyncStorageClient = asyncStorageClient; this.presignedUrlFileUploader = presignedUrlFileUploader; this.onehouseApiClient = onehouseApiClient; @@ -84,6 +95,7 @@ public TimelineCommitInstantsUploader( this.activeTimelineInstantBatcher = activeTimelineInstantBatcher; this.hudiMetadataExtractorMetrics = hudiMetadataExtractorMetrics; this.extractorConfig = config.getMetadataExtractorConfig(); + this.lsmTimelineManifestReader = lsmTimelineManifestReader; this.mapper = new ObjectMapper(); mapper.registerModule(new JavaTimeModule()); } @@ -108,7 +120,17 @@ public CompletableFuture batchUploadWithCheckpoint( String timelineUri = storageUtils.constructFileUri( - table.getAbsoluteTableUri(), getPathSuffixForTimeline(commitTimelineType)); + table.getAbsoluteTableUri(), + getPathSuffixForTimeline(commitTimelineType, table.getTimelineLayoutVersion())); + + if (CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED.equals(commitTimelineType) + && table.getTimelineLayoutVersion() == TIMELINE_LAYOUT_VERSION_V2) { + // V2 (Hudi 1.x) archived timeline lives under .hoodie/timeline/history/ as an LSM tree. + // We mirror it manifest-by-manifest rather than listing files: the manifest is the only + // source of truth for which parquet files are live, and filename-based checkpointing is + // unsafe because compaction rewrites the file set between runs. + return executeManifestDrivenArchivedUpload(tableId, table, timelineUri, checkpoint); + } return executeFullBatchUpload(tableId, table, timelineUri, checkpoint, commitTimelineType); } @@ -134,7 +156,9 @@ public CompletableFuture paginatedBatchUploadWithCheckpoint( String prefix = storageUtils.getPathFromUrl( storageUtils.constructFileUri( - table.getAbsoluteTableUri(), getPathSuffixForTimeline(commitTimelineType))); + table.getAbsoluteTableUri(), + getPathSuffixForTimeline( + commitTimelineType, table.getTimelineLayoutVersion()))); // startAfter is used only in the first call to get the objects, post that continuation token is // used @@ -188,6 +212,261 @@ private CompletableFuture executeFullBatchUpload( }); } + /** + * V2 archived upload, manifest-driven. + * + *
+ * <p>
Reads {@code _version_} and the latest {@code manifest_N} from {@code + * .hoodie/timeline/history/}. Uploads only parquet files newly referenced by the current + * manifest (diffed against the previous manifest when available, otherwise bootstrapped from + * scratch), then uploads the new manifest, then the {@code _version_} marker last so the + * backend never sees a manifest pointing at files that have not arrived yet. Checkpoints by + * manifest version, not filename, which makes the path compaction-safe: compaction merges L0 + * files into a single higher-level file, but the new manifest references the merged file and + * the backend (which reads via {@code TimelineFactory.createArchivedTimeline}, see + * gateway-controller PR 8797) honours the manifest, so any orphaned mirror copies are inert. + */ + private CompletableFuture executeManifestDrivenArchivedUpload( + String tableId, Table table, String historyUri, Checkpoint checkpoint) { + return lsmTimelineManifestReader + .readLatestManifest(historyUri) + .thenComposeAsync( + currentSnapshot -> + processManifestSnapshot( + tableId, table, historyUri, checkpoint, currentSnapshot), + executorService) + .exceptionally( + throwable -> { + log.error( + "Encountered exception when uploading V2 archived timeline for table {}", + table, + throwable); + hudiMetadataExtractorMetrics.incrementTableMetadataProcessingFailureCounter( + getMetadataExtractorFailureReason( + throwable, + MetricsConstants.MetadataUploadFailureReasons.UNKNOWN), + String.format( + "Exception when uploading V2 archived timeline for table %s: %s", + table, throwable.getMessage())); + return null; + }); + } + + private CompletableFuture processManifestSnapshot( + String tableId, + Table table, + String historyUri, + Checkpoint checkpoint, + LSMTimelineManifestReader.ManifestSnapshot currentSnapshot) { + if (currentSnapshot.isEmpty()) { + log.info( + "No V2 archived timeline yet for table {} (no _version_ file). Skipping.", table); + return CompletableFuture.completedFuture( + checkpoint.toBuilder().archivedCommitsProcessed(true).build()); + } + + int previousVersion = checkpoint.getLastArchivedManifestVersion(); + int currentVersion = currentSnapshot.getVersion(); + if (currentVersion == previousVersion && checkpoint.isArchivedCommitsProcessed()) { + log.info( + "V2 archived timeline already at manifest_{} for table {}; nothing to do.", + currentVersion, + table); + return CompletableFuture.completedFuture(checkpoint); + } + + CompletableFuture> previouslyMirroredFuture = + previousVersion > 0 + ? lsmTimelineManifestReader + .readManifestFileNames(historyUri, previousVersion) + .thenApply(HashSet::new) + : CompletableFuture.completedFuture(new HashSet<>()); + + return previouslyMirroredFuture.thenComposeAsync( + previouslyMirrored -> { + List filesToUpload = + buildV2ArchivedUploadList(checkpoint, currentSnapshot, previouslyMirrored, + currentVersion); + int newParquetCount = filesToUpload.size() - 2 + - (checkpoint.getBatchId() == 0 ? 
1 : 0); + + log.info( + "V2 archived: mirroring {} new parquet(s) plus manifest_{} for table {}" + + " (previous manifest version: {}, previous file count: {})", + newParquetCount, + currentVersion, + table, + previousVersion, + previouslyMirrored.size()); + + return uploadV2ArchivedFilesInBatches( + tableId, table, filesToUpload, checkpoint, currentVersion); + }, + executorService); + } + + private List buildV2ArchivedUploadList( + Checkpoint checkpoint, + LSMTimelineManifestReader.ManifestSnapshot currentSnapshot, + Set previouslyMirrored, + int currentVersion) { + List filesToUpload = new ArrayList<>(); + if (checkpoint.getBatchId() == 0) { + filesToUpload.add(HOODIE_PROPERTIES_FILE_OBJ); + } + for (String parquet : currentSnapshot.getParquetFileNames()) { + if (!previouslyMirrored.contains(parquet)) { + filesToUpload.add(buildArchivedFile(parquet)); + } + } + // Upload order: parquets -> manifest -> _version_. The manifest must arrive after every + // parquet it references, and _version_ must be the very last write so a partial run never + // advertises an inconsistent snapshot to the backend. + filesToUpload.add(buildArchivedFile(MANIFEST_FILE_PREFIX + currentVersion)); + filesToUpload.add(buildArchivedFile(VERSION_MARKER_FILE)); + return filesToUpload; + } + + private static File buildArchivedFile(String filename) { + return File.builder() + .filename(filename) + .isDirectory(false) + .lastModifiedAt(Instant.EPOCH) + .build(); + } + + /** + * Uploads the V2 archived file list in sequential batches and writes a single checkpoint after + * the final batch succeeds. The checkpoint advances {@code lastArchivedManifestVersion} so the + * next run knows which manifest to diff against. + */ + private CompletableFuture uploadV2ArchivedFilesInBatches( + String tableId, + Table table, + List filesToUpload, + Checkpoint previousCheckpoint, + int newManifestVersion) { + int batchSize = getUploadBatchSize(CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED); + List> batches = Lists.partition(filesToUpload, batchSize); + String directoryUri = + storageUtils.constructFileUri( + table.getAbsoluteTableUri(), + getPathSuffixForTimeline( + CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED, table.getTimelineLayoutVersion())); + + CompletableFuture> sequentialUpload = + CompletableFuture.completedFuture(new ArrayList<>()); + for (List batch : batches) { + sequentialUpload = + sequentialUpload.thenComposeAsync( + accumulated -> { + if (accumulated == null) { + return CompletableFuture.completedFuture(null); + } + return uploadBatch( + tableId, + batch, + CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED, + directoryUri, + table.getTimelineLayoutVersion()) + .thenApply( + ignored -> { + for (File f : batch) { + accumulated.add( + UploadedFile.builder() + .name( + getFileNameWithPrefix( + f, + CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED, + table.getTimelineLayoutVersion())) + .lastModifiedAt(f.getLastModifiedAt().toEpochMilli()) + .build()); + } + return accumulated; + }) + .exceptionally( + throwable -> { + hudiMetadataExtractorMetrics + .incrementTableMetadataProcessingFailureCounter( + getMetadataExtractorFailureReason( + throwable, + MetricsConstants.MetadataUploadFailureReasons.UNKNOWN), + String.format( + "V2 archived batch upload failed for table %s: %s", + table.getAbsoluteTableUri(), throwable.getMessage())); + log.error( + "V2 archived batch upload failed for table: {}. 
Skipping further" + + " batches in this run.", + table.getAbsoluteTableUri(), + throwable); + return null; + }); + }, + executorService); + } + + return sequentialUpload.thenComposeAsync( + accumulated -> { + if (accumulated == null || accumulated.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + return upsertV2ArchivedCheckpoint( + tableId, filesToUpload, accumulated, previousCheckpoint, batches.size(), + newManifestVersion); + }, + executorService); + } + + private CompletableFuture upsertV2ArchivedCheckpoint( + String tableId, + List filesToUpload, + List accumulated, + Checkpoint previousCheckpoint, + int batchCount, + int newManifestVersion) { + // The final file uploaded is _version_, which is also the marker we use as + // lastUploadedFile for back-compat with the v1 checkpoint shape. + File lastFile = filesToUpload.get(filesToUpload.size() - 1); + Checkpoint updatedCheckpoint = + previousCheckpoint + .toBuilder() + .batchId(previousCheckpoint.getBatchId() + batchCount) + .lastUploadedFile(lastFile.getFilename()) + .checkpointTimestamp(lastFile.getLastModifiedAt()) + .archivedCommitsProcessed(true) + .lastArchivedManifestVersion(newManifestVersion) + .build(); + try { + return onehouseApiClient + .upsertTableMetricsCheckpoint( + UpsertTableMetricsCheckpointRequest.builder() + .commitTimelineType(CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED) + .tableId(tableId) + .checkpoint(mapper.writeValueAsString(updatedCheckpoint)) + .filesUploaded( + accumulated.stream() + .map(UploadedFile::getName) + .collect(Collectors.toList())) + .uploadedFiles(accumulated) + .build()) + .thenApply( + response -> { + if (response.isFailure()) { + throw new IllegalStateException( + String.format( + "failed to update checkpoint: status_code: %d, exception: %s", + response.getStatusCode(), response.getCause())); + } + hudiMetadataExtractorMetrics.incrementTablesProcessedCounter(); + return updatedCheckpoint; + }); + } catch (JsonProcessingException e) { + CompletableFuture failed = new CompletableFuture<>(); + failed.completeExceptionally( + new IllegalStateException("failed to serialise checkpoint", e)); + return failed; + } + } + private CompletableFuture executePaginatedBatchUpload( String tableId, Table table, @@ -365,7 +644,9 @@ private CompletableFuture uploadInstantsInSequentialBatches( commitTimelineType, storageUtils.constructFileUri( table.getAbsoluteTableUri(), - getPathSuffixForTimeline(commitTimelineType))) + getPathSuffixForTimeline( + commitTimelineType, table.getTimelineLayoutVersion())), + table.getTimelineLayoutVersion()) .thenComposeAsync( ignored2 -> updateCheckpointAfterProcessingBatch( @@ -377,7 +658,9 @@ private CompletableFuture uploadInstantsInSequentialBatches( file -> UploadedFile.builder() .name( - getFileNameWithPrefix(file, commitTimelineType)) + getFileNameWithPrefix( + file, commitTimelineType, + table.getTimelineLayoutVersion())) .lastModifiedAt( file.getLastModifiedAt().toEpochMilli()) .build()) @@ -408,10 +691,11 @@ private CompletableFuture uploadBatch( String tableId, List batch, CommitTimelineType commitTimelineType, - String directoryUri) { + String directoryUri, + int timelineLayoutVersion) { List commitInstants = batch.stream() - .map(file -> getFileNameWithPrefix(file, commitTimelineType)) + .map(file -> getFileNameWithPrefix(file, commitTimelineType, timelineLayoutVersion)) .collect(Collectors.toList()); return onehouseApiClient .generateCommitMetadataUploadUrl( @@ -435,7 +719,8 @@ private CompletableFuture uploadBatch( uploadFutures.add( 
presignedUrlFileUploader.uploadFileToPresignedUrl( generateCommitMetadataUploadUrlResponse.getUploadUrls().get(i), - constructStorageUri(directoryUri, batch.get(i).getFilename()), + constructStorageUri( + directoryUri, batch.get(i).getFilename(), timelineLayoutVersion), extractorConfig.getFileUploadStreamBatchSize()) .thenApply(result -> { hudiMetadataExtractorMetrics.incrementMetadataUploadSuccessCounter(); @@ -590,49 +875,84 @@ private boolean isInstantAlreadyUploaded( private boolean isInstantFile(String fileName) { return ACTIVE_COMMIT_INSTANT_PATTERN.matcher(fileName).matches() - || ARCHIVED_COMMIT_INSTANT_PATTERN.matcher(fileName).matches(); + || ARCHIVED_COMMIT_INSTANT_PATTERN.matcher(fileName).matches() + || ARCHIVED_COMMIT_INSTANT_PATTERN_V2.matcher(fileName).matches(); } - private String constructStorageUri(String directoryUri, String fileName) { + private String constructStorageUri( + String directoryUri, String fileName, int timelineLayoutVersion) { if (HOODIE_PROPERTIES_FILE.equals(fileName)) { - String archivedSuffix = ARCHIVED_FOLDER_NAME + '/'; - String hoodieDirectoryUri = - directoryUri.endsWith(archivedSuffix) - ? directoryUri.substring(0, directoryUri.length() - "archived/".length()) - : directoryUri; + String hoodieDirectoryUri = directoryUri; + if (timelineLayoutVersion == TIMELINE_LAYOUT_VERSION_V2) { + // V2: strip "timeline/history/" or "timeline/" to get back to .hoodie/ + String historySuffix = TIMELINE_FOLDER_NAME + '/' + HISTORY_FOLDER_NAME + '/'; + String timelineSuffix = TIMELINE_FOLDER_NAME + '/'; + if (directoryUri.endsWith(historySuffix)) { + hoodieDirectoryUri = + directoryUri.substring(0, directoryUri.length() - historySuffix.length()); + } else if (directoryUri.endsWith(timelineSuffix)) { + hoodieDirectoryUri = + directoryUri.substring(0, directoryUri.length() - timelineSuffix.length()); + } + } else { + // V1: strip "archived/" to get back to .hoodie/ + String archivedSuffix = ARCHIVED_FOLDER_NAME + '/'; + if (directoryUri.endsWith(archivedSuffix)) { + hoodieDirectoryUri = + directoryUri.substring(0, directoryUri.length() - archivedSuffix.length()); + } + } return storageUtils.constructFileUri(hoodieDirectoryUri, HOODIE_PROPERTIES_FILE); } return storageUtils.constructFileUri(directoryUri, fileName); } - private String getPathSuffixForTimeline(CommitTimelineType commitTimelineType) { + private String getPathSuffixForTimeline( + CommitTimelineType commitTimelineType, int timelineLayoutVersion) { String pathSuffix = HOODIE_FOLDER_NAME + '/'; + if (timelineLayoutVersion == TIMELINE_LAYOUT_VERSION_V2) { + pathSuffix += TIMELINE_FOLDER_NAME + '/'; + return CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED.equals(commitTimelineType) + ? pathSuffix + HISTORY_FOLDER_NAME + '/' + : pathSuffix; + } return CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED.equals(commitTimelineType) ? pathSuffix + ARCHIVED_FOLDER_NAME + '/' : pathSuffix; } - private String getFileNameWithPrefix(File file, CommitTimelineType commitTimelineType) { - String archivedPrefix = "archived/"; - return CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED.equals(commitTimelineType) - && !HOODIE_PROPERTIES_FILE.equals(file.getFilename()) - ? 
archivedPrefix + file.getFilename() - : file.getFilename(); + private String getFileNameWithPrefix( + File file, CommitTimelineType commitTimelineType, int timelineLayoutVersion) { + if (CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED.equals(commitTimelineType) + && !HOODIE_PROPERTIES_FILE.equals(file.getFilename())) { + if (timelineLayoutVersion == TIMELINE_LAYOUT_VERSION_V2) { + return TIMELINE_FOLDER_NAME + '/' + HISTORY_FOLDER_NAME + '/' + file.getFilename(); + } + return ARCHIVED_FOLDER_NAME + '/' + file.getFilename(); + } + if (timelineLayoutVersion == TIMELINE_LAYOUT_VERSION_V2 + && !HOODIE_PROPERTIES_FILE.equals(file.getFilename())) { + return TIMELINE_FOLDER_NAME + '/' + file.getFilename(); + } + return file.getFilename(); } private BigDecimal getCommitIdFromActiveTimelineInstant(String activeTimeLineInstant) { - return new BigDecimal(activeTimeLineInstant.split("\\.")[0]); + String timestampPart = activeTimeLineInstant.split("\\.")[0]; + if (timestampPart.contains("_")) { + timestampPart = timestampPart.split("_")[0]; + } + return new BigDecimal(timestampPart); } - private int getNumericPartFromArchivedCommit(String archivedCommitFileName) { - Pattern pattern = Pattern.compile("\\.archive\\.(\\d+)_"); - Matcher matcher = pattern.matcher(archivedCommitFileName); - - if (matcher.find()) { - return Integer.parseInt(matcher.group(1)); - } else { - throw new IllegalArgumentException("invalid archived commit file type"); + private long getNumericPartFromArchivedCommit(String archivedCommitFileName) { + // V1 archive file: .commits_.archive.5_20260101-20260115-50 + Matcher v1Matcher = V1_ARCHIVED_NUMERIC_PATTERN.matcher(archivedCommitFileName); + if (v1Matcher.find()) { + return Long.parseLong(v1Matcher.group(1)); } + throw new IllegalArgumentException( + "invalid archived commit file type: " + archivedCommitFileName); } public String getStartAfterString(String prefix, Checkpoint checkpoint, boolean isFirstFetch) { @@ -706,19 +1026,16 @@ public File getLastUploadedFileFromBatch( } private boolean isSavepointCommit(File file) { - String[] parts = file.getFilename().split("\\.", 3); - if (parts.length < 2) { - return false; - } - return SAVEPOINT_ACTION.equals(parts[1]); + return hasActionType(file, SAVEPOINT_ACTION); } private boolean isRollbackCommit(File file) { + return hasActionType(file, ROLLBACK_ACTION); + } + + private boolean hasActionType(File file, String actionType) { String[] parts = file.getFilename().split("\\.", 3); - if (parts.length < 2) { - return false; - } - return ROLLBACK_ACTION.equals(parts[1]); + return parts.length >= 2 && actionType.equals(parts[1]); } @VisibleForTesting diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/models/Checkpoint.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/models/Checkpoint.java index e4d2dabc..92d1a749 100644 --- a/lakeview/src/main/java/ai/onehouse/metadata_extractor/models/Checkpoint.java +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/models/Checkpoint.java @@ -16,4 +16,9 @@ public class Checkpoint implements Serializable { @NonNull String lastUploadedFile; String firstIncompleteCommitFile; boolean archivedCommitsProcessed; + // Last LSM manifest version (from .hoodie/timeline/history/_version_) that has been + // mirrored to the backend. Used for V2 (Hudi 1.x / table version >= 8) archived timeline + // upload, where archives live in an LSM tree and filename-based checkpointing breaks + // because compaction rewrites the file set. V1 archived path leaves this at 0. 
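+ // Illustrative walk-through (the version numbers here are hypothetical, not from this PR): a sync
+ // that mirrors manifest_3 checkpoints this field as 3; the next sync reads _version_, finds e.g. 4,
+ // diffs manifest_4 against manifest_3, and uploads only the newly referenced parquet files.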
+ @Builder.Default int lastArchivedManifestVersion = 0; } diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/models/ParsedHudiProperties.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/models/ParsedHudiProperties.java index 21ddeacb..17e5549a 100644 --- a/lakeview/src/main/java/ai/onehouse/metadata_extractor/models/ParsedHudiProperties.java +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/models/ParsedHudiProperties.java @@ -1,5 +1,8 @@ package ai.onehouse.metadata_extractor.models; +import static ai.onehouse.constants.MetadataExtractorConstants.HOODIE_TABLE_VERSION_DEFAULT; +import static ai.onehouse.constants.MetadataExtractorConstants.TIMELINE_LAYOUT_VERSION_DEFAULT; + import ai.onehouse.api.models.request.TableType; import ai.onehouse.constants.MetricsConstants; import lombok.Builder; @@ -14,4 +17,6 @@ public class ParsedHudiProperties { @NonNull String tableName; @NonNull TableType tableType; @Nullable MetricsConstants.MetadataUploadFailureReasons metadataUploadFailureReasons; + @Builder.Default int tableVersion = HOODIE_TABLE_VERSION_DEFAULT; + @Builder.Default int timelineLayoutVersion = TIMELINE_LAYOUT_VERSION_DEFAULT; } diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/models/Table.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/models/Table.java index c60037d6..ec918e69 100644 --- a/lakeview/src/main/java/ai/onehouse/metadata_extractor/models/Table.java +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/models/Table.java @@ -1,5 +1,8 @@ package ai.onehouse.metadata_extractor.models; +import static ai.onehouse.constants.MetadataExtractorConstants.HOODIE_TABLE_VERSION_DEFAULT; +import static ai.onehouse.constants.MetadataExtractorConstants.TIMELINE_LAYOUT_VERSION_DEFAULT; + import lombok.Builder; import lombok.EqualsAndHashCode; import lombok.Getter; @@ -15,4 +18,6 @@ public class Table { private final String databaseName; private final String lakeName; private String tableId; + @Builder.Default private final int tableVersion = HOODIE_TABLE_VERSION_DEFAULT; + @Builder.Default private final int timelineLayoutVersion = TIMELINE_LAYOUT_VERSION_DEFAULT; } diff --git a/lakeview/src/test/java/ai/onehouse/metadata_extractor/ActiveTimelineInstantBatcherTest.java b/lakeview/src/test/java/ai/onehouse/metadata_extractor/ActiveTimelineInstantBatcherTest.java index 54ddef9d..2413ab7b 100644 --- a/lakeview/src/test/java/ai/onehouse/metadata_extractor/ActiveTimelineInstantBatcherTest.java +++ b/lakeview/src/test/java/ai/onehouse/metadata_extractor/ActiveTimelineInstantBatcherTest.java @@ -575,6 +575,37 @@ void testCreateBatchJustHoodieProperties(List instants, List> e assertEquals(expectedBatches, actualBatches); } + @Test + void testCreateBatchWithV9CompletedInstants() { + // V9 completed instants have completion timestamp: {ts}_{completionTs}.action + // Requested and inflight files remain unchanged: {ts}.action.requested + List files = + Arrays.asList( + generateFileObj("20260204053206256.deltacommit.requested"), + generateFileObj("20260204053206256.deltacommit.inflight"), + generateFileObj("20260204053206256_20260204053210895.deltacommit"), + generateFileObj("20260204053205307.compaction.requested"), + generateFileObj("20260204053205307.compaction.inflight"), + generateFileObj("20260204053205307_20260204053222939.commit"), + generateFileObj("hoodie.properties")); + + List> expectedBatches = + Arrays.asList( + Arrays.asList( + generateFileObj("hoodie.properties"), + 
generateFileObj("20260204053205307.compaction.inflight"), + generateFileObj("20260204053205307.compaction.requested"), + generateFileObj("20260204053205307_20260204053222939.commit")), + Arrays.asList( + generateFileObj("20260204053206256.deltacommit.inflight"), + generateFileObj("20260204053206256.deltacommit.requested"), + generateFileObj("20260204053206256_20260204053210895.deltacommit"))); + + List> actualBatches = + activeTimelineInstantBatcher.createBatches(files, 4, getCheckpoint()).getRight(); + assertEquals(expectedBatches, actualBatches); + } + @Test void testWithInvalidBatchSize() { assertThrows( diff --git a/lakeview/src/test/java/ai/onehouse/metadata_extractor/ContinueOnIncompleteCommitStrategyTest.java b/lakeview/src/test/java/ai/onehouse/metadata_extractor/ContinueOnIncompleteCommitStrategyTest.java index 87808ce6..9ad1906f 100644 --- a/lakeview/src/test/java/ai/onehouse/metadata_extractor/ContinueOnIncompleteCommitStrategyTest.java +++ b/lakeview/src/test/java/ai/onehouse/metadata_extractor/ContinueOnIncompleteCommitStrategyTest.java @@ -58,6 +58,7 @@ class ContinueOnIncompleteCommitStrategyTest { @Mock private Config config; @Mock private MetadataExtractorConfig metadataExtractorConfig; @Mock private LakeViewExtractorMetrics hudiMetadataExtractorMetrics; + @Mock private LSMTimelineManifestReader lsmTimelineManifestReader; private TimelineCommitInstantsUploader timelineCommitInstantsUploader; private final ObjectMapper mapper = new ObjectMapper(); private static final String S3_TABLE_URI = "s3://bucket/table/"; @@ -88,7 +89,8 @@ private TimelineCommitInstantsUploader getTimelineCommitInstantsUploader() { ForkJoinPool.commonPool(), activeTimelineInstantBatcher, hudiMetadataExtractorMetrics, - config); + config, + lsmTimelineManifestReader); } @BeforeEach diff --git a/lakeview/src/test/java/ai/onehouse/metadata_extractor/HoodiePropertiesReaderTest.java b/lakeview/src/test/java/ai/onehouse/metadata_extractor/HoodiePropertiesReaderTest.java index 8e1ac56e..70cad0c0 100644 --- a/lakeview/src/test/java/ai/onehouse/metadata_extractor/HoodiePropertiesReaderTest.java +++ b/lakeview/src/test/java/ai/onehouse/metadata_extractor/HoodiePropertiesReaderTest.java @@ -49,6 +49,31 @@ void testReadHoodieProperties(TableType tableType) ParsedHudiProperties result = futureResult.get(); assertEquals("test_table", result.getTableName()); assertEquals(tableType, result.getTableType()); + assertEquals(6, result.getTableVersion()); + assertEquals(1, result.getTimelineLayoutVersion()); + } + + @Test + void testReadHoodiePropertiesV9() throws ExecutionException, InterruptedException { + String path = "some/path/to/properties/file"; + String propertiesContent = + "hoodie.table.name=test_v9_table\n" + + "hoodie.table.type=MERGE_ON_READ\n" + + "hoodie.table.version=9\n" + + "hoodie.timeline.layout.version=2"; + ByteArrayInputStream inputStream = new ByteArrayInputStream(propertiesContent.getBytes()); + + when(asyncStorageClient.streamFileAsync(path)) + .thenReturn(CompletableFuture.completedFuture(getFileStreamData(inputStream))); + + CompletableFuture futureResult = + hoodiePropertiesReader.readHoodieProperties(path); + + ParsedHudiProperties result = futureResult.get(); + assertEquals("test_v9_table", result.getTableName()); + assertEquals(TableType.MERGE_ON_READ, result.getTableType()); + assertEquals(9, result.getTableVersion()); + assertEquals(2, result.getTimelineLayoutVersion()); } @Test diff --git a/lakeview/src/test/java/ai/onehouse/metadata_extractor/LSMTimelineManifestReaderTest.java 
b/lakeview/src/test/java/ai/onehouse/metadata_extractor/LSMTimelineManifestReaderTest.java new file mode 100644 index 00000000..f5501b90 --- /dev/null +++ b/lakeview/src/test/java/ai/onehouse/metadata_extractor/LSMTimelineManifestReaderTest.java @@ -0,0 +1,161 @@ +package ai.onehouse.metadata_extractor; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.when; + +import ai.onehouse.metadata_extractor.LSMTimelineManifestReader.ManifestSnapshot; +import ai.onehouse.storage.AsyncStorageClient; +import ai.onehouse.storage.StorageUtils; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class LSMTimelineManifestReaderTest { + @Mock private AsyncStorageClient asyncStorageClient; + private LSMTimelineManifestReader reader; + + private static final String HISTORY_URI = "s3://bucket/table/.hoodie/timeline/history/"; + + @BeforeEach + void setUp() { + reader = new LSMTimelineManifestReader(asyncStorageClient, new StorageUtils()); + } + + @Test + void testReadLatestManifest() { + mockReadFile(HISTORY_URI + "_version_", "3"); + mockReadFile( + HISTORY_URI + "manifest_3", + "{\n" + + " \"files\" : [ {\n" + + " \"fileName\" : \"20260101_20260102_0.parquet\",\n" + + " \"fileLen\" : 1024\n" + + " }, {\n" + + " \"fileName\" : \"20260102_20260103_1.parquet\",\n" + + " \"fileLen\" : 2048\n" + + " } ]\n" + + "}"); + + ManifestSnapshot snapshot = reader.readLatestManifest(HISTORY_URI).join(); + + assertEquals(3, snapshot.getVersion()); + assertEquals( + Arrays.asList("20260101_20260102_0.parquet", "20260102_20260103_1.parquet"), + snapshot.getParquetFileNames()); + assertTrue(!snapshot.isEmpty()); + } + + @Test + void testReadLatestManifest_VersionFileMissing() { + when(asyncStorageClient.readFileAsBytes(HISTORY_URI + "_version_")) + .thenReturn(failedFuture(new RuntimeException("NoSuchKey"))); + + ManifestSnapshot snapshot = reader.readLatestManifest(HISTORY_URI).join(); + + assertTrue(snapshot.isEmpty()); + assertEquals(0, snapshot.getVersion()); + assertEquals(Collections.emptyList(), snapshot.getParquetFileNames()); + } + + @Test + void testReadLatestManifest_ManifestReadFailurePropagates() { + // _version_ exists but the manifest it points to is unreadable. This should NOT + // return empty — it should propagate the exception so the caller retries next cycle. 
+ mockReadFile(HISTORY_URI + "_version_", "2"); + when(asyncStorageClient.readFileAsBytes(HISTORY_URI + "manifest_2")) + .thenReturn(failedFuture(new RuntimeException("AccessDenied"))); + + CompletableFuture future = reader.readLatestManifest(HISTORY_URI); + + CompletionException ex = assertThrows(CompletionException.class, future::join); + assertTrue(ex.getCause().getMessage().contains("AccessDenied")); + } + + @Test + void testReadLatestManifest_EmptyFilesArray() { + mockReadFile(HISTORY_URI + "_version_", "1"); + mockReadFile(HISTORY_URI + "manifest_1", "{ \"files\" : [] }"); + + ManifestSnapshot snapshot = reader.readLatestManifest(HISTORY_URI).join(); + + assertEquals(1, snapshot.getVersion()); + assertEquals(Collections.emptyList(), snapshot.getParquetFileNames()); + assertTrue(!snapshot.isEmpty()); + } + + @Test + void testReadManifestFileNames() { + mockReadFile( + HISTORY_URI + "manifest_5", + "{ \"files\" : [ { \"fileName\" : \"a.parquet\", \"fileLen\" : 100 } ] }"); + + List files = reader.readManifestFileNames(HISTORY_URI, 5).join(); + + assertEquals(Collections.singletonList("a.parquet"), files); + } + + @Test + void testReadManifestFileNames_ManifestMissing() { + when(asyncStorageClient.readFileAsBytes(HISTORY_URI + "manifest_99")) + .thenReturn(failedFuture(new RuntimeException("NoSuchKey"))); + + List files = reader.readManifestFileNames(HISTORY_URI, 99).join(); + + assertEquals(Collections.emptyList(), files); + } + + @Test + void testReadLatestManifest_IgnoresUnknownFields() { + mockReadFile(HISTORY_URI + "_version_", "1"); + mockReadFile( + HISTORY_URI + "manifest_1", + "{ \"files\" : [ { \"fileName\" : \"x.parquet\", \"fileLen\" : 42," + + " \"unknownField\" : true } ], \"extraTopLevel\" : 123 }"); + + ManifestSnapshot snapshot = reader.readLatestManifest(HISTORY_URI).join(); + + assertEquals(1, snapshot.getVersion()); + assertEquals(Collections.singletonList("x.parquet"), snapshot.getParquetFileNames()); + } + + @Test + void testReadLatestManifest_VersionWithWhitespace() { + mockReadFile(HISTORY_URI + "_version_", " 7\n"); + mockReadFile(HISTORY_URI + "manifest_7", "{ \"files\" : [] }"); + + ManifestSnapshot snapshot = reader.readLatestManifest(HISTORY_URI).join(); + + assertEquals(7, snapshot.getVersion()); + } + + @Test + void testManifestSnapshotEmpty() { + ManifestSnapshot empty = ManifestSnapshot.empty(); + assertTrue(empty.isEmpty()); + assertEquals(0, empty.getVersion()); + assertEquals(Collections.emptyList(), empty.getParquetFileNames()); + } + + private void mockReadFile(String uri, String content) { + when(asyncStorageClient.readFileAsBytes(uri)) + .thenReturn( + CompletableFuture.completedFuture(content.getBytes(StandardCharsets.UTF_8))); + } + + private static CompletableFuture failedFuture(Throwable ex) { + CompletableFuture f = new CompletableFuture<>(); + f.completeExceptionally(ex); + return f; + } +} diff --git a/lakeview/src/test/java/ai/onehouse/metadata_extractor/TableMetadataUploaderServiceTest.java b/lakeview/src/test/java/ai/onehouse/metadata_extractor/TableMetadataUploaderServiceTest.java index 41b4a5af..bfb1a748 100644 --- a/lakeview/src/test/java/ai/onehouse/metadata_extractor/TableMetadataUploaderServiceTest.java +++ b/lakeview/src/test/java/ai/onehouse/metadata_extractor/TableMetadataUploaderServiceTest.java @@ -298,6 +298,9 @@ void testUploadMetadataFromPreviousCheckpointArchivedNotProcessed() { .checkpoint(currentCheckpointJson) .build())) .build())); + when(hoodiePropertiesReader.readHoodieProperties( + String.format("%s%s/%s", 
S3_TABLE_URI, HOODIE_FOLDER_NAME, HOODIE_PROPERTIES_FILE))) + .thenReturn(CompletableFuture.completedFuture(PARSED_HUDI_PROPERTIES)); when(timelineCommitInstantsUploader.batchUploadWithCheckpoint( TABLE_ID.toString(), TABLE, @@ -388,6 +391,10 @@ void testUploadMetadataOfANewlyDiscoveredAndPreviouslyProcessedTable() { .thenReturn(CompletableFuture.completedFuture(PARSED_HUDI_PROPERTIES)); when(onehouseApiClient.initializeTableMetricsCheckpoint(expectedRequest)) .thenReturn(CompletableFuture.completedFuture(initializeTableMetricsCheckpointResponse)); + // table 2 (existing table) - read properties for version info + when(hoodiePropertiesReader.readHoodieProperties( + String.format("%s%s/%s", s3TableUri2, HOODIE_FOLDER_NAME, HOODIE_PROPERTIES_FILE))) + .thenReturn(CompletableFuture.completedFuture(PARSED_HUDI_PROPERTIES)); // table 1 when(timelineCommitInstantsUploader.batchUploadWithCheckpoint( TABLE_ID.toString(), @@ -482,6 +489,9 @@ void testUploadMetadataFromPreviousCheckpointArchivedProcessedActiveTimelineProc .checkpoint(currentCheckpointJson) .build())) .build())); + when(hoodiePropertiesReader.readHoodieProperties( + String.format("%s%s/%s", S3_TABLE_URI, HOODIE_FOLDER_NAME, HOODIE_PROPERTIES_FILE))) + .thenReturn(CompletableFuture.completedFuture(PARSED_HUDI_PROPERTIES)); Checkpoint expectedCheckpoint = shouldResetCheckpoint ? currentCheckpointWithResetFields : currentCheckpoint; when(timelineCommitInstantsUploader.paginatedBatchUploadWithCheckpoint( diff --git a/lakeview/src/test/java/ai/onehouse/metadata_extractor/TimelineCommitInstantsUploaderTest.java b/lakeview/src/test/java/ai/onehouse/metadata_extractor/TimelineCommitInstantsUploaderTest.java index 5f5138ea..d718546f 100644 --- a/lakeview/src/test/java/ai/onehouse/metadata_extractor/TimelineCommitInstantsUploaderTest.java +++ b/lakeview/src/test/java/ai/onehouse/metadata_extractor/TimelineCommitInstantsUploaderTest.java @@ -74,16 +74,27 @@ class TimelineCommitInstantsUploaderTest { @Mock private MetadataExtractorConfig metadataExtractorConfig; @Mock private ActiveTimelineInstantBatcher activeTimelineInstantBatcher; @Mock private LakeViewExtractorMetrics hudiMetadataExtractorMetrics; + @Mock private LSMTimelineManifestReader lsmTimelineManifestReader; private TimelineCommitInstantsUploader timelineCommitInstantsUploader; private final ObjectMapper mapper = new ObjectMapper(); private static final String S3_TABLE_URI = "s3://bucket/table/"; private static final String ARCHIVED_FOLDER_PREFIX = "archived/"; + private static final String TIMELINE_HISTORY_PREFIX = "timeline/history/"; + private static final String TIMELINE_PREFIX = "timeline/"; private static final Table TABLE = Table.builder() .absoluteTableUri(S3_TABLE_URI) .databaseName("database") .lakeName("lake") .build(); + private static final Table TABLE_V2 = + Table.builder() + .absoluteTableUri(S3_TABLE_URI) + .databaseName("database") + .lakeName("lake") + .tableVersion(9) + .timelineLayoutVersion(2) + .build(); private static final String TABLE_PREFIX = "table"; private static final UUID TABLE_ID = UUID.nameUUIDFromBytes(S3_TABLE_URI.getBytes()); private static final String PRESIGNED_URL_PREFIX = "http://presigned-url/"; @@ -109,7 +120,8 @@ private TimelineCommitInstantsUploader getTimelineCommitInstantsUploader(TestInf ForkJoinPool.commonPool(), activeTimelineInstantBatcher, hudiMetadataExtractorMetrics, - config); + config, + lsmTimelineManifestReader); } @BeforeEach @@ -1476,6 +1488,420 @@ void testPaginatedBatchUploadWithCheckpointExceptionallyBlockWithNullMessage() 
{ eq("Exception when uploading instants for table " + TABLE + " timeline COMMIT_TIMELINE_TYPE_ACTIVE: java.lang.RuntimeException")); } + @Test + void testUploadInstantsInArchivedTimelineV2_BootstrapMirror() { + // Initial sync: no previous manifest version recorded, so we mirror everything in the + // current manifest (bootstrap path). + TimelineCommitInstantsUploader timelineCommitInstantsUploaderSpy = + spy(timelineCommitInstantsUploader); + doReturn(100) + .when(timelineCommitInstantsUploaderSpy) + .getUploadBatchSize(CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED); + + String historyUri = TABLE_V2.getAbsoluteTableUri() + ".hoodie/timeline/history/"; + String parquet1 = "20260130205837315_20260201000250371_0.parquet"; + String parquet2 = "20260130205837315_20260201000250371_1.parquet"; + + when(lsmTimelineManifestReader.readLatestManifest(historyUri)) + .thenReturn( + CompletableFuture.completedFuture( + LSMTimelineManifestReader.ManifestSnapshot.of( + 1, Arrays.asList(parquet1, parquet2)))); + + // Single batch: hoodie.properties + 2 parquets + manifest_1 + _version_ + List expectedUploads = + Arrays.asList( + UploadedFile.builder().name(HOODIE_PROPERTIES_FILE).build(), + UploadedFile.builder().name(TIMELINE_HISTORY_PREFIX + parquet1).build(), + UploadedFile.builder().name(TIMELINE_HISTORY_PREFIX + parquet2).build(), + UploadedFile.builder().name(TIMELINE_HISTORY_PREFIX + "manifest_1").build(), + UploadedFile.builder().name(TIMELINE_HISTORY_PREFIX + "_version_").build()); + + Checkpoint expectedCheckpoint = + INITIAL_CHECKPOINT + .toBuilder() + .batchId(1) + .lastUploadedFile("_version_") + .checkpointTimestamp(Instant.EPOCH) + .archivedCommitsProcessed(true) + .lastArchivedManifestVersion(1) + .build(); + + stubV2ArchivedSingleBatchUpload(expectedUploads, expectedCheckpoint); + + Checkpoint response = + timelineCommitInstantsUploaderSpy + .batchUploadWithCheckpoint( + TABLE_ID.toString(), + TABLE_V2, + INITIAL_CHECKPOINT, + CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED) + .join(); + + assertEquals(expectedCheckpoint, response); + verify(lsmTimelineManifestReader, times(1)).readLatestManifest(historyUri); + // No previous manifest fetch on the bootstrap path. + verify(lsmTimelineManifestReader, times(0)) + .readManifestFileNames(anyString(), anyInt()); + } + + @Test + void testUploadInstantsInArchivedTimelineV2_IncrementalAfterCompaction() { + // Second sync after a compaction: previous manifest_1 referenced two L0 parquets, + // current manifest_2 references a compacted L1 parquet plus a brand-new L0 parquet. + // We should upload only the files NOT already in the previous manifest. 
+
+  @Test
+  void testUploadInstantsInArchivedTimelineV2_IncrementalAfterCompaction() {
+    // Second sync after a compaction: previous manifest_1 referenced two L0 parquets,
+    // current manifest_2 references a compacted L1 parquet plus a brand-new L0 parquet.
+    // We should upload only the files NOT already in the previous manifest.
+    TimelineCommitInstantsUploader timelineCommitInstantsUploaderSpy =
+        spy(timelineCommitInstantsUploader);
+    doReturn(100)
+        .when(timelineCommitInstantsUploaderSpy)
+        .getUploadBatchSize(CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED);
+    String historyUri = TABLE_V2.getAbsoluteTableUri() + ".hoodie/timeline/history/";
+    String oldParquet1 = "20260130000000000_20260130100000000_0.parquet";
+    String oldParquet2 = "20260130100000000_20260130200000000_0.parquet";
+    String compactedParquet = "20260130000000000_20260130200000000_1.parquet";
+    String newParquet = "20260130200000000_20260131000000000_0.parquet";
+
+    when(lsmTimelineManifestReader.readLatestManifest(historyUri))
+        .thenReturn(
+            CompletableFuture.completedFuture(
+                LSMTimelineManifestReader.ManifestSnapshot.of(
+                    2, Arrays.asList(compactedParquet, newParquet))));
+    when(lsmTimelineManifestReader.readManifestFileNames(historyUri, 1))
+        .thenReturn(
+            CompletableFuture.completedFuture(Arrays.asList(oldParquet1, oldParquet2)));
+
+    // Both files in the new manifest are net-new vs. manifest_1, so both get uploaded
+    // alongside manifest_2 and _version_. hoodie.properties is NOT included because batchId > 0.
+    List<UploadedFile> expectedUploads =
+        Arrays.asList(
+            UploadedFile.builder().name(TIMELINE_HISTORY_PREFIX + compactedParquet).build(),
+            UploadedFile.builder().name(TIMELINE_HISTORY_PREFIX + newParquet).build(),
+            UploadedFile.builder().name(TIMELINE_HISTORY_PREFIX + "manifest_2").build(),
+            UploadedFile.builder().name(TIMELINE_HISTORY_PREFIX + "_version_").build());
+
+    Checkpoint previousCheckpoint =
+        INITIAL_CHECKPOINT
+            .toBuilder()
+            .batchId(3)
+            .lastUploadedFile("_version_")
+            .archivedCommitsProcessed(true)
+            .lastArchivedManifestVersion(1)
+            .build();
+    Checkpoint expectedCheckpoint =
+        previousCheckpoint
+            .toBuilder()
+            .batchId(4)
+            .lastUploadedFile("_version_")
+            .checkpointTimestamp(Instant.EPOCH)
+            .archivedCommitsProcessed(true)
+            .lastArchivedManifestVersion(2)
+            .build();
+
+    stubV2ArchivedSingleBatchUpload(expectedUploads, expectedCheckpoint);
+
+    Checkpoint response =
+        timelineCommitInstantsUploaderSpy
+            .batchUploadWithCheckpoint(
+                TABLE_ID.toString(),
+                TABLE_V2,
+                previousCheckpoint,
+                CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED)
+            .join();
+
+    assertEquals(expectedCheckpoint, response);
+    verify(lsmTimelineManifestReader, times(1)).readManifestFileNames(historyUri, 1);
+  }
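Editorial note, not part of the PR: the incremental expectation above boils down to a set difference over manifest entries. After an LSM compaction, the compacted higher-level parquet and any newly written parquet are absent from the previously mirrored manifest, so only they need uploading. A minimal sketch under that assumption; the class, method, and parameter names are illustrative only.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch of the incremental manifest diff; not code from this PR.
final class IncrementalManifestDiffSketch {
  static List<String> netNewFiles(List<String> currentManifest, List<String> previousManifest) {
    Set<String> alreadyMirrored = new HashSet<>(previousManifest);
    // Anything the current manifest references that the previous manifest did not
    // (e.g. a compacted L1 parquet or a brand-new L0 parquet) still needs to be uploaded.
    return currentManifest.stream()
        .filter(name -> !alreadyMirrored.contains(name))
        .collect(Collectors.toList());
  }

  private IncrementalManifestDiffSketch() {}
}
```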
+
+  @Test
+  void testUploadInstantsInArchivedTimelineV2_NoArchivesYet() {
+    // Table has no archived timeline yet (no _version_ file). The reader returns an empty
+    // snapshot and we mark archived as processed without touching the API client.
+    String historyUri = TABLE_V2.getAbsoluteTableUri() + ".hoodie/timeline/history/";
+    when(lsmTimelineManifestReader.readLatestManifest(historyUri))
+        .thenReturn(
+            CompletableFuture.completedFuture(
+                LSMTimelineManifestReader.ManifestSnapshot.empty()));
+
+    Checkpoint response =
+        timelineCommitInstantsUploader
+            .batchUploadWithCheckpoint(
+                TABLE_ID.toString(),
+                TABLE_V2,
+                INITIAL_CHECKPOINT,
+                CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED)
+            .join();
+
+    assertEquals(true, response.isArchivedCommitsProcessed());
+    assertEquals(0, response.getLastArchivedManifestVersion());
+    verify(onehouseApiClient, times(0)).generateCommitMetadataUploadUrl(any());
+    verify(onehouseApiClient, times(0)).upsertTableMetricsCheckpoint(any());
+  }
+
+  @Test
+  void testUploadInstantsInArchivedTimelineV2_AlreadyAtLatestManifest() {
+    // No new archives since last sync: reader reports the same version we already mirrored.
+    String historyUri = TABLE_V2.getAbsoluteTableUri() + ".hoodie/timeline/history/";
+    when(lsmTimelineManifestReader.readLatestManifest(historyUri))
+        .thenReturn(
+            CompletableFuture.completedFuture(
+                LSMTimelineManifestReader.ManifestSnapshot.of(7, Collections.emptyList())));
+
+    Checkpoint previousCheckpoint =
+        INITIAL_CHECKPOINT
+            .toBuilder()
+            .batchId(2)
+            .archivedCommitsProcessed(true)
+            .lastArchivedManifestVersion(7)
+            .build();
+
+    Checkpoint response =
+        timelineCommitInstantsUploader
+            .batchUploadWithCheckpoint(
+                TABLE_ID.toString(),
+                TABLE_V2,
+                previousCheckpoint,
+                CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED)
+            .join();
+
+    assertEquals(previousCheckpoint, response);
+    verify(onehouseApiClient, times(0)).generateCommitMetadataUploadUrl(any());
+    verify(onehouseApiClient, times(0)).upsertTableMetricsCheckpoint(any());
+  }
+
+  @SneakyThrows
+  private void stubV2ArchivedSingleBatchUpload(
+      List<UploadedFile> expectedUploads, Checkpoint expectedCheckpoint) {
+    List<String> filenames =
+        expectedUploads.stream().map(UploadedFile::getName).collect(Collectors.toList());
+    List<String> presignedUrls =
+        filenames.stream().map(name -> PRESIGNED_URL_PREFIX + name).collect(Collectors.toList());
+
+    when(onehouseApiClient.generateCommitMetadataUploadUrl(
+            GenerateCommitMetadataUploadUrlRequest.builder()
+                .tableId(TABLE_ID.toString())
+                .commitInstants(filenames)
+                .commitTimelineType(CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED)
+                .build()))
+        .thenReturn(
+            CompletableFuture.completedFuture(
+                GenerateCommitMetadataUploadUrlResponse.builder()
+                    .uploadUrls(presignedUrls)
+                    .build()));
+
+    for (String presignedUrl : presignedUrls) {
+      String fileName = presignedUrl.substring(PRESIGNED_URL_PREFIX.length());
+      String fileUri = S3_TABLE_URI + ".hoodie/" + fileName;
+      when(presignedUrlFileUploader.uploadFileToPresignedUrl(
+              eq(presignedUrl), eq(fileUri), anyInt()))
+          .thenReturn(CompletableFuture.completedFuture(null));
+    }
+
+    when(onehouseApiClient.upsertTableMetricsCheckpoint(
+            UpsertTableMetricsCheckpointRequest.builder()
+                .commitTimelineType(CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED)
+                .tableId(TABLE_ID.toString())
+                .checkpoint(mapper.writeValueAsString(expectedCheckpoint))
+                .filesUploaded(filenames)
+                .uploadedFiles(expectedUploads)
+                .build()))
+        .thenReturn(
+            CompletableFuture.completedFuture(
+                UpsertTableMetricsCheckpointResponse.builder().build()));
+  }
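Editorial note, not part of the PR: the two tests above pin down the short-circuit behaviour; when the history folder has no manifest yet, or the checkpoint already records the latest manifest version, no upload URLs are requested and no checkpoint is upserted. A hedged sketch of that decision, assuming the checkpoint only needs to track the last mirrored manifest version; the helper is hypothetical.

```java
// Hypothetical sketch of the skip decision; not code from this PR.
final class ArchivedSyncSkipSketch {
  // latestManifestVersion == 0 stands in for "no _version_ file, no archives yet".
  static boolean nothingNewToMirror(int latestManifestVersion, int lastMirroredManifestVersion) {
    return latestManifestVersion == 0 || latestManifestVersion <= lastMirroredManifestVersion;
  }

  private ArchivedSyncSkipSketch() {}
}
```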
+
+  @Tag("Blocking")
+  @Test
+  void testUploadInstantsInActiveTimelineV2() {
+    TimelineCommitInstantsUploader timelineCommitInstantsUploaderSpy =
+        spy(timelineCommitInstantsUploader);
+
+    doReturn(4)
+        .when(timelineCommitInstantsUploaderSpy)
+        .getUploadBatchSize(CommitTimelineType.COMMIT_TIMELINE_TYPE_ACTIVE);
+
+    // archived already processed
+    Checkpoint previousCheckpoint = generateCheckpointObj(3, Instant.EPOCH, false, "");
+
+    // V2 active timeline path: .hoodie/timeline/
+    // Page 1
+    mockListPage(
+        TABLE_PREFIX + "/.hoodie/timeline/",
+        CONTINUATION_TOKEN_PREFIX + "1",
+        null,
+        Arrays.asList(
+            generateFileObj("should_be_ignored", false),
+            generateFileObj("20260130205837315_20260201000250371.commit", false),
+            generateFileObj("20260130205837315.commit.inflight", false),
+            generateFileObj("20260130205837315.commit.requested", false),
+            generateFileObj(
+                "20260201000250371_20260202000350471.commit", false, currentTime)));
+    // Page 2 (last page)
+    mockListPage(
+        TABLE_PREFIX + "/.hoodie/timeline/",
+        null,
+        TABLE_PREFIX
+            + "/.hoodie/timeline/"
+            + "20260130205837315_20260201000250371.commit",
+        Arrays.asList(
+            generateFileObj(
+                "20260201000250371_20260202000350471.commit", false, currentTime),
+            generateFileObj("20260201000250371.commit.inflight", false),
+            generateFileObj("20260201000250371.commit.requested", false),
+            generateFileObj(HOODIE_PROPERTIES_FILE, false)));
+
+    List<File> batch1 =
+        Arrays.asList(
+            generateFileObj(
+                "20260130205837315_20260201000250371.commit", false),
+            generateFileObj(
+                "20260130205837315.commit.inflight", false),
+            generateFileObj(
+                "20260130205837315.commit.requested", false));
+
+    List<File> batch2 =
+        Arrays.asList(
+            generateFileObj(
+                "20260201000250371_20260202000350471.commit", false, currentTime),
+            generateFileObj(
+                "20260201000250371.commit.inflight", false),
+            generateFileObj(
+                "20260201000250371.commit.requested", false));
+
+    Checkpoint checkpoint1 =
+        generateCheckpointObj(
+            previousCheckpoint.getBatchId() + 1,
+            Instant.EPOCH,
+            true,
+            "20260130205837315_20260201000250371.commit");
+    Checkpoint checkpoint2 =
+        generateCheckpointObj(
+            previousCheckpoint.getBatchId() + 2,
+            currentTime,
+            true,
+            "20260201000250371_20260202000350471.commit");
+
+    stubCreateBatches(
+        Stream.of(
+                generateFileObj(
+                    "20260130205837315_20260201000250371.commit", false),
+                generateFileObj(
+                    "20260130205837315.commit.inflight", false),
+                generateFileObj(
+                    "20260130205837315.commit.requested", false),
+                generateFileObj(
+                    "20260201000250371_20260202000350471.commit",
+                    false,
+                    currentTime))
+            .collect(Collectors.toList()),
+        Collections.singletonList(batch1),
+        previousCheckpoint,
+        previousCheckpoint.getFirstIncompleteCommitFile());
+
+    stubCreateBatches(
+        Arrays.asList(
+            generateFileObj(
+                "20260201000250371_20260202000350471.commit", false, currentTime),
+            generateFileObj(
+                "20260201000250371.commit.inflight", false),
+            generateFileObj(
+                "20260201000250371.commit.requested", false)),
+        Collections.singletonList(batch2),
+        checkpoint1,
+        checkpoint1.getFirstIncompleteCommitFile());
+
+    stubUploadInstantsCallsV2(
+        batch1.stream()
+            .map(file -> UploadedFile.builder().name(file.getFilename()).build())
+            .collect(Collectors.toList()),
+        checkpoint1,
+        CommitTimelineType.COMMIT_TIMELINE_TYPE_ACTIVE);
+    stubUploadInstantsCallsV2(
+        batch2.stream()
+            .map(
+                file ->
+                    UploadedFile.builder()
+                        .name(file.getFilename())
+                        .lastModifiedAt(file.getLastModifiedAt().toEpochMilli())
+                        .build())
+            .collect(Collectors.toList()),
+        checkpoint2,
+        CommitTimelineType.COMMIT_TIMELINE_TYPE_ACTIVE);
+
+    Checkpoint response =
+        timelineCommitInstantsUploaderSpy
+            .paginatedBatchUploadWithCheckpoint(
+                TABLE_ID.toString(),
+                TABLE_V2,
+                previousCheckpoint,
+                CommitTimelineType.COMMIT_TIMELINE_TYPE_ACTIVE)
+            .join();
+
+    assertEquals(checkpoint2, response);
+  }
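Editorial note, not part of the PR: in the V2 active timeline listed above, completed instants carry both the request and completion timestamps in the filename (e.g. 20260130205837315_20260201000250371.commit), while inflight and requested files keep only the request timestamp, and the test expects all three states of the same instant to land in one batch. A small sketch of deriving that common grouping key from a filename; the helper name is illustrative.

```java
// Hypothetical sketch; not code from this PR.
final class InstantGroupingKeySketch {
  static String requestTimestamp(String filename) {
    String leadingToken = filename.split("\\.", 2)[0]; // strip ".commit", ".commit.inflight", ...
    int underscore = leadingToken.indexOf('_');
    // Completed V2 instants embed the completion time after an underscore; drop it so
    // completed, inflight, and requested files of one instant share a key.
    return underscore >= 0 ? leadingToken.substring(0, underscore) : leadingToken;
  }

  private InstantGroupingKeySketch() {}
}
```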
+
+  @SneakyThrows
+  private void stubUploadInstantsCallsV2(
+      List<UploadedFile> filesUploaded,
+      Checkpoint updatedCheckpoint,
+      CommitTimelineType commitTimelineType) {
+    filesUploaded =
+        filesUploaded.stream()
+            .map(
+                file ->
+                    UploadedFile.builder()
+                        .name(
+                            addPrefixToFileNameV2(file.getName(), commitTimelineType))
+                        .lastModifiedAt(file.getLastModifiedAt())
+                        .build())
+            .collect(Collectors.toList());
+    List<String> filesUploadedWithUpdatedName =
+        filesUploaded.stream().map(UploadedFile::getName).collect(Collectors.toList());
+    List<String> presignedUrls =
+        filesUploadedWithUpdatedName.stream()
+            .map(fileName -> PRESIGNED_URL_PREFIX + fileName)
+            .collect(Collectors.toList());
+    when(onehouseApiClient.generateCommitMetadataUploadUrl(
+            GenerateCommitMetadataUploadUrlRequest.builder()
+                .tableId(TABLE_ID.toString())
+                .commitInstants(filesUploadedWithUpdatedName)
+                .commitTimelineType(commitTimelineType)
+                .build()))
+        .thenReturn(
+            CompletableFuture.completedFuture(
+                GenerateCommitMetadataUploadUrlResponse.builder()
+                    .uploadUrls(presignedUrls)
+                    .build()));
+    for (String presignedUrl : presignedUrls) {
+      String fileName = presignedUrl.substring(PRESIGNED_URL_PREFIX.length());
+      String fileUri = S3_TABLE_URI + ".hoodie/" + fileName;
+      when(presignedUrlFileUploader.uploadFileToPresignedUrl(
+              presignedUrl, fileUri, metadataExtractorConfig.getFileUploadStreamBatchSize()))
+          .thenReturn(CompletableFuture.completedFuture(null));
+    }
+    when(onehouseApiClient.upsertTableMetricsCheckpoint(
+            UpsertTableMetricsCheckpointRequest.builder()
+                .commitTimelineType(commitTimelineType)
+                .tableId(TABLE_ID.toString())
+                .checkpoint(mapper.writeValueAsString(updatedCheckpoint))
+                .filesUploaded(filesUploadedWithUpdatedName)
+                .uploadedFiles(filesUploaded)
+                .build()))
+        .thenReturn(
+            CompletableFuture.completedFuture(
+                UpsertTableMetricsCheckpointResponse.builder().build()));
+  }
+
+  private String addPrefixToFileNameV2(
+      String fileName, CommitTimelineType commitTimelineType) {
+    if (HOODIE_PROPERTIES_FILE.equals(fileName)) {
+      return fileName;
+    }
+    if (CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED.equals(commitTimelineType)) {
+      return TIMELINE_HISTORY_PREFIX + fileName;
+    }
+    return TIMELINE_PREFIX + fileName;
+  }
+
   private void stubCreateBatches(
       List<File> files,
       List<List<File>> expectedBatches,