From 6665a15cee60b4b17dbda7675902ef09393de09e Mon Sep 17 00:00:00 2001 From: tiennguyen-onehouse Date: Thu, 26 Mar 2026 14:05:42 -0700 Subject: [PATCH 01/10] [ENG-38901] Support Hudi table version 9 timeline layout Add support for Hudi timeline layout version 2 (used by table version 9) while maintaining backward compatibility with layout version 1. Changes: - Read hoodie.timeline.layout.version and hoodie.table.version from hoodie.properties and propagate through Table model - Update active/archived timeline path construction for V2 layout (.hoodie/timeline/ and .hoodie/timeline/history/) - Update regex patterns to match V9 completed instant filenames ({timestamp}_{completionTimestamp}.action) - Fix timestamp extraction and instant grouping for V9 format - Add V9 archived file parsing (parquet, manifest, _version_) - Add "clustering" and "logcompaction" action types - Update path-stripping helpers for V2 layout awareness Co-Authored-By: Claude Opus 4.6 --- .../constants/MetadataExtractorConstants.java | 20 +- .../ActiveTimelineInstantBatcher.java | 14 +- .../HoodiePropertiesReader.java | 8 + .../TableMetadataUploaderService.java | 47 ++- .../TimelineCommitInstantsUploader.java | 128 ++++++-- .../models/ParsedHudiProperties.java | 2 + .../metadata_extractor/models/Table.java | 2 + .../ActiveTimelineInstantBatcherTest.java | 31 ++ .../HoodiePropertiesReaderTest.java | 25 ++ .../TableMetadataUploaderServiceTest.java | 10 + .../TimelineCommitInstantsUploaderTest.java | 289 ++++++++++++++++++ 11 files changed, 534 insertions(+), 42 deletions(-) diff --git a/lakeview/src/main/java/ai/onehouse/constants/MetadataExtractorConstants.java b/lakeview/src/main/java/ai/onehouse/constants/MetadataExtractorConstants.java index f606bb81..44277529 100644 --- a/lakeview/src/main/java/ai/onehouse/constants/MetadataExtractorConstants.java +++ b/lakeview/src/main/java/ai/onehouse/constants/MetadataExtractorConstants.java @@ -17,6 +17,11 @@ private MetadataExtractorConstants() {} public static final String HOODIE_PROPERTIES_FILE = "hoodie.properties"; public static final String HOODIE_TABLE_NAME_KEY = "hoodie.table.name"; public static final String HOODIE_TABLE_TYPE_KEY = "hoodie.table.type"; + public static final String HOODIE_TABLE_VERSION_KEY = "hoodie.table.version"; + public static final String HOODIE_TIMELINE_LAYOUT_VERSION_KEY = + "hoodie.timeline.layout.version"; + public static final String TIMELINE_FOLDER_NAME = "timeline"; + public static final String HISTORY_FOLDER_NAME = "history"; // The default number of instants in one archived commit metadata file is 10 // so we want to ingest 10x active instants than archived instants in one batch @@ -39,10 +44,19 @@ private MetadataExtractorConstants() {} // Default batch size will be 5 MB public static final int DEFAULT_FILE_UPLOAD_STREAM_BATCH_SIZE = Integer.parseInt(System.getenv().getOrDefault("FILE_UPLOAD_STREAM_BATCH_SIZE", "5242880")); + public static final String VERSION_MARKER_FILE = "_version_"; public static final Pattern ARCHIVED_COMMIT_INSTANT_PATTERN = Pattern.compile("\\.commits_\\.archive\\.\\d+_\\d+-\\d+-\\d+"); + public static final Pattern ARCHIVED_COMMIT_INSTANT_PATTERN_V2 = + Pattern.compile("\\d+_\\d+_\\d+\\.parquet|manifest_\\d+|" + VERSION_MARKER_FILE); public static final Pattern ACTIVE_COMMIT_INSTANT_PATTERN = - Pattern.compile("\\d+(\\.[a-z]{1,20}){1,2}"); + Pattern.compile("\\d+(_\\d+)?(\\.[a-z]{1,20}){1,2}"); + public static final Pattern V1_ARCHIVED_NUMERIC_PATTERN = + Pattern.compile("\\.archive\\.(\\d+)_"); + public static final Pattern V2_PARQUET_NUMERIC_PATTERN = + Pattern.compile("^(\\d+)_\\d+_\\d+\\.parquet$"); + public static final Pattern V2_MANIFEST_NUMERIC_PATTERN = + Pattern.compile("^manifest_(\\d+)$"); public static final Checkpoint INITIAL_CHECKPOINT = Checkpoint.builder() .batchId(0) @@ -73,5 +87,7 @@ private MetadataExtractorConstants() {} "restore", "clean", "compaction", - "replacecommit"); + "replacecommit", + "clustering", + "logcompaction"); } diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/ActiveTimelineInstantBatcher.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/ActiveTimelineInstantBatcher.java index 5c716a94..0efafd1e 100644 --- a/lakeview/src/main/java/ai/onehouse/metadata_extractor/ActiveTimelineInstantBatcher.java +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/ActiveTimelineInstantBatcher.java @@ -199,7 +199,10 @@ private List sortAndFilterInstants(List instants) { private List sortAndFilterInstants(List instants, Instant lastModifiedFilter) { return instants.stream() .filter(this::filterFile) - .collect(Collectors.groupingBy(file -> file.getFilename().split("\\.", 3)[0])) + .collect(Collectors.groupingBy(file -> { + String rawKey = file.getFilename().split("\\.", 3)[0]; + return rawKey.contains("_") ? rawKey.split("_")[0] : rawKey; + })) .values() .stream() .filter( @@ -264,6 +267,13 @@ static boolean areRelatedSavepointOrRollbackInstants( static ActiveTimelineInstant getActiveTimeLineInstant(String instant) { String[] parts = instant.split("\\.", 3); + // Strip completion timestamp from V9 completed instants + // e.g., "20260204053206256_20260204053210895" -> "20260204053206256" + String timestamp = parts[0]; + if (timestamp.contains("_")) { + timestamp = timestamp.split("_")[0]; + } + String action; String state; // For commit action, metadata file in inflight state is in the format of XYZ.inflight @@ -274,7 +284,7 @@ static ActiveTimelineInstant getActiveTimeLineInstant(String instant) { action = parts[1]; state = parts.length == 3 ? parts[2] : "completed"; } - return ActiveTimelineInstant.builder().timestamp(parts[0]).action(action).state(state).build(); + return ActiveTimelineInstant.builder().timestamp(timestamp).action(action).state(state).build(); } @Builder diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/HoodiePropertiesReader.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/HoodiePropertiesReader.java index 7690d13c..88a15fc8 100644 --- a/lakeview/src/main/java/ai/onehouse/metadata_extractor/HoodiePropertiesReader.java +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/HoodiePropertiesReader.java @@ -2,6 +2,8 @@ import static ai.onehouse.constants.MetadataExtractorConstants.HOODIE_TABLE_NAME_KEY; import static ai.onehouse.constants.MetadataExtractorConstants.HOODIE_TABLE_TYPE_KEY; +import static ai.onehouse.constants.MetadataExtractorConstants.HOODIE_TABLE_VERSION_KEY; +import static ai.onehouse.constants.MetadataExtractorConstants.HOODIE_TIMELINE_LAYOUT_VERSION_KEY; import static ai.onehouse.metadata_extractor.MetadataExtractorUtils.getMetadataExtractorFailureReason; import com.google.inject.Inject; @@ -42,9 +44,15 @@ public CompletableFuture readHoodieProperties(String path) } catch (IOException e) { throw new RuntimeException("Failed to load properties file", e); } + int tableVersion = Integer.parseInt( + properties.getProperty(HOODIE_TABLE_VERSION_KEY, "6")); + int timelineLayoutVersion = Integer.parseInt( + properties.getProperty(HOODIE_TIMELINE_LAYOUT_VERSION_KEY, "1")); return ParsedHudiProperties.builder() .tableName(properties.getProperty(HOODIE_TABLE_NAME_KEY)) .tableType(TableType.valueOf(properties.getProperty(HOODIE_TABLE_TYPE_KEY))) + .tableVersion(tableVersion) + .timelineLayoutVersion(timelineLayoutVersion) .build(); }) .exceptionally( diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableMetadataUploaderService.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableMetadataUploaderService.java index 6e9c2353..41da2e26 100644 --- a/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableMetadataUploaderService.java +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableMetadataUploaderService.java @@ -1,6 +1,7 @@ package ai.onehouse.metadata_extractor; import static ai.onehouse.constants.MetadataExtractorConstants.ARCHIVED_COMMIT_INSTANT_PATTERN; +import static ai.onehouse.constants.MetadataExtractorConstants.ARCHIVED_COMMIT_INSTANT_PATTERN_V2; import static ai.onehouse.constants.MetadataExtractorConstants.HOODIE_FOLDER_NAME; import static ai.onehouse.constants.MetadataExtractorConstants.HOODIE_PROPERTIES_FILE; import static ai.onehouse.constants.MetadataExtractorConstants.INITIAL_CHECKPOINT; @@ -22,6 +23,7 @@ import ai.onehouse.metadata_extractor.models.Checkpoint; import ai.onehouse.metadata_extractor.models.Table; import ai.onehouse.metrics.LakeViewExtractorMetrics; +import ai.onehouse.metadata_extractor.models.ParsedHudiProperties; import java.time.Instant; import java.util.ArrayList; import java.util.Collections; @@ -31,6 +33,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.function.Function; import java.util.stream.Collectors; @@ -139,13 +142,29 @@ private CompletableFuture uploadInstantsInTableBatch(List tables // checkpoints found, continue from previous checkpoint String checkpointString = tableCheckpointMap.get(table.getTableId()).getCheckpoint(); + Checkpoint checkpoint = + StringUtils.isNotBlank(checkpointString) + ? mapper.readValue(checkpointString, Checkpoint.class) + : INITIAL_CHECKPOINT; processTablesFuture.add( - uploadNewInstantsSinceCheckpoint( - table.getTableId(), - table, - StringUtils.isNotBlank(checkpointString) - ? mapper.readValue(checkpointString, Checkpoint.class) - : INITIAL_CHECKPOINT)); + hoodiePropertiesReader + .readHoodieProperties(getHoodiePropertiesFilePath(table)) + .thenComposeAsync( + properties -> { + Table tableWithVersion = table; + if (properties != null) { + tableWithVersion = table.toBuilder() + .tableVersion(properties.getTableVersion()) + .timelineLayoutVersion( + properties.getTimelineLayoutVersion()) + .build(); + } + return uploadNewInstantsSinceCheckpoint( + tableWithVersion.getTableId(), + tableWithVersion, + checkpoint); + }, + executorService)); } catch (JsonProcessingException e) { log.error( "Error deserializing checkpoint value for table: {}, skipping table", @@ -202,6 +221,7 @@ private int getNumberOfMissingTables(List tableIdToProperties = new ConcurrentHashMap<>(); List< CompletableFuture< InitializeTableMetricsCheckpointRequest @@ -232,6 +252,7 @@ private int getNumberOfMissingTables(List uploadNewInstantsSinceCheckpoint( * this allows us to continue from the previous batch id */ Checkpoint activeTimelineCheckpoint = - ARCHIVED_COMMIT_INSTANT_PATTERN.matcher(checkpoint.getLastUploadedFile()).matches() + (ARCHIVED_COMMIT_INSTANT_PATTERN.matcher(checkpoint.getLastUploadedFile()).matches() + || ARCHIVED_COMMIT_INSTANT_PATTERN_V2.matcher(checkpoint.getLastUploadedFile()) + .matches()) ? resetCheckpoint(checkpoint) : checkpoint; return timelineCommitInstantsUploader diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/TimelineCommitInstantsUploader.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/TimelineCommitInstantsUploader.java index 52465978..978a4cc5 100644 --- a/lakeview/src/main/java/ai/onehouse/metadata_extractor/TimelineCommitInstantsUploader.java +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/TimelineCommitInstantsUploader.java @@ -2,12 +2,19 @@ import static ai.onehouse.constants.MetadataExtractorConstants.ACTIVE_COMMIT_INSTANT_PATTERN; import static ai.onehouse.constants.MetadataExtractorConstants.ARCHIVED_COMMIT_INSTANT_PATTERN; +import static ai.onehouse.constants.MetadataExtractorConstants.ARCHIVED_COMMIT_INSTANT_PATTERN_V2; import static ai.onehouse.constants.MetadataExtractorConstants.ARCHIVED_FOLDER_NAME; +import static ai.onehouse.constants.MetadataExtractorConstants.HISTORY_FOLDER_NAME; import static ai.onehouse.constants.MetadataExtractorConstants.HOODIE_FOLDER_NAME; import static ai.onehouse.constants.MetadataExtractorConstants.HOODIE_PROPERTIES_FILE; import static ai.onehouse.constants.MetadataExtractorConstants.HOODIE_PROPERTIES_FILE_OBJ; import static ai.onehouse.constants.MetadataExtractorConstants.ROLLBACK_ACTION; import static ai.onehouse.constants.MetadataExtractorConstants.SAVEPOINT_ACTION; +import static ai.onehouse.constants.MetadataExtractorConstants.TIMELINE_FOLDER_NAME; +import static ai.onehouse.constants.MetadataExtractorConstants.V1_ARCHIVED_NUMERIC_PATTERN; +import static ai.onehouse.constants.MetadataExtractorConstants.V2_MANIFEST_NUMERIC_PATTERN; +import static ai.onehouse.constants.MetadataExtractorConstants.V2_PARQUET_NUMERIC_PATTERN; +import static ai.onehouse.constants.MetadataExtractorConstants.VERSION_MARKER_FILE; import static ai.onehouse.metadata_extractor.ActiveTimelineInstantBatcher.areRelatedInstants; import static ai.onehouse.metadata_extractor.ActiveTimelineInstantBatcher.areRelatedSavepointOrRollbackInstants; import static ai.onehouse.metadata_extractor.ActiveTimelineInstantBatcher.getActiveTimeLineInstant; @@ -43,7 +50,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.regex.Matcher; -import java.util.regex.Pattern; import java.util.stream.Collectors; import javax.annotation.Nonnull; @@ -108,7 +114,8 @@ public CompletableFuture batchUploadWithCheckpoint( String timelineUri = storageUtils.constructFileUri( - table.getAbsoluteTableUri(), getPathSuffixForTimeline(commitTimelineType)); + table.getAbsoluteTableUri(), + getPathSuffixForTimeline(commitTimelineType, table.getTimelineLayoutVersion())); return executeFullBatchUpload(tableId, table, timelineUri, checkpoint, commitTimelineType); } @@ -134,7 +141,9 @@ public CompletableFuture paginatedBatchUploadWithCheckpoint( String prefix = storageUtils.getPathFromUrl( storageUtils.constructFileUri( - table.getAbsoluteTableUri(), getPathSuffixForTimeline(commitTimelineType))); + table.getAbsoluteTableUri(), + getPathSuffixForTimeline( + commitTimelineType, table.getTimelineLayoutVersion()))); // startAfter is used only in the first call to get the objects, post that continuation token is // used @@ -365,7 +374,9 @@ private CompletableFuture uploadInstantsInSequentialBatches( commitTimelineType, storageUtils.constructFileUri( table.getAbsoluteTableUri(), - getPathSuffixForTimeline(commitTimelineType))) + getPathSuffixForTimeline( + commitTimelineType, table.getTimelineLayoutVersion())), + table.getTimelineLayoutVersion()) .thenComposeAsync( ignored2 -> updateCheckpointAfterProcessingBatch( @@ -377,7 +388,9 @@ private CompletableFuture uploadInstantsInSequentialBatches( file -> UploadedFile.builder() .name( - getFileNameWithPrefix(file, commitTimelineType)) + getFileNameWithPrefix( + file, commitTimelineType, + table.getTimelineLayoutVersion())) .lastModifiedAt( file.getLastModifiedAt().toEpochMilli()) .build()) @@ -408,10 +421,11 @@ private CompletableFuture uploadBatch( String tableId, List batch, CommitTimelineType commitTimelineType, - String directoryUri) { + String directoryUri, + int timelineLayoutVersion) { List commitInstants = batch.stream() - .map(file -> getFileNameWithPrefix(file, commitTimelineType)) + .map(file -> getFileNameWithPrefix(file, commitTimelineType, timelineLayoutVersion)) .collect(Collectors.toList()); return onehouseApiClient .generateCommitMetadataUploadUrl( @@ -435,7 +449,8 @@ private CompletableFuture uploadBatch( uploadFutures.add( presignedUrlFileUploader.uploadFileToPresignedUrl( generateCommitMetadataUploadUrlResponse.getUploadUrls().get(i), - constructStorageUri(directoryUri, batch.get(i).getFilename()), + constructStorageUri( + directoryUri, batch.get(i).getFilename(), timelineLayoutVersion), extractorConfig.getFileUploadStreamBatchSize()) .thenApply(result -> { hudiMetadataExtractorMetrics.incrementMetadataUploadSuccessCounter(); @@ -590,49 +605,102 @@ private boolean isInstantAlreadyUploaded( private boolean isInstantFile(String fileName) { return ACTIVE_COMMIT_INSTANT_PATTERN.matcher(fileName).matches() - || ARCHIVED_COMMIT_INSTANT_PATTERN.matcher(fileName).matches(); + || ARCHIVED_COMMIT_INSTANT_PATTERN.matcher(fileName).matches() + || ARCHIVED_COMMIT_INSTANT_PATTERN_V2.matcher(fileName).matches(); } - private String constructStorageUri(String directoryUri, String fileName) { + private String constructStorageUri( + String directoryUri, String fileName, int timelineLayoutVersion) { if (HOODIE_PROPERTIES_FILE.equals(fileName)) { - String archivedSuffix = ARCHIVED_FOLDER_NAME + '/'; - String hoodieDirectoryUri = - directoryUri.endsWith(archivedSuffix) - ? directoryUri.substring(0, directoryUri.length() - "archived/".length()) - : directoryUri; + String hoodieDirectoryUri = directoryUri; + if (timelineLayoutVersion == 2) { + // V2: strip "timeline/history/" or "timeline/" to get back to .hoodie/ + String historySuffix = TIMELINE_FOLDER_NAME + '/' + HISTORY_FOLDER_NAME + '/'; + String timelineSuffix = TIMELINE_FOLDER_NAME + '/'; + if (directoryUri.endsWith(historySuffix)) { + hoodieDirectoryUri = + directoryUri.substring(0, directoryUri.length() - historySuffix.length()); + } else if (directoryUri.endsWith(timelineSuffix)) { + hoodieDirectoryUri = + directoryUri.substring(0, directoryUri.length() - timelineSuffix.length()); + } + } else { + // V1: strip "archived/" to get back to .hoodie/ + String archivedSuffix = ARCHIVED_FOLDER_NAME + '/'; + if (directoryUri.endsWith(archivedSuffix)) { + hoodieDirectoryUri = + directoryUri.substring(0, directoryUri.length() - archivedSuffix.length()); + } + } return storageUtils.constructFileUri(hoodieDirectoryUri, HOODIE_PROPERTIES_FILE); } return storageUtils.constructFileUri(directoryUri, fileName); } - private String getPathSuffixForTimeline(CommitTimelineType commitTimelineType) { + private String getPathSuffixForTimeline( + CommitTimelineType commitTimelineType, int timelineLayoutVersion) { String pathSuffix = HOODIE_FOLDER_NAME + '/'; + if (timelineLayoutVersion == 2) { + pathSuffix += TIMELINE_FOLDER_NAME + '/'; + return CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED.equals(commitTimelineType) + ? pathSuffix + HISTORY_FOLDER_NAME + '/' + : pathSuffix; + } return CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED.equals(commitTimelineType) ? pathSuffix + ARCHIVED_FOLDER_NAME + '/' : pathSuffix; } - private String getFileNameWithPrefix(File file, CommitTimelineType commitTimelineType) { - String archivedPrefix = "archived/"; - return CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED.equals(commitTimelineType) - && !HOODIE_PROPERTIES_FILE.equals(file.getFilename()) - ? archivedPrefix + file.getFilename() - : file.getFilename(); + private String getFileNameWithPrefix( + File file, CommitTimelineType commitTimelineType, int timelineLayoutVersion) { + if (CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED.equals(commitTimelineType) + && !HOODIE_PROPERTIES_FILE.equals(file.getFilename())) { + if (timelineLayoutVersion == 2) { + return TIMELINE_FOLDER_NAME + "/" + HISTORY_FOLDER_NAME + "/" + file.getFilename(); + } + return ARCHIVED_FOLDER_NAME + "/" + file.getFilename(); + } + if (timelineLayoutVersion == 2 + && !HOODIE_PROPERTIES_FILE.equals(file.getFilename())) { + return TIMELINE_FOLDER_NAME + "/" + file.getFilename(); + } + return file.getFilename(); } private BigDecimal getCommitIdFromActiveTimelineInstant(String activeTimeLineInstant) { - return new BigDecimal(activeTimeLineInstant.split("\\.")[0]); + String timestampPart = activeTimeLineInstant.split("\\.")[0]; + if (timestampPart.contains("_")) { + timestampPart = timestampPart.split("_")[0]; + } + return new BigDecimal(timestampPart); } - private int getNumericPartFromArchivedCommit(String archivedCommitFileName) { - Pattern pattern = Pattern.compile("\\.archive\\.(\\d+)_"); - Matcher matcher = pattern.matcher(archivedCommitFileName); + private long getNumericPartFromArchivedCommit(String archivedCommitFileName) { + // V1: .commits_.archive.5_20260101-20260115-50 + Matcher v1Matcher = V1_ARCHIVED_NUMERIC_PATTERN.matcher(archivedCommitFileName); + if (v1Matcher.find()) { + return Long.parseLong(v1Matcher.group(1)); + } - if (matcher.find()) { - return Integer.parseInt(matcher.group(1)); - } else { - throw new IllegalArgumentException("invalid archived commit file type"); + // V2 parquet: 20260130205837315_20260201000250371_3.parquet + Matcher v2ParquetMatcher = V2_PARQUET_NUMERIC_PATTERN.matcher(archivedCommitFileName); + if (v2ParquetMatcher.find()) { + return Long.parseLong(v2ParquetMatcher.group(1)); + } + + // V2 manifest: manifest_4243 + Matcher v2ManifestMatcher = V2_MANIFEST_NUMERIC_PATTERN.matcher(archivedCommitFileName); + if (v2ManifestMatcher.find()) { + return Long.parseLong(v2ManifestMatcher.group(1)); + } + + // V2 version marker: _version_ + if (VERSION_MARKER_FILE.equals(archivedCommitFileName)) { + return Long.MAX_VALUE; } + + throw new IllegalArgumentException( + "invalid archived commit file type: " + archivedCommitFileName); } public String getStartAfterString(String prefix, Checkpoint checkpoint, boolean isFirstFetch) { diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/models/ParsedHudiProperties.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/models/ParsedHudiProperties.java index 21ddeacb..d92ef259 100644 --- a/lakeview/src/main/java/ai/onehouse/metadata_extractor/models/ParsedHudiProperties.java +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/models/ParsedHudiProperties.java @@ -14,4 +14,6 @@ public class ParsedHudiProperties { @NonNull String tableName; @NonNull TableType tableType; @Nullable MetricsConstants.MetadataUploadFailureReasons metadataUploadFailureReasons; + @Builder.Default int tableVersion = 6; + @Builder.Default int timelineLayoutVersion = 1; } diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/models/Table.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/models/Table.java index c60037d6..e6457797 100644 --- a/lakeview/src/main/java/ai/onehouse/metadata_extractor/models/Table.java +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/models/Table.java @@ -15,4 +15,6 @@ public class Table { private final String databaseName; private final String lakeName; private String tableId; + @Builder.Default private final int tableVersion = 6; + @Builder.Default private final int timelineLayoutVersion = 1; } diff --git a/lakeview/src/test/java/ai/onehouse/metadata_extractor/ActiveTimelineInstantBatcherTest.java b/lakeview/src/test/java/ai/onehouse/metadata_extractor/ActiveTimelineInstantBatcherTest.java index 54ddef9d..2413ab7b 100644 --- a/lakeview/src/test/java/ai/onehouse/metadata_extractor/ActiveTimelineInstantBatcherTest.java +++ b/lakeview/src/test/java/ai/onehouse/metadata_extractor/ActiveTimelineInstantBatcherTest.java @@ -575,6 +575,37 @@ void testCreateBatchJustHoodieProperties(List instants, List> e assertEquals(expectedBatches, actualBatches); } + @Test + void testCreateBatchWithV9CompletedInstants() { + // V9 completed instants have completion timestamp: {ts}_{completionTs}.action + // Requested and inflight files remain unchanged: {ts}.action.requested + List files = + Arrays.asList( + generateFileObj("20260204053206256.deltacommit.requested"), + generateFileObj("20260204053206256.deltacommit.inflight"), + generateFileObj("20260204053206256_20260204053210895.deltacommit"), + generateFileObj("20260204053205307.compaction.requested"), + generateFileObj("20260204053205307.compaction.inflight"), + generateFileObj("20260204053205307_20260204053222939.commit"), + generateFileObj("hoodie.properties")); + + List> expectedBatches = + Arrays.asList( + Arrays.asList( + generateFileObj("hoodie.properties"), + generateFileObj("20260204053205307.compaction.inflight"), + generateFileObj("20260204053205307.compaction.requested"), + generateFileObj("20260204053205307_20260204053222939.commit")), + Arrays.asList( + generateFileObj("20260204053206256.deltacommit.inflight"), + generateFileObj("20260204053206256.deltacommit.requested"), + generateFileObj("20260204053206256_20260204053210895.deltacommit"))); + + List> actualBatches = + activeTimelineInstantBatcher.createBatches(files, 4, getCheckpoint()).getRight(); + assertEquals(expectedBatches, actualBatches); + } + @Test void testWithInvalidBatchSize() { assertThrows( diff --git a/lakeview/src/test/java/ai/onehouse/metadata_extractor/HoodiePropertiesReaderTest.java b/lakeview/src/test/java/ai/onehouse/metadata_extractor/HoodiePropertiesReaderTest.java index 8e1ac56e..70cad0c0 100644 --- a/lakeview/src/test/java/ai/onehouse/metadata_extractor/HoodiePropertiesReaderTest.java +++ b/lakeview/src/test/java/ai/onehouse/metadata_extractor/HoodiePropertiesReaderTest.java @@ -49,6 +49,31 @@ void testReadHoodieProperties(TableType tableType) ParsedHudiProperties result = futureResult.get(); assertEquals("test_table", result.getTableName()); assertEquals(tableType, result.getTableType()); + assertEquals(6, result.getTableVersion()); + assertEquals(1, result.getTimelineLayoutVersion()); + } + + @Test + void testReadHoodiePropertiesV9() throws ExecutionException, InterruptedException { + String path = "some/path/to/properties/file"; + String propertiesContent = + "hoodie.table.name=test_v9_table\n" + + "hoodie.table.type=MERGE_ON_READ\n" + + "hoodie.table.version=9\n" + + "hoodie.timeline.layout.version=2"; + ByteArrayInputStream inputStream = new ByteArrayInputStream(propertiesContent.getBytes()); + + when(asyncStorageClient.streamFileAsync(path)) + .thenReturn(CompletableFuture.completedFuture(getFileStreamData(inputStream))); + + CompletableFuture futureResult = + hoodiePropertiesReader.readHoodieProperties(path); + + ParsedHudiProperties result = futureResult.get(); + assertEquals("test_v9_table", result.getTableName()); + assertEquals(TableType.MERGE_ON_READ, result.getTableType()); + assertEquals(9, result.getTableVersion()); + assertEquals(2, result.getTimelineLayoutVersion()); } @Test diff --git a/lakeview/src/test/java/ai/onehouse/metadata_extractor/TableMetadataUploaderServiceTest.java b/lakeview/src/test/java/ai/onehouse/metadata_extractor/TableMetadataUploaderServiceTest.java index 41b4a5af..bfb1a748 100644 --- a/lakeview/src/test/java/ai/onehouse/metadata_extractor/TableMetadataUploaderServiceTest.java +++ b/lakeview/src/test/java/ai/onehouse/metadata_extractor/TableMetadataUploaderServiceTest.java @@ -298,6 +298,9 @@ void testUploadMetadataFromPreviousCheckpointArchivedNotProcessed() { .checkpoint(currentCheckpointJson) .build())) .build())); + when(hoodiePropertiesReader.readHoodieProperties( + String.format("%s%s/%s", S3_TABLE_URI, HOODIE_FOLDER_NAME, HOODIE_PROPERTIES_FILE))) + .thenReturn(CompletableFuture.completedFuture(PARSED_HUDI_PROPERTIES)); when(timelineCommitInstantsUploader.batchUploadWithCheckpoint( TABLE_ID.toString(), TABLE, @@ -388,6 +391,10 @@ void testUploadMetadataOfANewlyDiscoveredAndPreviouslyProcessedTable() { .thenReturn(CompletableFuture.completedFuture(PARSED_HUDI_PROPERTIES)); when(onehouseApiClient.initializeTableMetricsCheckpoint(expectedRequest)) .thenReturn(CompletableFuture.completedFuture(initializeTableMetricsCheckpointResponse)); + // table 2 (existing table) - read properties for version info + when(hoodiePropertiesReader.readHoodieProperties( + String.format("%s%s/%s", s3TableUri2, HOODIE_FOLDER_NAME, HOODIE_PROPERTIES_FILE))) + .thenReturn(CompletableFuture.completedFuture(PARSED_HUDI_PROPERTIES)); // table 1 when(timelineCommitInstantsUploader.batchUploadWithCheckpoint( TABLE_ID.toString(), @@ -482,6 +489,9 @@ void testUploadMetadataFromPreviousCheckpointArchivedProcessedActiveTimelineProc .checkpoint(currentCheckpointJson) .build())) .build())); + when(hoodiePropertiesReader.readHoodieProperties( + String.format("%s%s/%s", S3_TABLE_URI, HOODIE_FOLDER_NAME, HOODIE_PROPERTIES_FILE))) + .thenReturn(CompletableFuture.completedFuture(PARSED_HUDI_PROPERTIES)); Checkpoint expectedCheckpoint = shouldResetCheckpoint ? currentCheckpointWithResetFields : currentCheckpoint; when(timelineCommitInstantsUploader.paginatedBatchUploadWithCheckpoint( diff --git a/lakeview/src/test/java/ai/onehouse/metadata_extractor/TimelineCommitInstantsUploaderTest.java b/lakeview/src/test/java/ai/onehouse/metadata_extractor/TimelineCommitInstantsUploaderTest.java index 5f5138ea..8afddb31 100644 --- a/lakeview/src/test/java/ai/onehouse/metadata_extractor/TimelineCommitInstantsUploaderTest.java +++ b/lakeview/src/test/java/ai/onehouse/metadata_extractor/TimelineCommitInstantsUploaderTest.java @@ -78,12 +78,22 @@ class TimelineCommitInstantsUploaderTest { private final ObjectMapper mapper = new ObjectMapper(); private static final String S3_TABLE_URI = "s3://bucket/table/"; private static final String ARCHIVED_FOLDER_PREFIX = "archived/"; + private static final String TIMELINE_HISTORY_PREFIX = "timeline/history/"; + private static final String TIMELINE_PREFIX = "timeline/"; private static final Table TABLE = Table.builder() .absoluteTableUri(S3_TABLE_URI) .databaseName("database") .lakeName("lake") .build(); + private static final Table TABLE_V2 = + Table.builder() + .absoluteTableUri(S3_TABLE_URI) + .databaseName("database") + .lakeName("lake") + .tableVersion(9) + .timelineLayoutVersion(2) + .build(); private static final String TABLE_PREFIX = "table"; private static final UUID TABLE_ID = UUID.nameUUIDFromBytes(S3_TABLE_URI.getBytes()); private static final String PRESIGNED_URL_PREFIX = "http://presigned-url/"; @@ -1476,6 +1486,285 @@ void testPaginatedBatchUploadWithCheckpointExceptionallyBlockWithNullMessage() { eq("Exception when uploading instants for table " + TABLE + " timeline COMMIT_TIMELINE_TYPE_ACTIVE: java.lang.RuntimeException")); } + @Test + void testUploadInstantsInArchivedTimelineV2() { + TimelineCommitInstantsUploader timelineCommitInstantsUploaderSpy = + spy(timelineCommitInstantsUploader); + + doReturn(1) + .when(timelineCommitInstantsUploaderSpy) + .getUploadBatchSize(CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED); + + // V2 archived timeline path: .hoodie/timeline/history/ + mockListAllFilesInDir( + TABLE_V2.getAbsoluteTableUri() + ".hoodie/timeline/history/", + Arrays.asList( + generateFileObj("should_be_ignored", false), + generateFileObj("20260130205837315_20260201000250371_1.parquet", false), + generateFileObj("20260130205837315_20260201000250371_2.parquet", false), + generateFileObj("manifest_1", false), + generateFileObj("_version_", false, currentTime))); + + // Sort order by getNumericPartFromArchivedCommit: + // manifest_1 -> 1, parquet_1 -> 20260130205837315, parquet_2 -> 20260130205837315, + // _version_ -> Long.MAX_VALUE + // With batchId=0, hoodie.properties is prepended + Checkpoint checkpoint0 = generateCheckpointObj(1, Instant.EPOCH, false, HOODIE_PROPERTIES_FILE); + Checkpoint checkpoint1 = + generateCheckpointObj(2, Instant.EPOCH, false, "manifest_1"); + Checkpoint checkpoint2 = + generateCheckpointObj( + 3, Instant.EPOCH, false, "20260130205837315_20260201000250371_1.parquet"); + Checkpoint checkpoint3 = + generateCheckpointObj( + 4, Instant.EPOCH, false, "20260130205837315_20260201000250371_2.parquet"); + Checkpoint checkpoint4 = + generateCheckpointObj(5, currentTime, false, "_version_"); + + stubUploadInstantsCallsV2( + Collections.singletonList(UploadedFile.builder().name(HOODIE_PROPERTIES_FILE).build()), + checkpoint0, + CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED); + stubUploadInstantsCallsV2( + Collections.singletonList(UploadedFile.builder().name("manifest_1").build()), + checkpoint1, + CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED); + stubUploadInstantsCallsV2( + Collections.singletonList( + UploadedFile.builder() + .name("20260130205837315_20260201000250371_1.parquet") + .build()), + checkpoint2, + CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED); + stubUploadInstantsCallsV2( + Collections.singletonList( + UploadedFile.builder() + .name("20260130205837315_20260201000250371_2.parquet") + .build()), + checkpoint3, + CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED); + stubUploadInstantsCallsV2( + Collections.singletonList( + UploadedFile.builder() + .name("_version_") + .lastModifiedAt(currentTime.toEpochMilli()) + .build()), + checkpoint4, + CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED); + + Checkpoint response = + timelineCommitInstantsUploaderSpy + .batchUploadWithCheckpoint( + TABLE_ID.toString(), + TABLE_V2, + INITIAL_CHECKPOINT, + CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED) + .join(); + + assertEquals(checkpoint4, response); + } + + @Tag("Blocking") + @Test + void testUploadInstantsInActiveTimelineV2() { + TimelineCommitInstantsUploader timelineCommitInstantsUploaderSpy = + spy(timelineCommitInstantsUploader); + + doReturn(4) + .when(timelineCommitInstantsUploaderSpy) + .getUploadBatchSize(CommitTimelineType.COMMIT_TIMELINE_TYPE_ACTIVE); + + // archived already processed + Checkpoint previousCheckpoint = generateCheckpointObj(3, Instant.EPOCH, false, ""); + + // V2 active timeline path: .hoodie/timeline/ + // Page 1 + mockListPage( + TABLE_PREFIX + "/.hoodie/timeline/", + CONTINUATION_TOKEN_PREFIX + "1", + null, + Arrays.asList( + generateFileObj("should_be_ignored", false), + generateFileObj("20260130205837315_20260201000250371.commit", false), + generateFileObj("20260130205837315_20260201000250371.inflight", false), + generateFileObj("20260130205837315_20260201000250371.commit.requested", false), + generateFileObj( + "20260201000250371_20260202000350471.commit", false, currentTime))); + // Page 2 (last page) + mockListPage( + TABLE_PREFIX + "/.hoodie/timeline/", + null, + TABLE_PREFIX + + "/.hoodie/timeline/" + + "20260130205837315_20260201000250371.commit", + Arrays.asList( + generateFileObj( + "20260201000250371_20260202000350471.commit", false, currentTime), + generateFileObj("20260201000250371_20260202000350471.inflight", false), + generateFileObj("20260201000250371_20260202000350471.commit.requested", false), + generateFileObj(HOODIE_PROPERTIES_FILE, false))); + + List batch1 = + Arrays.asList( + generateFileObj( + "20260130205837315_20260201000250371.commit", false), + generateFileObj( + "20260130205837315_20260201000250371.inflight", false), + generateFileObj( + "20260130205837315_20260201000250371.commit.requested", false)); + + List batch2 = + Arrays.asList( + generateFileObj( + "20260201000250371_20260202000350471.commit", false, currentTime), + generateFileObj( + "20260201000250371_20260202000350471.inflight", false), + generateFileObj( + "20260201000250371_20260202000350471.commit.requested", false)); + + Checkpoint checkpoint1 = + generateCheckpointObj( + previousCheckpoint.getBatchId() + 1, + Instant.EPOCH, + true, + "20260130205837315_20260201000250371.commit"); + Checkpoint checkpoint2 = + generateCheckpointObj( + previousCheckpoint.getBatchId() + 2, + currentTime, + true, + "20260201000250371_20260202000350471.commit"); + + stubCreateBatches( + Stream.of( + generateFileObj( + "20260130205837315_20260201000250371.commit", false), + generateFileObj( + "20260130205837315_20260201000250371.inflight", false), + generateFileObj( + "20260130205837315_20260201000250371.commit.requested", false), + generateFileObj( + "20260201000250371_20260202000350471.commit", + false, + currentTime)) + .collect(Collectors.toList()), + Collections.singletonList(batch1), + previousCheckpoint, + previousCheckpoint.getFirstIncompleteCommitFile()); + + stubCreateBatches( + Arrays.asList( + generateFileObj( + "20260201000250371_20260202000350471.commit", false, currentTime), + generateFileObj( + "20260201000250371_20260202000350471.inflight", false), + generateFileObj( + "20260201000250371_20260202000350471.commit.requested", false)), + Collections.singletonList(batch2), + checkpoint1, + checkpoint1.getFirstIncompleteCommitFile()); + + stubUploadInstantsCallsV2( + batch1.stream() + .map(file -> UploadedFile.builder().name(file.getFilename()).build()) + .collect(Collectors.toList()), + checkpoint1, + CommitTimelineType.COMMIT_TIMELINE_TYPE_ACTIVE); + stubUploadInstantsCallsV2( + batch2.stream() + .map( + file -> + UploadedFile.builder() + .name(file.getFilename()) + .lastModifiedAt(file.getLastModifiedAt().toEpochMilli()) + .build()) + .collect(Collectors.toList()), + checkpoint2, + CommitTimelineType.COMMIT_TIMELINE_TYPE_ACTIVE); + + Checkpoint response = + timelineCommitInstantsUploaderSpy + .paginatedBatchUploadWithCheckpoint( + TABLE_ID.toString(), + TABLE_V2, + previousCheckpoint, + CommitTimelineType.COMMIT_TIMELINE_TYPE_ACTIVE) + .join(); + + assertEquals(checkpoint2, response); + } + + @SneakyThrows + private void stubUploadInstantsCallsV2( + List filesUploaded, + Checkpoint updatedCheckpoint, + CommitTimelineType commitTimelineType) { + filesUploaded = + filesUploaded.stream() + .map( + file -> + UploadedFile.builder() + .name( + addPrefixToFileNameV2(file.getName(), commitTimelineType)) + .lastModifiedAt(file.getLastModifiedAt()) + .build()) + .collect(Collectors.toList()); + List filesUploadedWithUpdatedName = + filesUploaded.stream().map(UploadedFile::getName).collect(Collectors.toList()); + List presignedUrls = + filesUploadedWithUpdatedName.stream() + .map(fileName -> PRESIGNED_URL_PREFIX + fileName) + .collect(Collectors.toList()); + when(onehouseApiClient.generateCommitMetadataUploadUrl( + GenerateCommitMetadataUploadUrlRequest.builder() + .tableId(TABLE_ID.toString()) + .commitInstants(filesUploadedWithUpdatedName) + .commitTimelineType(commitTimelineType) + .build())) + .thenReturn( + CompletableFuture.completedFuture( + GenerateCommitMetadataUploadUrlResponse.builder() + .uploadUrls(presignedUrls) + .build())); + for (String presignedUrl : presignedUrls) { + String fileName = presignedUrl.substring(PRESIGNED_URL_PREFIX.length()); + // V2: constructStorageUri for hoodie.properties strips timeline/ path + String fileUri; + if (HOODIE_PROPERTIES_FILE.equals(fileName)) { + fileUri = S3_TABLE_URI + ".hoodie/" + fileName; + } else { + // For V2, the directory is already timeline/ or timeline/history/ + // constructStorageUri uses directoryUri + filename directly for non-properties + fileUri = S3_TABLE_URI + ".hoodie/" + fileName; + } + when(presignedUrlFileUploader.uploadFileToPresignedUrl( + presignedUrl, fileUri, metadataExtractorConfig.getFileUploadStreamBatchSize())) + .thenReturn(CompletableFuture.completedFuture(null)); + } + when(onehouseApiClient.upsertTableMetricsCheckpoint( + UpsertTableMetricsCheckpointRequest.builder() + .commitTimelineType(commitTimelineType) + .tableId(TABLE_ID.toString()) + .checkpoint(mapper.writeValueAsString(updatedCheckpoint)) + .filesUploaded(filesUploadedWithUpdatedName) + .uploadedFiles(filesUploaded) + .build())) + .thenReturn( + CompletableFuture.completedFuture( + UpsertTableMetricsCheckpointResponse.builder().build())); + } + + private String addPrefixToFileNameV2( + String fileName, CommitTimelineType commitTimelineType) { + if (HOODIE_PROPERTIES_FILE.equals(fileName)) { + return fileName; + } + if (CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED.equals(commitTimelineType)) { + return TIMELINE_HISTORY_PREFIX + fileName; + } + return TIMELINE_PREFIX + fileName; + } + private void stubCreateBatches( List files, List> expectedBatches, From 4f284aa596a52f48357440e283028afe1dbe5ad3 Mon Sep 17 00:00:00 2001 From: tiennguyen-onehouse Date: Thu, 2 Apr 2026 13:47:34 -0700 Subject: [PATCH 02/10] [ENG-38901] Replace string concatenation with text block in HoodiePropertiesReaderTest Address SonarCloud issue: use Java text block instead of string concatenation for test properties content. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../metadata_extractor/HoodiePropertiesReaderTest.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/lakeview/src/test/java/ai/onehouse/metadata_extractor/HoodiePropertiesReaderTest.java b/lakeview/src/test/java/ai/onehouse/metadata_extractor/HoodiePropertiesReaderTest.java index 70cad0c0..abe9c2a4 100644 --- a/lakeview/src/test/java/ai/onehouse/metadata_extractor/HoodiePropertiesReaderTest.java +++ b/lakeview/src/test/java/ai/onehouse/metadata_extractor/HoodiePropertiesReaderTest.java @@ -57,10 +57,11 @@ void testReadHoodieProperties(TableType tableType) void testReadHoodiePropertiesV9() throws ExecutionException, InterruptedException { String path = "some/path/to/properties/file"; String propertiesContent = - "hoodie.table.name=test_v9_table\n" - + "hoodie.table.type=MERGE_ON_READ\n" - + "hoodie.table.version=9\n" - + "hoodie.timeline.layout.version=2"; + """ + hoodie.table.name=test_v9_table + hoodie.table.type=MERGE_ON_READ + hoodie.table.version=9 + hoodie.timeline.layout.version=2"""; ByteArrayInputStream inputStream = new ByteArrayInputStream(propertiesContent.getBytes()); when(asyncStorageClient.streamFileAsync(path)) From f7d520bff7564aa1a53d43db0a253efd24cf0ab0 Mon Sep 17 00:00:00 2001 From: tiennguyen-onehouse Date: Thu, 2 Apr 2026 13:49:50 -0700 Subject: [PATCH 03/10] [ENG-38901] Revert text block change - CI uses Java 11 Text blocks require Java 15+. The CI build compiles with Java 11, so reverting to string concatenation. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../metadata_extractor/HoodiePropertiesReaderTest.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/lakeview/src/test/java/ai/onehouse/metadata_extractor/HoodiePropertiesReaderTest.java b/lakeview/src/test/java/ai/onehouse/metadata_extractor/HoodiePropertiesReaderTest.java index abe9c2a4..70cad0c0 100644 --- a/lakeview/src/test/java/ai/onehouse/metadata_extractor/HoodiePropertiesReaderTest.java +++ b/lakeview/src/test/java/ai/onehouse/metadata_extractor/HoodiePropertiesReaderTest.java @@ -57,11 +57,10 @@ void testReadHoodieProperties(TableType tableType) void testReadHoodiePropertiesV9() throws ExecutionException, InterruptedException { String path = "some/path/to/properties/file"; String propertiesContent = - """ - hoodie.table.name=test_v9_table - hoodie.table.type=MERGE_ON_READ - hoodie.table.version=9 - hoodie.timeline.layout.version=2"""; + "hoodie.table.name=test_v9_table\n" + + "hoodie.table.type=MERGE_ON_READ\n" + + "hoodie.table.version=9\n" + + "hoodie.timeline.layout.version=2"; ByteArrayInputStream inputStream = new ByteArrayInputStream(propertiesContent.getBytes()); when(asyncStorageClient.streamFileAsync(path)) From 665b6ab6c27d5004fd0be2b7374ded6ff592165e Mon Sep 17 00:00:00 2001 From: tiennguyen-onehouse Date: Thu, 2 Apr 2026 14:29:36 -0700 Subject: [PATCH 04/10] [ENG-38901] Address code review findings for Hudi V9 timeline support - Fix V2 archived parquet comparison to use full filename when numeric keys match, preventing files with same timestamp but different sequence numbers from being skipped on restart - Add error handling for readHoodieProperties in existing-table path to avoid silently falling back to V1 defaults on failure - Cache ParsedHudiProperties to avoid re-reading on every sync cycle - Extract TIMELINE_LAYOUT_VERSION_V2 constant replacing magic number 2 - Rename V2_PARQUET_NUMERIC_PATTERN to V2_ARCHIVED_PARQUET_TIMESTAMP_PATTERN - Use consistent char separator style in path construction - Remove redundant if/else branch in stubUploadInstantsCallsV2 test Co-Authored-By: Claude Opus 4.6 (1M context) --- .../constants/MetadataExtractorConstants.java | 5 +- .../TableMetadataUploaderService.java | 66 +++++++++++++------ .../TimelineCommitInstantsUploader.java | 31 +++++---- .../TimelineCommitInstantsUploaderTest.java | 14 +--- 4 files changed, 73 insertions(+), 43 deletions(-) diff --git a/lakeview/src/main/java/ai/onehouse/constants/MetadataExtractorConstants.java b/lakeview/src/main/java/ai/onehouse/constants/MetadataExtractorConstants.java index 44277529..0730b270 100644 --- a/lakeview/src/main/java/ai/onehouse/constants/MetadataExtractorConstants.java +++ b/lakeview/src/main/java/ai/onehouse/constants/MetadataExtractorConstants.java @@ -22,6 +22,7 @@ private MetadataExtractorConstants() {} "hoodie.timeline.layout.version"; public static final String TIMELINE_FOLDER_NAME = "timeline"; public static final String HISTORY_FOLDER_NAME = "history"; + public static final int TIMELINE_LAYOUT_VERSION_V2 = 2; // The default number of instants in one archived commit metadata file is 10 // so we want to ingest 10x active instants than archived instants in one batch @@ -53,8 +54,8 @@ private MetadataExtractorConstants() {} Pattern.compile("\\d+(_\\d+)?(\\.[a-z]{1,20}){1,2}"); public static final Pattern V1_ARCHIVED_NUMERIC_PATTERN = Pattern.compile("\\.archive\\.(\\d+)_"); - public static final Pattern V2_PARQUET_NUMERIC_PATTERN = - Pattern.compile("^(\\d+)_\\d+_\\d+\\.parquet$"); + public static final Pattern V2_ARCHIVED_PARQUET_TIMESTAMP_PATTERN = + Pattern.compile("^(\\d+)_(\\d+)_(\\d+)\\.parquet$"); public static final Pattern V2_MANIFEST_NUMERIC_PATTERN = Pattern.compile("^manifest_(\\d+)$"); public static final Checkpoint INITIAL_CHECKPOINT = diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableMetadataUploaderService.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableMetadataUploaderService.java index 41da2e26..0ad92015 100644 --- a/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableMetadataUploaderService.java +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableMetadataUploaderService.java @@ -52,6 +52,7 @@ public class TableMetadataUploaderService { private final LakeViewExtractorMetrics hudiMetadataExtractorMetrics; private final ExecutorService executorService; private final ObjectMapper mapper; + private final Map propertiesCache = new ConcurrentHashMap<>(); @Inject public TableMetadataUploaderService( @@ -146,25 +147,51 @@ private CompletableFuture uploadInstantsInTableBatch(List
tables StringUtils.isNotBlank(checkpointString) ? mapper.readValue(checkpointString, Checkpoint.class) : INITIAL_CHECKPOINT; - processTablesFuture.add( - hoodiePropertiesReader - .readHoodieProperties(getHoodiePropertiesFilePath(table)) - .thenComposeAsync( - properties -> { - Table tableWithVersion = table; - if (properties != null) { - tableWithVersion = table.toBuilder() - .tableVersion(properties.getTableVersion()) - .timelineLayoutVersion( - properties.getTimelineLayoutVersion()) - .build(); - } - return uploadNewInstantsSinceCheckpoint( - tableWithVersion.getTableId(), - tableWithVersion, - checkpoint); - }, - executorService)); + ParsedHudiProperties cachedProperties = + propertiesCache.get(table.getTableId()); + if (cachedProperties != null) { + Table tableWithVersion = table.toBuilder() + .tableVersion(cachedProperties.getTableVersion()) + .timelineLayoutVersion( + cachedProperties.getTimelineLayoutVersion()) + .build(); + processTablesFuture.add( + uploadNewInstantsSinceCheckpoint( + tableWithVersion.getTableId(), + tableWithVersion, + checkpoint)); + } else { + processTablesFuture.add( + hoodiePropertiesReader + .readHoodieProperties(getHoodiePropertiesFilePath(table)) + .thenComposeAsync( + properties -> { + Table tableWithVersion = table; + if (properties != null + && properties.getMetadataUploadFailureReasons() + == null) { + propertiesCache.put( + table.getTableId(), properties); + tableWithVersion = table.toBuilder() + .tableVersion( + properties.getTableVersion()) + .timelineLayoutVersion( + properties.getTimelineLayoutVersion()) + .build(); + } else { + log.warn( + "Failed to read hoodie.properties for " + + "table: {}, using default version " + + "settings", + table); + } + return uploadNewInstantsSinceCheckpoint( + tableWithVersion.getTableId(), + tableWithVersion, + checkpoint); + }, + executorService)); + } } catch (JsonProcessingException e) { log.error( "Error deserializing checkpoint value for table: {}, skipping table", @@ -253,6 +280,7 @@ private int getNumberOfMissingTables(List 1, parquet_1 -> 20260130205837315, parquet_2 -> 20260130205837315, - // _version_ -> Long.MAX_VALUE + // manifest_1 -> 1, parquet_1 -> 20260130205837315, parquet_2 -> 20260130205837315 + // (same timestamp, differentiated by full filename comparison), _version_ -> Long.MAX_VALUE // With batchId=0, hoodie.properties is prepended Checkpoint checkpoint0 = generateCheckpointObj(1, Instant.EPOCH, false, HOODIE_PROPERTIES_FILE); Checkpoint checkpoint1 = @@ -1728,15 +1728,7 @@ private void stubUploadInstantsCallsV2( .build())); for (String presignedUrl : presignedUrls) { String fileName = presignedUrl.substring(PRESIGNED_URL_PREFIX.length()); - // V2: constructStorageUri for hoodie.properties strips timeline/ path - String fileUri; - if (HOODIE_PROPERTIES_FILE.equals(fileName)) { - fileUri = S3_TABLE_URI + ".hoodie/" + fileName; - } else { - // For V2, the directory is already timeline/ or timeline/history/ - // constructStorageUri uses directoryUri + filename directly for non-properties - fileUri = S3_TABLE_URI + ".hoodie/" + fileName; - } + String fileUri = S3_TABLE_URI + ".hoodie/" + fileName; when(presignedUrlFileUploader.uploadFileToPresignedUrl( presignedUrl, fileUri, metadataExtractorConfig.getFileUploadStreamBatchSize())) .thenReturn(CompletableFuture.completedFuture(null)); From 28bbe922a0955a74a9faa4e7e6c3d51c0049e2c4 Mon Sep 17 00:00:00 2001 From: tiennguyen-onehouse Date: Thu, 9 Apr 2026 22:28:06 -0700 Subject: [PATCH 05/10] [ENG-38901] Address code review findings round 2 - Add comment explaining why propertiesCache has no TTL - Extract hasActionType() helper to consolidate isSavepointCommit/isRollbackCommit - Fix V9 test filenames: pending instants use single-timestamp format Co-Authored-By: Claude Sonnet 4.6 --- .../TableMetadataUploaderService.java | 2 ++ .../TimelineCommitInstantsUploader.java | 15 +++++------- .../TimelineCommitInstantsUploaderTest.java | 24 +++++++++---------- 3 files changed, 20 insertions(+), 21 deletions(-) diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableMetadataUploaderService.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableMetadataUploaderService.java index 0ad92015..6c36b1be 100644 --- a/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableMetadataUploaderService.java +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableMetadataUploaderService.java @@ -52,6 +52,8 @@ public class TableMetadataUploaderService { private final LakeViewExtractorMetrics hudiMetadataExtractorMetrics; private final ExecutorService executorService; private final ObjectMapper mapper; + // Cache stores a few lightweight entries (one per table). No TTL needed since table versions + // do not change at runtime, and LakeView restarts on upgrades. private final Map propertiesCache = new ConcurrentHashMap<>(); @Inject diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/TimelineCommitInstantsUploader.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/TimelineCommitInstantsUploader.java index a021b652..7a1063d7 100644 --- a/lakeview/src/main/java/ai/onehouse/metadata_extractor/TimelineCommitInstantsUploader.java +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/TimelineCommitInstantsUploader.java @@ -783,19 +783,16 @@ public File getLastUploadedFileFromBatch( } private boolean isSavepointCommit(File file) { - String[] parts = file.getFilename().split("\\.", 3); - if (parts.length < 2) { - return false; - } - return SAVEPOINT_ACTION.equals(parts[1]); + return hasActionType(file, SAVEPOINT_ACTION); } private boolean isRollbackCommit(File file) { + return hasActionType(file, ROLLBACK_ACTION); + } + + private boolean hasActionType(File file, String actionType) { String[] parts = file.getFilename().split("\\.", 3); - if (parts.length < 2) { - return false; - } - return ROLLBACK_ACTION.equals(parts[1]); + return parts.length >= 2 && actionType.equals(parts[1]); } @VisibleForTesting diff --git a/lakeview/src/test/java/ai/onehouse/metadata_extractor/TimelineCommitInstantsUploaderTest.java b/lakeview/src/test/java/ai/onehouse/metadata_extractor/TimelineCommitInstantsUploaderTest.java index 5d1da754..6e66027d 100644 --- a/lakeview/src/test/java/ai/onehouse/metadata_extractor/TimelineCommitInstantsUploaderTest.java +++ b/lakeview/src/test/java/ai/onehouse/metadata_extractor/TimelineCommitInstantsUploaderTest.java @@ -1586,8 +1586,8 @@ void testUploadInstantsInActiveTimelineV2() { Arrays.asList( generateFileObj("should_be_ignored", false), generateFileObj("20260130205837315_20260201000250371.commit", false), - generateFileObj("20260130205837315_20260201000250371.inflight", false), - generateFileObj("20260130205837315_20260201000250371.commit.requested", false), + generateFileObj("20260130205837315.commit.inflight", false), + generateFileObj("20260130205837315.commit.requested", false), generateFileObj( "20260201000250371_20260202000350471.commit", false, currentTime))); // Page 2 (last page) @@ -1600,8 +1600,8 @@ void testUploadInstantsInActiveTimelineV2() { Arrays.asList( generateFileObj( "20260201000250371_20260202000350471.commit", false, currentTime), - generateFileObj("20260201000250371_20260202000350471.inflight", false), - generateFileObj("20260201000250371_20260202000350471.commit.requested", false), + generateFileObj("20260201000250371.commit.inflight", false), + generateFileObj("20260201000250371.commit.requested", false), generateFileObj(HOODIE_PROPERTIES_FILE, false))); List batch1 = @@ -1609,18 +1609,18 @@ void testUploadInstantsInActiveTimelineV2() { generateFileObj( "20260130205837315_20260201000250371.commit", false), generateFileObj( - "20260130205837315_20260201000250371.inflight", false), + "20260130205837315.commit.inflight", false), generateFileObj( - "20260130205837315_20260201000250371.commit.requested", false)); + "20260130205837315.commit.requested", false)); List batch2 = Arrays.asList( generateFileObj( "20260201000250371_20260202000350471.commit", false, currentTime), generateFileObj( - "20260201000250371_20260202000350471.inflight", false), + "20260201000250371.commit.inflight", false), generateFileObj( - "20260201000250371_20260202000350471.commit.requested", false)); + "20260201000250371.commit.requested", false)); Checkpoint checkpoint1 = generateCheckpointObj( @@ -1640,9 +1640,9 @@ void testUploadInstantsInActiveTimelineV2() { generateFileObj( "20260130205837315_20260201000250371.commit", false), generateFileObj( - "20260130205837315_20260201000250371.inflight", false), + "20260130205837315.commit.inflight", false), generateFileObj( - "20260130205837315_20260201000250371.commit.requested", false), + "20260130205837315.commit.requested", false), generateFileObj( "20260201000250371_20260202000350471.commit", false, @@ -1657,9 +1657,9 @@ void testUploadInstantsInActiveTimelineV2() { generateFileObj( "20260201000250371_20260202000350471.commit", false, currentTime), generateFileObj( - "20260201000250371_20260202000350471.inflight", false), + "20260201000250371.commit.inflight", false), generateFileObj( - "20260201000250371_20260202000350471.commit.requested", false)), + "20260201000250371.commit.requested", false)), Collections.singletonList(batch2), checkpoint1, checkpoint1.getFirstIncompleteCommitFile()); From b174725afe6ebfca44c9d033a5f1a78a5b47b156 Mon Sep 17 00:00:00 2001 From: tiennguyen-onehouse Date: Wed, 15 Apr 2026 09:57:58 -0700 Subject: [PATCH 06/10] [ENG-38901] Redesign V2 archived timeline upload to be manifest-driven Rewrites the V2 archived path so LakeView reads _version_ and manifest_N and mirrors files by diffing successive manifest versions, making the path compaction-safe. Checkpoints advance by manifest version instead of filename, so subsequent syncs correctly pick up new archivals after Hudi's LSM compaction rewrites the file set. Also addresses round 3 review feedback from nsivabalan: - Extract hoodie table/timeline version defaults to constants - Dedupe concurrent hoodie.properties reads via computeIfAbsent - Skip table instead of silently falling back to v6/v1 on read failure - Add optional completionTime field to ActiveTimelineInstant for v9 Co-Authored-By: Claude Opus 4.6 (1M context) --- .../constants/MetadataExtractorConstants.java | 9 +- .../ActiveTimelineInstantBatcher.java | 20 +- .../HoodiePropertiesReader.java | 9 +- .../LSMTimelineManifestReader.java | 148 ++++++++++ .../TableMetadataUploaderService.java | 104 ++++--- .../TimelineCommitInstantsUploader.java | 277 +++++++++++++++--- .../metadata_extractor/models/Checkpoint.java | 5 + .../models/ParsedHudiProperties.java | 7 +- .../metadata_extractor/models/Table.java | 7 +- ...ontinueOnIncompleteCommitStrategyTest.java | 4 +- .../TimelineCommitInstantsUploaderTest.java | 261 +++++++++++++---- 11 files changed, 698 insertions(+), 153 deletions(-) create mode 100644 lakeview/src/main/java/ai/onehouse/metadata_extractor/LSMTimelineManifestReader.java diff --git a/lakeview/src/main/java/ai/onehouse/constants/MetadataExtractorConstants.java b/lakeview/src/main/java/ai/onehouse/constants/MetadataExtractorConstants.java index 0730b270..4877f87e 100644 --- a/lakeview/src/main/java/ai/onehouse/constants/MetadataExtractorConstants.java +++ b/lakeview/src/main/java/ai/onehouse/constants/MetadataExtractorConstants.java @@ -22,7 +22,11 @@ private MetadataExtractorConstants() {} "hoodie.timeline.layout.version"; public static final String TIMELINE_FOLDER_NAME = "timeline"; public static final String HISTORY_FOLDER_NAME = "history"; + public static final int TIMELINE_LAYOUT_VERSION_V1 = 1; public static final int TIMELINE_LAYOUT_VERSION_V2 = 2; + public static final int HOODIE_TABLE_VERSION_DEFAULT = 6; + public static final int TIMELINE_LAYOUT_VERSION_DEFAULT = TIMELINE_LAYOUT_VERSION_V1; + public static final String MANIFEST_FILE_PREFIX = "manifest_"; // The default number of instants in one archived commit metadata file is 10 // so we want to ingest 10x active instants than archived instants in one batch @@ -54,10 +58,6 @@ private MetadataExtractorConstants() {} Pattern.compile("\\d+(_\\d+)?(\\.[a-z]{1,20}){1,2}"); public static final Pattern V1_ARCHIVED_NUMERIC_PATTERN = Pattern.compile("\\.archive\\.(\\d+)_"); - public static final Pattern V2_ARCHIVED_PARQUET_TIMESTAMP_PATTERN = - Pattern.compile("^(\\d+)_(\\d+)_(\\d+)\\.parquet$"); - public static final Pattern V2_MANIFEST_NUMERIC_PATTERN = - Pattern.compile("^manifest_(\\d+)$"); public static final Checkpoint INITIAL_CHECKPOINT = Checkpoint.builder() .batchId(0) @@ -65,6 +65,7 @@ private MetadataExtractorConstants() {} .lastUploadedFile("") .firstIncompleteCommitFile("") .archivedCommitsProcessed(false) + .lastArchivedManifestVersion(0) .build(); // hardcoding last modified at to prevent this from causing issues with our checkpoint logic diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/ActiveTimelineInstantBatcher.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/ActiveTimelineInstantBatcher.java index 0efafd1e..41b8e4ec 100644 --- a/lakeview/src/main/java/ai/onehouse/metadata_extractor/ActiveTimelineInstantBatcher.java +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/ActiveTimelineInstantBatcher.java @@ -267,11 +267,15 @@ static boolean areRelatedSavepointOrRollbackInstants( static ActiveTimelineInstant getActiveTimeLineInstant(String instant) { String[] parts = instant.split("\\.", 3); - // Strip completion timestamp from V9 completed instants - // e.g., "20260204053206256_20260204053210895" -> "20260204053206256" + // V9 completed instants embed completion time after an underscore in the leading token, + // e.g. "20260204053206256_20260204053210895". Split it out so callers see the request + // timestamp and the optional completion timestamp separately. String timestamp = parts[0]; + String completionTime = null; if (timestamp.contains("_")) { - timestamp = timestamp.split("_")[0]; + String[] tsParts = timestamp.split("_", 2); + timestamp = tsParts[0]; + completionTime = tsParts[1]; } String action; @@ -284,13 +288,21 @@ static ActiveTimelineInstant getActiveTimeLineInstant(String instant) { action = parts[1]; state = parts.length == 3 ? parts[2] : "completed"; } - return ActiveTimelineInstant.builder().timestamp(timestamp).action(action).state(state).build(); + return ActiveTimelineInstant.builder() + .timestamp(timestamp) + .completionTime(completionTime) + .action(action) + .state(state) + .build(); } @Builder @Getter static class ActiveTimelineInstant { private final String timestamp; + // Only populated for V9 (table version >= 8) completed instants, which embed completion time + // alongside the request timestamp in the filename. Null for V1-V8 instants. + private final String completionTime; private final String action; private final String state; } diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/HoodiePropertiesReader.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/HoodiePropertiesReader.java index 88a15fc8..c367de11 100644 --- a/lakeview/src/main/java/ai/onehouse/metadata_extractor/HoodiePropertiesReader.java +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/HoodiePropertiesReader.java @@ -2,8 +2,10 @@ import static ai.onehouse.constants.MetadataExtractorConstants.HOODIE_TABLE_NAME_KEY; import static ai.onehouse.constants.MetadataExtractorConstants.HOODIE_TABLE_TYPE_KEY; +import static ai.onehouse.constants.MetadataExtractorConstants.HOODIE_TABLE_VERSION_DEFAULT; import static ai.onehouse.constants.MetadataExtractorConstants.HOODIE_TABLE_VERSION_KEY; import static ai.onehouse.constants.MetadataExtractorConstants.HOODIE_TIMELINE_LAYOUT_VERSION_KEY; +import static ai.onehouse.constants.MetadataExtractorConstants.TIMELINE_LAYOUT_VERSION_DEFAULT; import static ai.onehouse.metadata_extractor.MetadataExtractorUtils.getMetadataExtractorFailureReason; import com.google.inject.Inject; @@ -45,9 +47,12 @@ public CompletableFuture readHoodieProperties(String path) throw new RuntimeException("Failed to load properties file", e); } int tableVersion = Integer.parseInt( - properties.getProperty(HOODIE_TABLE_VERSION_KEY, "6")); + properties.getProperty( + HOODIE_TABLE_VERSION_KEY, String.valueOf(HOODIE_TABLE_VERSION_DEFAULT))); int timelineLayoutVersion = Integer.parseInt( - properties.getProperty(HOODIE_TIMELINE_LAYOUT_VERSION_KEY, "1")); + properties.getProperty( + HOODIE_TIMELINE_LAYOUT_VERSION_KEY, + String.valueOf(TIMELINE_LAYOUT_VERSION_DEFAULT))); return ParsedHudiProperties.builder() .tableName(properties.getProperty(HOODIE_TABLE_NAME_KEY)) .tableType(TableType.valueOf(properties.getProperty(HOODIE_TABLE_TYPE_KEY))) diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/LSMTimelineManifestReader.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/LSMTimelineManifestReader.java new file mode 100644 index 00000000..1eb1aadb --- /dev/null +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/LSMTimelineManifestReader.java @@ -0,0 +1,148 @@ +package ai.onehouse.metadata_extractor; + +import static ai.onehouse.constants.MetadataExtractorConstants.MANIFEST_FILE_PREFIX; +import static ai.onehouse.constants.MetadataExtractorConstants.VERSION_MARKER_FILE; + +import ai.onehouse.RuntimeModule.TableMetadataUploadObjectStorageAsyncClient; +import ai.onehouse.storage.AsyncStorageClient; +import ai.onehouse.storage.StorageUtils; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Inject; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import javax.annotation.Nonnull; +import lombok.Value; +import lombok.extern.slf4j.Slf4j; + +/** + * Reads the Hudi LSM (V2) archived timeline manifest layout that lives under {@code + * .hoodie/timeline/history/}. The layout, written by Hudi 1.x, is: + * + *
+ *   _version_                                # text file containing the latest manifest version
+ *   manifest_N                               # JSON: { "files": [{ "fileName": "...", "fileLen": N }] }
+ *   {minInstant}_{maxInstant}_{level}.parquet
+ * 
+ * + *

The manifest is the source of truth for which parquet files are valid in the current + * snapshot. Compaction rewrites multiple low-level files into a single higher-level file, leaving + * orphaned data files on storage; only files referenced by the latest manifest should be treated + * as live. LakeView mirrors files to the backend rather than parsing them, so it only needs the + * filenames the manifest lists - no Hudi dependency required. + */ +@Slf4j +public class LSMTimelineManifestReader { + private final AsyncStorageClient asyncStorageClient; + private final StorageUtils storageUtils; + private final ObjectMapper mapper = new ObjectMapper(); + + @Inject + public LSMTimelineManifestReader( + @Nonnull @TableMetadataUploadObjectStorageAsyncClient AsyncStorageClient asyncStorageClient, + @Nonnull StorageUtils storageUtils) { + this.asyncStorageClient = asyncStorageClient; + this.storageUtils = storageUtils; + } + + /** + * Reads {@code _version_} and the corresponding {@code manifest_N} from the given history + * directory. Returns {@link ManifestSnapshot#empty()} when the {@code _version_} file does not + * exist (no archives have been written yet). + */ + public CompletableFuture readLatestManifest(String historyDirectoryUri) { + String versionFileUri = storageUtils.constructFileUri(historyDirectoryUri, VERSION_MARKER_FILE); + return asyncStorageClient + .readFileAsBytes(versionFileUri) + .thenCompose( + versionBytes -> { + int version = parseVersionFile(versionBytes); + return readManifestForVersion(historyDirectoryUri, version) + .thenApply(files -> ManifestSnapshot.of(version, files)); + }) + .exceptionally( + throwable -> { + log.info( + "No V2 archived timeline _version_ file at {} (treating as empty). Reason: {}", + versionFileUri, + throwable.getMessage()); + return ManifestSnapshot.empty(); + }); + } + + /** + * Reads {@code manifest_N} for the given version and returns the parquet filenames it + * references. Returns an empty list if the manifest cannot be read - callers treat this as + * "previous snapshot is gone, fall back to bootstrap" rather than a hard failure. + */ + public CompletableFuture> readManifestFileNames( + String historyDirectoryUri, int version) { + return readManifestForVersion(historyDirectoryUri, version) + .exceptionally( + throwable -> { + log.warn( + "Failed to read manifest_{} from {}; treating as missing. Reason: {}", + version, + historyDirectoryUri, + throwable.getMessage()); + return Collections.emptyList(); + }); + } + + private CompletableFuture> readManifestForVersion( + String historyDirectoryUri, int version) { + String manifestUri = + storageUtils.constructFileUri(historyDirectoryUri, MANIFEST_FILE_PREFIX + version); + return asyncStorageClient + .readFileAsBytes(manifestUri) + .thenApply( + bytes -> { + try { + return parseManifestFileNames(bytes); + } catch (IOException e) { + throw new UncheckedIOException( + "Failed to parse LSM manifest at " + manifestUri, e); + } + }); + } + + private static int parseVersionFile(byte[] bytes) { + String contents = new String(bytes, StandardCharsets.UTF_8).trim(); + return Integer.parseInt(contents); + } + + private List parseManifestFileNames(byte[] bytes) throws IOException { + JsonNode root = mapper.readTree(bytes); + JsonNode files = root.get("files"); + List result = new ArrayList<>(); + if (files != null && files.isArray()) { + for (JsonNode entry : files) { + JsonNode name = entry.get("fileName"); + if (name != null && !name.asText().isEmpty()) { + result.add(name.asText()); + } + } + } + return result; + } + + /** Snapshot of the latest LSM manifest: version number plus the parquet filenames it lists. */ + @Value(staticConstructor = "of") + public static class ManifestSnapshot { + int version; + @Nonnull List parquetFileNames; + + public static ManifestSnapshot empty() { + return ManifestSnapshot.of(0, Collections.emptyList()); + } + + public boolean isEmpty() { + return version == 0; + } + } +} diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableMetadataUploaderService.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableMetadataUploaderService.java index 6c36b1be..e66d4629 100644 --- a/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableMetadataUploaderService.java +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableMetadataUploaderService.java @@ -53,8 +53,11 @@ public class TableMetadataUploaderService { private final ExecutorService executorService; private final ObjectMapper mapper; // Cache stores a few lightweight entries (one per table). No TTL needed since table versions - // do not change at runtime, and LakeView restarts on upgrades. - private final Map propertiesCache = new ConcurrentHashMap<>(); + // do not change at runtime, and LakeView restarts on upgrades. Stores the in-flight future + // so concurrent table-batch processing dedupes reads (computeIfAbsent semantics) instead of + // each thread issuing its own hoodie.properties fetch. + private final Map> propertiesCache = + new ConcurrentHashMap<>(); @Inject public TableMetadataUploaderService( @@ -149,51 +152,32 @@ private CompletableFuture uploadInstantsInTableBatch(List

tables StringUtils.isNotBlank(checkpointString) ? mapper.readValue(checkpointString, Checkpoint.class) : INITIAL_CHECKPOINT; - ParsedHudiProperties cachedProperties = - propertiesCache.get(table.getTableId()); - if (cachedProperties != null) { - Table tableWithVersion = table.toBuilder() - .tableVersion(cachedProperties.getTableVersion()) - .timelineLayoutVersion( - cachedProperties.getTimelineLayoutVersion()) - .build(); - processTablesFuture.add( - uploadNewInstantsSinceCheckpoint( - tableWithVersion.getTableId(), - tableWithVersion, - checkpoint)); - } else { - processTablesFuture.add( - hoodiePropertiesReader - .readHoodieProperties(getHoodiePropertiesFilePath(table)) - .thenComposeAsync( - properties -> { - Table tableWithVersion = table; - if (properties != null - && properties.getMetadataUploadFailureReasons() - == null) { - propertiesCache.put( - table.getTableId(), properties); - tableWithVersion = table.toBuilder() - .tableVersion( - properties.getTableVersion()) + processTablesFuture.add( + getOrLoadProperties(table) + .thenComposeAsync( + properties -> { + if (properties.getMetadataUploadFailureReasons() != null) { + log.error( + "Failed to read hoodie.properties for table: {} " + + "reason: {}. Skipping table to avoid " + + "mis-detecting the timeline layout version.", + table, + properties.getMetadataUploadFailureReasons()); + return CompletableFuture.completedFuture(false); + } + Table tableWithVersion = + table + .toBuilder() + .tableVersion(properties.getTableVersion()) .timelineLayoutVersion( properties.getTimelineLayoutVersion()) .build(); - } else { - log.warn( - "Failed to read hoodie.properties for " - + "table: {}, using default version " - + "settings", - table); - } - return uploadNewInstantsSinceCheckpoint( - tableWithVersion.getTableId(), - tableWithVersion, - checkpoint); - }, - executorService)); - } + return uploadNewInstantsSinceCheckpoint( + tableWithVersion.getTableId(), + tableWithVersion, + checkpoint); + }, + executorService)); } catch (JsonProcessingException e) { log.error( "Error deserializing checkpoint value for table: {}, skipping table", @@ -258,8 +242,7 @@ private int getNumberOfMissingTables(List(); for (Table table : tablesToInitialise) { initializeSingleTableMetricsCheckpointRequestFutureList.add( - hoodiePropertiesReader - .readHoodieProperties(getHoodiePropertiesFilePath(table)) + getOrLoadProperties(table) .thenApply( properties -> { if (properties == null || properties.getMetadataUploadFailureReasons() != null) { @@ -282,7 +265,6 @@ private int getNumberOfMissingTables(List uploadNewInstantsSinceCheckpoint( .thenApply(Objects::nonNull); } + /** + * Returns a cached or in-flight {@link ParsedHudiProperties} future for the table. Concurrent + * callers see the same future instead of issuing duplicate hoodie.properties reads. If the + * underlying read fails (parsed properties carry a non-null failure reason), the cache entry is + * evicted so the next sync attempt retries from scratch. + * + *

Eviction is dispatched via {@code whenCompleteAsync} on {@link #executorService} so the + * removal never runs on the same call stack as {@link java.util.concurrent.ConcurrentHashMap + * #computeIfAbsent} - mutating the map from inside its own mapping function would trip the + * "Recursive update" guard. + */ + private CompletableFuture getOrLoadProperties(Table table) { + return propertiesCache.computeIfAbsent( + table.getTableId(), + id -> + hoodiePropertiesReader + .readHoodieProperties(getHoodiePropertiesFilePath(table)) + .whenCompleteAsync( + (result, throwable) -> { + if (throwable != null + || result == null + || result.getMetadataUploadFailureReasons() != null) { + propertiesCache.remove(id); + } + }, + executorService)); + } + private String getHoodiePropertiesFilePath(Table table) { String basePath = table.getAbsoluteTableUri(); return String.format( diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/TimelineCommitInstantsUploader.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/TimelineCommitInstantsUploader.java index 7a1063d7..d1cc4366 100644 --- a/lakeview/src/main/java/ai/onehouse/metadata_extractor/TimelineCommitInstantsUploader.java +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/TimelineCommitInstantsUploader.java @@ -8,13 +8,12 @@ import static ai.onehouse.constants.MetadataExtractorConstants.HOODIE_FOLDER_NAME; import static ai.onehouse.constants.MetadataExtractorConstants.HOODIE_PROPERTIES_FILE; import static ai.onehouse.constants.MetadataExtractorConstants.HOODIE_PROPERTIES_FILE_OBJ; +import static ai.onehouse.constants.MetadataExtractorConstants.MANIFEST_FILE_PREFIX; import static ai.onehouse.constants.MetadataExtractorConstants.ROLLBACK_ACTION; import static ai.onehouse.constants.MetadataExtractorConstants.SAVEPOINT_ACTION; import static ai.onehouse.constants.MetadataExtractorConstants.TIMELINE_FOLDER_NAME; -import static ai.onehouse.constants.MetadataExtractorConstants.V1_ARCHIVED_NUMERIC_PATTERN; -import static ai.onehouse.constants.MetadataExtractorConstants.V2_MANIFEST_NUMERIC_PATTERN; import static ai.onehouse.constants.MetadataExtractorConstants.TIMELINE_LAYOUT_VERSION_V2; -import static ai.onehouse.constants.MetadataExtractorConstants.V2_ARCHIVED_PARQUET_TIMESTAMP_PATTERN; +import static ai.onehouse.constants.MetadataExtractorConstants.V1_ARCHIVED_NUMERIC_PATTERN; import static ai.onehouse.constants.MetadataExtractorConstants.VERSION_MARKER_FILE; import static ai.onehouse.metadata_extractor.ActiveTimelineInstantBatcher.areRelatedInstants; import static ai.onehouse.metadata_extractor.ActiveTimelineInstantBatcher.areRelatedSavepointOrRollbackInstants; @@ -45,9 +44,12 @@ import ai.onehouse.RuntimeModule.TableMetadataUploadObjectStorageAsyncClient; import java.math.BigDecimal; +import java.time.Instant; import java.util.ArrayList; import java.util.Comparator; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.regex.Matcher; @@ -72,6 +74,7 @@ public class TimelineCommitInstantsUploader { private final ActiveTimelineInstantBatcher activeTimelineInstantBatcher; private final LakeViewExtractorMetrics hudiMetadataExtractorMetrics; private final MetadataExtractorConfig extractorConfig; + private final LSMTimelineManifestReader lsmTimelineManifestReader; @Inject public TimelineCommitInstantsUploader( @@ -82,7 +85,8 @@ public TimelineCommitInstantsUploader( @Nonnull ExecutorService executorService, @Nonnull ActiveTimelineInstantBatcher activeTimelineInstantBatcher, @Nonnull LakeViewExtractorMetrics hudiMetadataExtractorMetrics, - @Nonnull Config config) { + @Nonnull Config config, + @Nonnull LSMTimelineManifestReader lsmTimelineManifestReader) { this.asyncStorageClient = asyncStorageClient; this.presignedUrlFileUploader = presignedUrlFileUploader; this.onehouseApiClient = onehouseApiClient; @@ -91,6 +95,7 @@ public TimelineCommitInstantsUploader( this.activeTimelineInstantBatcher = activeTimelineInstantBatcher; this.hudiMetadataExtractorMetrics = hudiMetadataExtractorMetrics; this.extractorConfig = config.getMetadataExtractorConfig(); + this.lsmTimelineManifestReader = lsmTimelineManifestReader; this.mapper = new ObjectMapper(); mapper.registerModule(new JavaTimeModule()); } @@ -118,6 +123,15 @@ public CompletableFuture batchUploadWithCheckpoint( table.getAbsoluteTableUri(), getPathSuffixForTimeline(commitTimelineType, table.getTimelineLayoutVersion())); + if (CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED.equals(commitTimelineType) + && table.getTimelineLayoutVersion() == TIMELINE_LAYOUT_VERSION_V2) { + // V2 (Hudi 1.x) archived timeline lives under .hoodie/timeline/history/ as an LSM tree. + // We mirror it manifest-by-manifest rather than listing files: the manifest is the only + // source of truth for which parquet files are live, and filename-based checkpointing is + // unsafe because compaction rewrites the file set between runs. + return executeManifestDrivenArchivedUpload(tableId, table, timelineUri, checkpoint); + } + return executeFullBatchUpload(tableId, table, timelineUri, checkpoint, commitTimelineType); } @@ -198,6 +212,229 @@ private CompletableFuture executeFullBatchUpload( }); } + /** + * V2 archived upload, manifest-driven. + * + *

Reads {@code _version_} and the latest {@code manifest_N} from {@code + * .hoodie/timeline/history/}. Uploads only parquet files newly referenced by the current + * manifest (diffed against the previous manifest when available, otherwise bootstrapped from + * scratch), then uploads the new manifest, then the {@code _version_} marker last so the + * backend never sees a manifest pointing at files that have not arrived yet. Checkpoints by + * manifest version, not filename, which makes the path compaction-safe: compaction merges L0 + * files into a single higher-level file, but the new manifest references the merged file and + * the backend (which reads via {@code TimelineFactory.createArchivedTimeline}, see + * gateway-controller PR 8797) honours the manifest, so any orphaned mirror copies are inert. + */ + private CompletableFuture executeManifestDrivenArchivedUpload( + String tableId, Table table, String historyUri, Checkpoint checkpoint) { + return lsmTimelineManifestReader + .readLatestManifest(historyUri) + .thenComposeAsync( + currentSnapshot -> { + if (currentSnapshot.isEmpty()) { + log.info( + "No V2 archived timeline yet for table {} (no _version_ file). Skipping.", + table); + return CompletableFuture.completedFuture( + checkpoint.toBuilder().archivedCommitsProcessed(true).build()); + } + + int previousVersion = checkpoint.getLastArchivedManifestVersion(); + int currentVersion = currentSnapshot.getVersion(); + if (currentVersion == previousVersion && checkpoint.isArchivedCommitsProcessed()) { + log.info( + "V2 archived timeline already at manifest_{} for table {}; nothing to do.", + currentVersion, + table); + return CompletableFuture.completedFuture(checkpoint); + } + + CompletableFuture> previouslyMirroredFuture = + previousVersion > 0 + ? lsmTimelineManifestReader + .readManifestFileNames(historyUri, previousVersion) + .thenApply(HashSet::new) + : CompletableFuture.completedFuture(new HashSet<>()); + + return previouslyMirroredFuture.thenComposeAsync( + previouslyMirrored -> { + List filesToUpload = new ArrayList<>(); + if (checkpoint.getBatchId() == 0) { + filesToUpload.add(HOODIE_PROPERTIES_FILE_OBJ); + } + int newParquetCount = 0; + for (String parquet : currentSnapshot.getParquetFileNames()) { + if (!previouslyMirrored.contains(parquet)) { + filesToUpload.add(buildArchivedFile(parquet)); + newParquetCount++; + } + } + // Upload order: parquets -> manifest -> _version_. The manifest must arrive + // after every parquet it references, and _version_ must be the very last + // write so a partial run never advertises an inconsistent snapshot to the + // backend. + filesToUpload.add(buildArchivedFile(MANIFEST_FILE_PREFIX + currentVersion)); + filesToUpload.add(buildArchivedFile(VERSION_MARKER_FILE)); + + log.info( + "V2 archived: mirroring {} new parquet(s) plus manifest_{} for table {}" + + " (previous mirrored manifest version: {})", + newParquetCount, + currentVersion, + table, + previousVersion); + + return uploadV2ArchivedFilesInBatches( + tableId, table, filesToUpload, checkpoint, currentVersion); + }, + executorService); + }, + executorService) + .exceptionally( + throwable -> { + log.error( + "Encountered exception when uploading V2 archived timeline for table {}", + table, + throwable); + hudiMetadataExtractorMetrics.incrementTableMetadataProcessingFailureCounter( + getMetadataExtractorFailureReason( + throwable, + MetricsConstants.MetadataUploadFailureReasons.UNKNOWN), + String.format( + "Exception when uploading V2 archived timeline for table %s: %s", + table, throwable.getMessage())); + return null; + }); + } + + private static File buildArchivedFile(String filename) { + return File.builder() + .filename(filename) + .isDirectory(false) + .lastModifiedAt(Instant.EPOCH) + .build(); + } + + /** + * Uploads the V2 archived file list in sequential batches and writes a single checkpoint after + * the final batch succeeds. The checkpoint advances {@code lastArchivedManifestVersion} so the + * next run knows which manifest to diff against. + */ + private CompletableFuture uploadV2ArchivedFilesInBatches( + String tableId, + Table table, + List filesToUpload, + Checkpoint previousCheckpoint, + int newManifestVersion) { + int batchSize = getUploadBatchSize(CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED); + List> batches = Lists.partition(filesToUpload, batchSize); + String directoryUri = + storageUtils.constructFileUri( + table.getAbsoluteTableUri(), + getPathSuffixForTimeline( + CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED, table.getTimelineLayoutVersion())); + + CompletableFuture> sequentialUpload = + CompletableFuture.completedFuture(new ArrayList<>()); + for (List batch : batches) { + sequentialUpload = + sequentialUpload.thenComposeAsync( + accumulated -> { + if (accumulated == null) { + return CompletableFuture.completedFuture(null); + } + return uploadBatch( + tableId, + batch, + CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED, + directoryUri, + table.getTimelineLayoutVersion()) + .thenApply( + ignored -> { + for (File f : batch) { + accumulated.add( + UploadedFile.builder() + .name( + getFileNameWithPrefix( + f, + CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED, + table.getTimelineLayoutVersion())) + .lastModifiedAt(f.getLastModifiedAt().toEpochMilli()) + .build()); + } + return accumulated; + }) + .exceptionally( + throwable -> { + hudiMetadataExtractorMetrics + .incrementTableMetadataProcessingFailureCounter( + getMetadataExtractorFailureReason( + throwable, + MetricsConstants.MetadataUploadFailureReasons.UNKNOWN), + String.format( + "V2 archived batch upload failed for table %s: %s", + table.getAbsoluteTableUri(), throwable.getMessage())); + log.error( + "V2 archived batch upload failed for table: {}. Skipping further" + + " batches in this run.", + table.getAbsoluteTableUri(), + throwable); + return null; + }); + }, + executorService); + } + + return sequentialUpload.thenComposeAsync( + accumulated -> { + if (accumulated == null || accumulated.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + // The final file uploaded is _version_, which is also the marker we use as + // lastUploadedFile for back-compat with the v1 checkpoint shape. + File lastFile = filesToUpload.get(filesToUpload.size() - 1); + Checkpoint updatedCheckpoint = + previousCheckpoint + .toBuilder() + .batchId(previousCheckpoint.getBatchId() + batches.size()) + .lastUploadedFile(lastFile.getFilename()) + .checkpointTimestamp(lastFile.getLastModifiedAt()) + .archivedCommitsProcessed(true) + .lastArchivedManifestVersion(newManifestVersion) + .build(); + try { + return onehouseApiClient + .upsertTableMetricsCheckpoint( + UpsertTableMetricsCheckpointRequest.builder() + .commitTimelineType(CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED) + .tableId(tableId) + .checkpoint(mapper.writeValueAsString(updatedCheckpoint)) + .filesUploaded( + accumulated.stream() + .map(UploadedFile::getName) + .collect(Collectors.toList())) + .uploadedFiles(accumulated) + .build()) + .thenApply( + response -> { + if (response.isFailure()) { + throw new RuntimeException( + String.format( + "failed to update checkpoint: status_code: %d, exception: %s", + response.getStatusCode(), response.getCause())); + } + hudiMetadataExtractorMetrics.incrementTablesProcessedCounter(); + return updatedCheckpoint; + }); + } catch (JsonProcessingException e) { + CompletableFuture failed = new CompletableFuture<>(); + failed.completeExceptionally(new RuntimeException("failed to serialise checkpoint", e)); + return failed; + } + }, + executorService); + } + private CompletableFuture executePaginatedBatchUpload( String tableId, Table table, @@ -597,15 +834,8 @@ private boolean isInstantAlreadyUploaded( .compareTo(getCommitIdFromActiveTimelineInstant(checkpoint.getLastUploadedFile())) <= 0; } else { - long fileNumeric = getNumericPartFromArchivedCommit(file.getFilename()); - long checkpointNumeric = - getNumericPartFromArchivedCommit(checkpoint.getLastUploadedFile()); - if (fileNumeric != checkpointNumeric) { - return fileNumeric < checkpointNumeric; - } - // Same numeric key (e.g. V2 parquet files with same leading timestamp - // but different sequence numbers) — compare full filenames - return file.getFilename().compareTo(checkpoint.getLastUploadedFile()) <= 0; + return getNumericPartFromArchivedCommit(file.getFilename()) + <= getNumericPartFromArchivedCommit(checkpoint.getLastUploadedFile()); } } return false; @@ -684,30 +914,11 @@ private BigDecimal getCommitIdFromActiveTimelineInstant(String activeTimeLineIns } private long getNumericPartFromArchivedCommit(String archivedCommitFileName) { - // V1: .commits_.archive.5_20260101-20260115-50 + // V1 archive file: .commits_.archive.5_20260101-20260115-50 Matcher v1Matcher = V1_ARCHIVED_NUMERIC_PATTERN.matcher(archivedCommitFileName); if (v1Matcher.find()) { return Long.parseLong(v1Matcher.group(1)); } - - // V2 parquet: 20260130205837315_20260201000250371_3.parquet - Matcher v2ParquetMatcher = - V2_ARCHIVED_PARQUET_TIMESTAMP_PATTERN.matcher(archivedCommitFileName); - if (v2ParquetMatcher.find()) { - return Long.parseLong(v2ParquetMatcher.group(1)); - } - - // V2 manifest: manifest_4243 - Matcher v2ManifestMatcher = V2_MANIFEST_NUMERIC_PATTERN.matcher(archivedCommitFileName); - if (v2ManifestMatcher.find()) { - return Long.parseLong(v2ManifestMatcher.group(1)); - } - - // V2 version marker: _version_ - if (VERSION_MARKER_FILE.equals(archivedCommitFileName)) { - return Long.MAX_VALUE; - } - throw new IllegalArgumentException( "invalid archived commit file type: " + archivedCommitFileName); } diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/models/Checkpoint.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/models/Checkpoint.java index e4d2dabc..92d1a749 100644 --- a/lakeview/src/main/java/ai/onehouse/metadata_extractor/models/Checkpoint.java +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/models/Checkpoint.java @@ -16,4 +16,9 @@ public class Checkpoint implements Serializable { @NonNull String lastUploadedFile; String firstIncompleteCommitFile; boolean archivedCommitsProcessed; + // Last LSM manifest version (from .hoodie/timeline/history/_version_) that has been + // mirrored to the backend. Used for V2 (Hudi 1.x / table version >= 8) archived timeline + // upload, where archives live in an LSM tree and filename-based checkpointing breaks + // because compaction rewrites the file set. V1 archived path leaves this at 0. + @Builder.Default int lastArchivedManifestVersion = 0; } diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/models/ParsedHudiProperties.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/models/ParsedHudiProperties.java index d92ef259..17e5549a 100644 --- a/lakeview/src/main/java/ai/onehouse/metadata_extractor/models/ParsedHudiProperties.java +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/models/ParsedHudiProperties.java @@ -1,5 +1,8 @@ package ai.onehouse.metadata_extractor.models; +import static ai.onehouse.constants.MetadataExtractorConstants.HOODIE_TABLE_VERSION_DEFAULT; +import static ai.onehouse.constants.MetadataExtractorConstants.TIMELINE_LAYOUT_VERSION_DEFAULT; + import ai.onehouse.api.models.request.TableType; import ai.onehouse.constants.MetricsConstants; import lombok.Builder; @@ -14,6 +17,6 @@ public class ParsedHudiProperties { @NonNull String tableName; @NonNull TableType tableType; @Nullable MetricsConstants.MetadataUploadFailureReasons metadataUploadFailureReasons; - @Builder.Default int tableVersion = 6; - @Builder.Default int timelineLayoutVersion = 1; + @Builder.Default int tableVersion = HOODIE_TABLE_VERSION_DEFAULT; + @Builder.Default int timelineLayoutVersion = TIMELINE_LAYOUT_VERSION_DEFAULT; } diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/models/Table.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/models/Table.java index e6457797..ec918e69 100644 --- a/lakeview/src/main/java/ai/onehouse/metadata_extractor/models/Table.java +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/models/Table.java @@ -1,5 +1,8 @@ package ai.onehouse.metadata_extractor.models; +import static ai.onehouse.constants.MetadataExtractorConstants.HOODIE_TABLE_VERSION_DEFAULT; +import static ai.onehouse.constants.MetadataExtractorConstants.TIMELINE_LAYOUT_VERSION_DEFAULT; + import lombok.Builder; import lombok.EqualsAndHashCode; import lombok.Getter; @@ -15,6 +18,6 @@ public class Table { private final String databaseName; private final String lakeName; private String tableId; - @Builder.Default private final int tableVersion = 6; - @Builder.Default private final int timelineLayoutVersion = 1; + @Builder.Default private final int tableVersion = HOODIE_TABLE_VERSION_DEFAULT; + @Builder.Default private final int timelineLayoutVersion = TIMELINE_LAYOUT_VERSION_DEFAULT; } diff --git a/lakeview/src/test/java/ai/onehouse/metadata_extractor/ContinueOnIncompleteCommitStrategyTest.java b/lakeview/src/test/java/ai/onehouse/metadata_extractor/ContinueOnIncompleteCommitStrategyTest.java index 87808ce6..9ad1906f 100644 --- a/lakeview/src/test/java/ai/onehouse/metadata_extractor/ContinueOnIncompleteCommitStrategyTest.java +++ b/lakeview/src/test/java/ai/onehouse/metadata_extractor/ContinueOnIncompleteCommitStrategyTest.java @@ -58,6 +58,7 @@ class ContinueOnIncompleteCommitStrategyTest { @Mock private Config config; @Mock private MetadataExtractorConfig metadataExtractorConfig; @Mock private LakeViewExtractorMetrics hudiMetadataExtractorMetrics; + @Mock private LSMTimelineManifestReader lsmTimelineManifestReader; private TimelineCommitInstantsUploader timelineCommitInstantsUploader; private final ObjectMapper mapper = new ObjectMapper(); private static final String S3_TABLE_URI = "s3://bucket/table/"; @@ -88,7 +89,8 @@ private TimelineCommitInstantsUploader getTimelineCommitInstantsUploader() { ForkJoinPool.commonPool(), activeTimelineInstantBatcher, hudiMetadataExtractorMetrics, - config); + config, + lsmTimelineManifestReader); } @BeforeEach diff --git a/lakeview/src/test/java/ai/onehouse/metadata_extractor/TimelineCommitInstantsUploaderTest.java b/lakeview/src/test/java/ai/onehouse/metadata_extractor/TimelineCommitInstantsUploaderTest.java index 6e66027d..d718546f 100644 --- a/lakeview/src/test/java/ai/onehouse/metadata_extractor/TimelineCommitInstantsUploaderTest.java +++ b/lakeview/src/test/java/ai/onehouse/metadata_extractor/TimelineCommitInstantsUploaderTest.java @@ -74,6 +74,7 @@ class TimelineCommitInstantsUploaderTest { @Mock private MetadataExtractorConfig metadataExtractorConfig; @Mock private ActiveTimelineInstantBatcher activeTimelineInstantBatcher; @Mock private LakeViewExtractorMetrics hudiMetadataExtractorMetrics; + @Mock private LSMTimelineManifestReader lsmTimelineManifestReader; private TimelineCommitInstantsUploader timelineCommitInstantsUploader; private final ObjectMapper mapper = new ObjectMapper(); private static final String S3_TABLE_URI = "s3://bucket/table/"; @@ -119,7 +120,8 @@ private TimelineCommitInstantsUploader getTimelineCommitInstantsUploader(TestInf ForkJoinPool.commonPool(), activeTimelineInstantBatcher, hudiMetadataExtractorMetrics, - config); + config, + lsmTimelineManifestReader); } @BeforeEach @@ -1487,73 +1489,141 @@ void testPaginatedBatchUploadWithCheckpointExceptionallyBlockWithNullMessage() { } @Test - void testUploadInstantsInArchivedTimelineV2() { + void testUploadInstantsInArchivedTimelineV2_BootstrapMirror() { + // Initial sync: no previous manifest version recorded, so we mirror everything in the + // current manifest (bootstrap path). TimelineCommitInstantsUploader timelineCommitInstantsUploaderSpy = spy(timelineCommitInstantsUploader); + doReturn(100) + .when(timelineCommitInstantsUploaderSpy) + .getUploadBatchSize(CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED); - doReturn(1) + String historyUri = TABLE_V2.getAbsoluteTableUri() + ".hoodie/timeline/history/"; + String parquet1 = "20260130205837315_20260201000250371_0.parquet"; + String parquet2 = "20260130205837315_20260201000250371_1.parquet"; + + when(lsmTimelineManifestReader.readLatestManifest(historyUri)) + .thenReturn( + CompletableFuture.completedFuture( + LSMTimelineManifestReader.ManifestSnapshot.of( + 1, Arrays.asList(parquet1, parquet2)))); + + // Single batch: hoodie.properties + 2 parquets + manifest_1 + _version_ + List expectedUploads = + Arrays.asList( + UploadedFile.builder().name(HOODIE_PROPERTIES_FILE).build(), + UploadedFile.builder().name(TIMELINE_HISTORY_PREFIX + parquet1).build(), + UploadedFile.builder().name(TIMELINE_HISTORY_PREFIX + parquet2).build(), + UploadedFile.builder().name(TIMELINE_HISTORY_PREFIX + "manifest_1").build(), + UploadedFile.builder().name(TIMELINE_HISTORY_PREFIX + "_version_").build()); + + Checkpoint expectedCheckpoint = + INITIAL_CHECKPOINT + .toBuilder() + .batchId(1) + .lastUploadedFile("_version_") + .checkpointTimestamp(Instant.EPOCH) + .archivedCommitsProcessed(true) + .lastArchivedManifestVersion(1) + .build(); + + stubV2ArchivedSingleBatchUpload(expectedUploads, expectedCheckpoint); + + Checkpoint response = + timelineCommitInstantsUploaderSpy + .batchUploadWithCheckpoint( + TABLE_ID.toString(), + TABLE_V2, + INITIAL_CHECKPOINT, + CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED) + .join(); + + assertEquals(expectedCheckpoint, response); + verify(lsmTimelineManifestReader, times(1)).readLatestManifest(historyUri); + // No previous manifest fetch on the bootstrap path. + verify(lsmTimelineManifestReader, times(0)) + .readManifestFileNames(anyString(), anyInt()); + } + + @Test + void testUploadInstantsInArchivedTimelineV2_IncrementalAfterCompaction() { + // Second sync after a compaction: previous manifest_1 referenced two L0 parquets, + // current manifest_2 references a compacted L1 parquet plus a brand-new L0 parquet. + // We should upload only the files NOT already in the previous manifest. + TimelineCommitInstantsUploader timelineCommitInstantsUploaderSpy = + spy(timelineCommitInstantsUploader); + doReturn(100) .when(timelineCommitInstantsUploaderSpy) .getUploadBatchSize(CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED); + String historyUri = TABLE_V2.getAbsoluteTableUri() + ".hoodie/timeline/history/"; + String oldParquet1 = "20260130000000000_20260130100000000_0.parquet"; + String oldParquet2 = "20260130100000000_20260130200000000_0.parquet"; + String compactedParquet = "20260130000000000_20260130200000000_1.parquet"; + String newParquet = "20260130200000000_20260131000000000_0.parquet"; - // V2 archived timeline path: .hoodie/timeline/history/ - mockListAllFilesInDir( - TABLE_V2.getAbsoluteTableUri() + ".hoodie/timeline/history/", + when(lsmTimelineManifestReader.readLatestManifest(historyUri)) + .thenReturn( + CompletableFuture.completedFuture( + LSMTimelineManifestReader.ManifestSnapshot.of( + 2, Arrays.asList(compactedParquet, newParquet)))); + when(lsmTimelineManifestReader.readManifestFileNames(historyUri, 1)) + .thenReturn( + CompletableFuture.completedFuture(Arrays.asList(oldParquet1, oldParquet2))); + + // Both files in the new manifest are net-new vs. manifest_1, so both get uploaded + // alongside manifest_2 and _version_. hoodie.properties is NOT included because batchId > 0. + List expectedUploads = Arrays.asList( - generateFileObj("should_be_ignored", false), - generateFileObj("20260130205837315_20260201000250371_1.parquet", false), - generateFileObj("20260130205837315_20260201000250371_2.parquet", false), - generateFileObj("manifest_1", false), - generateFileObj("_version_", false, currentTime))); - - // Sort order by getNumericPartFromArchivedCommit: - // manifest_1 -> 1, parquet_1 -> 20260130205837315, parquet_2 -> 20260130205837315 - // (same timestamp, differentiated by full filename comparison), _version_ -> Long.MAX_VALUE - // With batchId=0, hoodie.properties is prepended - Checkpoint checkpoint0 = generateCheckpointObj(1, Instant.EPOCH, false, HOODIE_PROPERTIES_FILE); - Checkpoint checkpoint1 = - generateCheckpointObj(2, Instant.EPOCH, false, "manifest_1"); - Checkpoint checkpoint2 = - generateCheckpointObj( - 3, Instant.EPOCH, false, "20260130205837315_20260201000250371_1.parquet"); - Checkpoint checkpoint3 = - generateCheckpointObj( - 4, Instant.EPOCH, false, "20260130205837315_20260201000250371_2.parquet"); - Checkpoint checkpoint4 = - generateCheckpointObj(5, currentTime, false, "_version_"); + UploadedFile.builder().name(TIMELINE_HISTORY_PREFIX + compactedParquet).build(), + UploadedFile.builder().name(TIMELINE_HISTORY_PREFIX + newParquet).build(), + UploadedFile.builder().name(TIMELINE_HISTORY_PREFIX + "manifest_2").build(), + UploadedFile.builder().name(TIMELINE_HISTORY_PREFIX + "_version_").build()); - stubUploadInstantsCallsV2( - Collections.singletonList(UploadedFile.builder().name(HOODIE_PROPERTIES_FILE).build()), - checkpoint0, - CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED); - stubUploadInstantsCallsV2( - Collections.singletonList(UploadedFile.builder().name("manifest_1").build()), - checkpoint1, - CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED); - stubUploadInstantsCallsV2( - Collections.singletonList( - UploadedFile.builder() - .name("20260130205837315_20260201000250371_1.parquet") - .build()), - checkpoint2, - CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED); - stubUploadInstantsCallsV2( - Collections.singletonList( - UploadedFile.builder() - .name("20260130205837315_20260201000250371_2.parquet") - .build()), - checkpoint3, - CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED); - stubUploadInstantsCallsV2( - Collections.singletonList( - UploadedFile.builder() - .name("_version_") - .lastModifiedAt(currentTime.toEpochMilli()) - .build()), - checkpoint4, - CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED); + Checkpoint previousCheckpoint = + INITIAL_CHECKPOINT + .toBuilder() + .batchId(3) + .lastUploadedFile("_version_") + .archivedCommitsProcessed(true) + .lastArchivedManifestVersion(1) + .build(); + Checkpoint expectedCheckpoint = + previousCheckpoint + .toBuilder() + .batchId(4) + .lastUploadedFile("_version_") + .checkpointTimestamp(Instant.EPOCH) + .archivedCommitsProcessed(true) + .lastArchivedManifestVersion(2) + .build(); + + stubV2ArchivedSingleBatchUpload(expectedUploads, expectedCheckpoint); Checkpoint response = timelineCommitInstantsUploaderSpy + .batchUploadWithCheckpoint( + TABLE_ID.toString(), + TABLE_V2, + previousCheckpoint, + CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED) + .join(); + + assertEquals(expectedCheckpoint, response); + verify(lsmTimelineManifestReader, times(1)).readManifestFileNames(historyUri, 1); + } + + @Test + void testUploadInstantsInArchivedTimelineV2_NoArchivesYet() { + // Table has no archived timeline yet (no _version_ file). The reader returns an empty + // snapshot and we mark archived as processed without touching the API client. + String historyUri = TABLE_V2.getAbsoluteTableUri() + ".hoodie/timeline/history/"; + when(lsmTimelineManifestReader.readLatestManifest(historyUri)) + .thenReturn( + CompletableFuture.completedFuture( + LSMTimelineManifestReader.ManifestSnapshot.empty())); + + Checkpoint response = + timelineCommitInstantsUploader .batchUploadWithCheckpoint( TABLE_ID.toString(), TABLE_V2, @@ -1561,7 +1631,82 @@ void testUploadInstantsInArchivedTimelineV2() { CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED) .join(); - assertEquals(checkpoint4, response); + assertEquals(true, response.isArchivedCommitsProcessed()); + assertEquals(0, response.getLastArchivedManifestVersion()); + verify(onehouseApiClient, times(0)).generateCommitMetadataUploadUrl(any()); + verify(onehouseApiClient, times(0)).upsertTableMetricsCheckpoint(any()); + } + + @Test + void testUploadInstantsInArchivedTimelineV2_AlreadyAtLatestManifest() { + // No new archives since last sync: reader reports the same version we already mirrored. + String historyUri = TABLE_V2.getAbsoluteTableUri() + ".hoodie/timeline/history/"; + when(lsmTimelineManifestReader.readLatestManifest(historyUri)) + .thenReturn( + CompletableFuture.completedFuture( + LSMTimelineManifestReader.ManifestSnapshot.of(7, Collections.emptyList()))); + + Checkpoint previousCheckpoint = + INITIAL_CHECKPOINT + .toBuilder() + .batchId(2) + .archivedCommitsProcessed(true) + .lastArchivedManifestVersion(7) + .build(); + + Checkpoint response = + timelineCommitInstantsUploader + .batchUploadWithCheckpoint( + TABLE_ID.toString(), + TABLE_V2, + previousCheckpoint, + CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED) + .join(); + + assertEquals(previousCheckpoint, response); + verify(onehouseApiClient, times(0)).generateCommitMetadataUploadUrl(any()); + verify(onehouseApiClient, times(0)).upsertTableMetricsCheckpoint(any()); + } + + @SneakyThrows + private void stubV2ArchivedSingleBatchUpload( + List expectedUploads, Checkpoint expectedCheckpoint) { + List filenames = + expectedUploads.stream().map(UploadedFile::getName).collect(Collectors.toList()); + List presignedUrls = + filenames.stream().map(name -> PRESIGNED_URL_PREFIX + name).collect(Collectors.toList()); + + when(onehouseApiClient.generateCommitMetadataUploadUrl( + GenerateCommitMetadataUploadUrlRequest.builder() + .tableId(TABLE_ID.toString()) + .commitInstants(filenames) + .commitTimelineType(CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED) + .build())) + .thenReturn( + CompletableFuture.completedFuture( + GenerateCommitMetadataUploadUrlResponse.builder() + .uploadUrls(presignedUrls) + .build())); + + for (String presignedUrl : presignedUrls) { + String fileName = presignedUrl.substring(PRESIGNED_URL_PREFIX.length()); + String fileUri = S3_TABLE_URI + ".hoodie/" + fileName; + when(presignedUrlFileUploader.uploadFileToPresignedUrl( + eq(presignedUrl), eq(fileUri), anyInt())) + .thenReturn(CompletableFuture.completedFuture(null)); + } + + when(onehouseApiClient.upsertTableMetricsCheckpoint( + UpsertTableMetricsCheckpointRequest.builder() + .commitTimelineType(CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED) + .tableId(TABLE_ID.toString()) + .checkpoint(mapper.writeValueAsString(expectedCheckpoint)) + .filesUploaded(filenames) + .uploadedFiles(expectedUploads) + .build())) + .thenReturn( + CompletableFuture.completedFuture( + UpsertTableMetricsCheckpointResponse.builder().build())); } @Tag("Blocking") From 132d8d8eee2621bc9c9a11da4a9e2c86d1177bff Mon Sep 17 00:00:00 2001 From: tiennguyen-onehouse Date: Tue, 21 Apr 2026 11:07:17 -0700 Subject: [PATCH 07/10] [ENG-38901] Fix lakeview-sync-tool compile: add LSMTimelineManifestReader to constructor Co-Authored-By: Claude Opus 4.6 (1M context) --- .../java/ai/onehouse/lakeview/sync/LakeviewSyncTool.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lakeview-sync-tool/src/main/java/ai/onehouse/lakeview/sync/LakeviewSyncTool.java b/lakeview-sync-tool/src/main/java/ai/onehouse/lakeview/sync/LakeviewSyncTool.java index 1c9cb0d5..146285e3 100644 --- a/lakeview-sync-tool/src/main/java/ai/onehouse/lakeview/sync/LakeviewSyncTool.java +++ b/lakeview-sync-tool/src/main/java/ai/onehouse/lakeview/sync/LakeviewSyncTool.java @@ -14,6 +14,7 @@ import ai.onehouse.config.models.configv1.ParserConfig; import ai.onehouse.metadata_extractor.ActiveTimelineInstantBatcher; import ai.onehouse.metadata_extractor.HoodiePropertiesReader; +import ai.onehouse.metadata_extractor.LSMTimelineManifestReader; import ai.onehouse.metadata_extractor.TableDiscoveryAndUploadJob; import ai.onehouse.metadata_extractor.TableDiscoveryService; import ai.onehouse.metadata_extractor.TableMetadataUploaderService; @@ -259,9 +260,11 @@ private TableDiscoveryAndUploadJob getTableDiscoveryAndUploadJob(@Nonnull Config lakeViewExtractorMetrics); PresignedUrlFileUploader presignedUrlFileUploader = new PresignedUrlFileUploader(asyncStorageClient, asyncHttpClientWithRetry, lakeViewExtractorMetrics); + LSMTimelineManifestReader lsmTimelineManifestReader = new LSMTimelineManifestReader(asyncStorageClient, + storageUtils); TimelineCommitInstantsUploader timelineCommitInstantsUploader = new TimelineCommitInstantsUploader(asyncStorageClient, presignedUrlFileUploader, onehouseApiClient, storageUtils, executorService, new ActiveTimelineInstantBatcher(config), - lakeViewExtractorMetrics, config); + lakeViewExtractorMetrics, config, lsmTimelineManifestReader); TableMetadataUploaderService tableMetadataUploaderService = new TableMetadataUploaderService(hoodiePropertiesReader, onehouseApiClient, timelineCommitInstantsUploader, lakeViewExtractorMetrics, executorService); From 0b7702807106ca08b5f0b6350759ac51ac9bcb85 Mon Sep 17 00:00:00 2001 From: tiennguyen-onehouse Date: Tue, 21 Apr 2026 11:24:56 -0700 Subject: [PATCH 08/10] [ENG-38901] Reduce cognitive complexity and fix SonarCloud issues - Extract processManifestSnapshot, buildV2ArchivedUploadList, and upsertV2ArchivedCheckpoint helpers to bring cognitive complexity of executeManifestDrivenArchivedUpload and uploadV2ArchivedFilesInBatches under the 15 threshold. - Replace generic RuntimeException with IllegalStateException for checkpoint serialization/upsert failures. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../TimelineCommitInstantsUploader.java | 231 ++++++++++-------- 1 file changed, 131 insertions(+), 100 deletions(-) diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/TimelineCommitInstantsUploader.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/TimelineCommitInstantsUploader.java index d1cc4366..61e5754a 100644 --- a/lakeview/src/main/java/ai/onehouse/metadata_extractor/TimelineCommitInstantsUploader.java +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/TimelineCommitInstantsUploader.java @@ -230,65 +230,9 @@ private CompletableFuture executeManifestDrivenArchivedUpload( return lsmTimelineManifestReader .readLatestManifest(historyUri) .thenComposeAsync( - currentSnapshot -> { - if (currentSnapshot.isEmpty()) { - log.info( - "No V2 archived timeline yet for table {} (no _version_ file). Skipping.", - table); - return CompletableFuture.completedFuture( - checkpoint.toBuilder().archivedCommitsProcessed(true).build()); - } - - int previousVersion = checkpoint.getLastArchivedManifestVersion(); - int currentVersion = currentSnapshot.getVersion(); - if (currentVersion == previousVersion && checkpoint.isArchivedCommitsProcessed()) { - log.info( - "V2 archived timeline already at manifest_{} for table {}; nothing to do.", - currentVersion, - table); - return CompletableFuture.completedFuture(checkpoint); - } - - CompletableFuture> previouslyMirroredFuture = - previousVersion > 0 - ? lsmTimelineManifestReader - .readManifestFileNames(historyUri, previousVersion) - .thenApply(HashSet::new) - : CompletableFuture.completedFuture(new HashSet<>()); - - return previouslyMirroredFuture.thenComposeAsync( - previouslyMirrored -> { - List filesToUpload = new ArrayList<>(); - if (checkpoint.getBatchId() == 0) { - filesToUpload.add(HOODIE_PROPERTIES_FILE_OBJ); - } - int newParquetCount = 0; - for (String parquet : currentSnapshot.getParquetFileNames()) { - if (!previouslyMirrored.contains(parquet)) { - filesToUpload.add(buildArchivedFile(parquet)); - newParquetCount++; - } - } - // Upload order: parquets -> manifest -> _version_. The manifest must arrive - // after every parquet it references, and _version_ must be the very last - // write so a partial run never advertises an inconsistent snapshot to the - // backend. - filesToUpload.add(buildArchivedFile(MANIFEST_FILE_PREFIX + currentVersion)); - filesToUpload.add(buildArchivedFile(VERSION_MARKER_FILE)); - - log.info( - "V2 archived: mirroring {} new parquet(s) plus manifest_{} for table {}" - + " (previous mirrored manifest version: {})", - newParquetCount, - currentVersion, - table, - previousVersion); - - return uploadV2ArchivedFilesInBatches( - tableId, table, filesToUpload, checkpoint, currentVersion); - }, - executorService); - }, + currentSnapshot -> + processManifestSnapshot( + tableId, table, historyUri, checkpoint, currentSnapshot), executorService) .exceptionally( throwable -> { @@ -307,6 +251,80 @@ private CompletableFuture executeManifestDrivenArchivedUpload( }); } + private CompletableFuture processManifestSnapshot( + String tableId, + Table table, + String historyUri, + Checkpoint checkpoint, + LSMTimelineManifestReader.ManifestSnapshot currentSnapshot) { + if (currentSnapshot.isEmpty()) { + log.info( + "No V2 archived timeline yet for table {} (no _version_ file). Skipping.", table); + return CompletableFuture.completedFuture( + checkpoint.toBuilder().archivedCommitsProcessed(true).build()); + } + + int previousVersion = checkpoint.getLastArchivedManifestVersion(); + int currentVersion = currentSnapshot.getVersion(); + if (currentVersion == previousVersion && checkpoint.isArchivedCommitsProcessed()) { + log.info( + "V2 archived timeline already at manifest_{} for table {}; nothing to do.", + currentVersion, + table); + return CompletableFuture.completedFuture(checkpoint); + } + + CompletableFuture> previouslyMirroredFuture = + previousVersion > 0 + ? lsmTimelineManifestReader + .readManifestFileNames(historyUri, previousVersion) + .thenApply(HashSet::new) + : CompletableFuture.completedFuture(new HashSet<>()); + + return previouslyMirroredFuture.thenComposeAsync( + previouslyMirrored -> { + List filesToUpload = + buildV2ArchivedUploadList(checkpoint, currentSnapshot, previouslyMirrored, + currentVersion); + int newParquetCount = filesToUpload.size() - 2 + - (checkpoint.getBatchId() == 0 ? 1 : 0); + + log.info( + "V2 archived: mirroring {} new parquet(s) plus manifest_{} for table {}" + + " (previous mirrored manifest version: {})", + newParquetCount, + currentVersion, + table, + previousVersion); + + return uploadV2ArchivedFilesInBatches( + tableId, table, filesToUpload, checkpoint, currentVersion); + }, + executorService); + } + + private List buildV2ArchivedUploadList( + Checkpoint checkpoint, + LSMTimelineManifestReader.ManifestSnapshot currentSnapshot, + Set previouslyMirrored, + int currentVersion) { + List filesToUpload = new ArrayList<>(); + if (checkpoint.getBatchId() == 0) { + filesToUpload.add(HOODIE_PROPERTIES_FILE_OBJ); + } + for (String parquet : currentSnapshot.getParquetFileNames()) { + if (!previouslyMirrored.contains(parquet)) { + filesToUpload.add(buildArchivedFile(parquet)); + } + } + // Upload order: parquets -> manifest -> _version_. The manifest must arrive after every + // parquet it references, and _version_ must be the very last write so a partial run never + // advertises an inconsistent snapshot to the backend. + filesToUpload.add(buildArchivedFile(MANIFEST_FILE_PREFIX + currentVersion)); + filesToUpload.add(buildArchivedFile(VERSION_MARKER_FILE)); + return filesToUpload; + } + private static File buildArchivedFile(String filename) { return File.builder() .filename(filename) @@ -390,51 +408,64 @@ private CompletableFuture uploadV2ArchivedFilesInBatches( if (accumulated == null || accumulated.isEmpty()) { return CompletableFuture.completedFuture(null); } - // The final file uploaded is _version_, which is also the marker we use as - // lastUploadedFile for back-compat with the v1 checkpoint shape. - File lastFile = filesToUpload.get(filesToUpload.size() - 1); - Checkpoint updatedCheckpoint = - previousCheckpoint - .toBuilder() - .batchId(previousCheckpoint.getBatchId() + batches.size()) - .lastUploadedFile(lastFile.getFilename()) - .checkpointTimestamp(lastFile.getLastModifiedAt()) - .archivedCommitsProcessed(true) - .lastArchivedManifestVersion(newManifestVersion) - .build(); - try { - return onehouseApiClient - .upsertTableMetricsCheckpoint( - UpsertTableMetricsCheckpointRequest.builder() - .commitTimelineType(CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED) - .tableId(tableId) - .checkpoint(mapper.writeValueAsString(updatedCheckpoint)) - .filesUploaded( - accumulated.stream() - .map(UploadedFile::getName) - .collect(Collectors.toList())) - .uploadedFiles(accumulated) - .build()) - .thenApply( - response -> { - if (response.isFailure()) { - throw new RuntimeException( - String.format( - "failed to update checkpoint: status_code: %d, exception: %s", - response.getStatusCode(), response.getCause())); - } - hudiMetadataExtractorMetrics.incrementTablesProcessedCounter(); - return updatedCheckpoint; - }); - } catch (JsonProcessingException e) { - CompletableFuture failed = new CompletableFuture<>(); - failed.completeExceptionally(new RuntimeException("failed to serialise checkpoint", e)); - return failed; - } + return upsertV2ArchivedCheckpoint( + tableId, filesToUpload, accumulated, previousCheckpoint, batches.size(), + newManifestVersion); }, executorService); } + private CompletableFuture upsertV2ArchivedCheckpoint( + String tableId, + List filesToUpload, + List accumulated, + Checkpoint previousCheckpoint, + int batchCount, + int newManifestVersion) { + // The final file uploaded is _version_, which is also the marker we use as + // lastUploadedFile for back-compat with the v1 checkpoint shape. + File lastFile = filesToUpload.get(filesToUpload.size() - 1); + Checkpoint updatedCheckpoint = + previousCheckpoint + .toBuilder() + .batchId(previousCheckpoint.getBatchId() + batchCount) + .lastUploadedFile(lastFile.getFilename()) + .checkpointTimestamp(lastFile.getLastModifiedAt()) + .archivedCommitsProcessed(true) + .lastArchivedManifestVersion(newManifestVersion) + .build(); + try { + return onehouseApiClient + .upsertTableMetricsCheckpoint( + UpsertTableMetricsCheckpointRequest.builder() + .commitTimelineType(CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED) + .tableId(tableId) + .checkpoint(mapper.writeValueAsString(updatedCheckpoint)) + .filesUploaded( + accumulated.stream() + .map(UploadedFile::getName) + .collect(Collectors.toList())) + .uploadedFiles(accumulated) + .build()) + .thenApply( + response -> { + if (response.isFailure()) { + throw new IllegalStateException( + String.format( + "failed to update checkpoint: status_code: %d, exception: %s", + response.getStatusCode(), response.getCause())); + } + hudiMetadataExtractorMetrics.incrementTablesProcessedCounter(); + return updatedCheckpoint; + }); + } catch (JsonProcessingException e) { + CompletableFuture failed = new CompletableFuture<>(); + failed.completeExceptionally( + new IllegalStateException("failed to serialise checkpoint", e)); + return failed; + } + } + private CompletableFuture executePaginatedBatchUpload( String tableId, Table table, From bc8bef509d055b1c61b626fc7f87a2411fa1401b Mon Sep 17 00:00:00 2001 From: tiennguyen-onehouse Date: Tue, 21 Apr 2026 11:46:53 -0700 Subject: [PATCH 09/10] [ENG-38901] Add LSMTimelineManifestReader unit tests to fix coverage gap SonarCloud quality gate failed at 72.7% coverage on new code (threshold 80%). The reader class was mocked in the uploader tests so its internals were uncovered. Adds 8 tests covering: happy path, missing _version_, missing manifest, empty files array, unknown JSON fields, whitespace in version file, and ManifestSnapshot.empty(). Co-Authored-By: Claude Opus 4.6 (1M context) --- .../LSMTimelineManifestReaderTest.java | 146 ++++++++++++++++++ 1 file changed, 146 insertions(+) create mode 100644 lakeview/src/test/java/ai/onehouse/metadata_extractor/LSMTimelineManifestReaderTest.java diff --git a/lakeview/src/test/java/ai/onehouse/metadata_extractor/LSMTimelineManifestReaderTest.java b/lakeview/src/test/java/ai/onehouse/metadata_extractor/LSMTimelineManifestReaderTest.java new file mode 100644 index 00000000..bc1c0c6f --- /dev/null +++ b/lakeview/src/test/java/ai/onehouse/metadata_extractor/LSMTimelineManifestReaderTest.java @@ -0,0 +1,146 @@ +package ai.onehouse.metadata_extractor; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.when; + +import ai.onehouse.metadata_extractor.LSMTimelineManifestReader.ManifestSnapshot; +import ai.onehouse.storage.AsyncStorageClient; +import ai.onehouse.storage.StorageUtils; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class LSMTimelineManifestReaderTest { + @Mock private AsyncStorageClient asyncStorageClient; + private LSMTimelineManifestReader reader; + + private static final String HISTORY_URI = "s3://bucket/table/.hoodie/timeline/history/"; + + @BeforeEach + void setUp() { + reader = new LSMTimelineManifestReader(asyncStorageClient, new StorageUtils()); + } + + @Test + void testReadLatestManifest() { + mockReadFile(HISTORY_URI + "_version_", "3"); + mockReadFile( + HISTORY_URI + "manifest_3", + "{\n" + + " \"files\" : [ {\n" + + " \"fileName\" : \"20260101_20260102_0.parquet\",\n" + + " \"fileLen\" : 1024\n" + + " }, {\n" + + " \"fileName\" : \"20260102_20260103_1.parquet\",\n" + + " \"fileLen\" : 2048\n" + + " } ]\n" + + "}"); + + ManifestSnapshot snapshot = reader.readLatestManifest(HISTORY_URI).join(); + + assertEquals(3, snapshot.getVersion()); + assertEquals( + Arrays.asList("20260101_20260102_0.parquet", "20260102_20260103_1.parquet"), + snapshot.getParquetFileNames()); + assertTrue(!snapshot.isEmpty()); + } + + @Test + void testReadLatestManifest_VersionFileMissing() { + when(asyncStorageClient.readFileAsBytes(HISTORY_URI + "_version_")) + .thenReturn(failedFuture(new RuntimeException("NoSuchKey"))); + + ManifestSnapshot snapshot = reader.readLatestManifest(HISTORY_URI).join(); + + assertTrue(snapshot.isEmpty()); + assertEquals(0, snapshot.getVersion()); + assertEquals(Collections.emptyList(), snapshot.getParquetFileNames()); + } + + @Test + void testReadLatestManifest_EmptyFilesArray() { + mockReadFile(HISTORY_URI + "_version_", "1"); + mockReadFile(HISTORY_URI + "manifest_1", "{ \"files\" : [] }"); + + ManifestSnapshot snapshot = reader.readLatestManifest(HISTORY_URI).join(); + + assertEquals(1, snapshot.getVersion()); + assertEquals(Collections.emptyList(), snapshot.getParquetFileNames()); + assertTrue(!snapshot.isEmpty()); + } + + @Test + void testReadManifestFileNames() { + mockReadFile( + HISTORY_URI + "manifest_5", + "{ \"files\" : [ { \"fileName\" : \"a.parquet\", \"fileLen\" : 100 } ] }"); + + List files = reader.readManifestFileNames(HISTORY_URI, 5).join(); + + assertEquals(Collections.singletonList("a.parquet"), files); + } + + @Test + void testReadManifestFileNames_ManifestMissing() { + when(asyncStorageClient.readFileAsBytes(HISTORY_URI + "manifest_99")) + .thenReturn(failedFuture(new RuntimeException("NoSuchKey"))); + + List files = reader.readManifestFileNames(HISTORY_URI, 99).join(); + + assertEquals(Collections.emptyList(), files); + } + + @Test + void testReadLatestManifest_IgnoresUnknownFields() { + mockReadFile(HISTORY_URI + "_version_", "1"); + mockReadFile( + HISTORY_URI + "manifest_1", + "{ \"files\" : [ { \"fileName\" : \"x.parquet\", \"fileLen\" : 42," + + " \"unknownField\" : true } ], \"extraTopLevel\" : 123 }"); + + ManifestSnapshot snapshot = reader.readLatestManifest(HISTORY_URI).join(); + + assertEquals(1, snapshot.getVersion()); + assertEquals(Collections.singletonList("x.parquet"), snapshot.getParquetFileNames()); + } + + @Test + void testReadLatestManifest_VersionWithWhitespace() { + mockReadFile(HISTORY_URI + "_version_", " 7\n"); + mockReadFile(HISTORY_URI + "manifest_7", "{ \"files\" : [] }"); + + ManifestSnapshot snapshot = reader.readLatestManifest(HISTORY_URI).join(); + + assertEquals(7, snapshot.getVersion()); + } + + @Test + void testManifestSnapshotEmpty() { + ManifestSnapshot empty = ManifestSnapshot.empty(); + assertTrue(empty.isEmpty()); + assertEquals(0, empty.getVersion()); + assertEquals(Collections.emptyList(), empty.getParquetFileNames()); + } + + private void mockReadFile(String uri, String content) { + when(asyncStorageClient.readFileAsBytes(uri)) + .thenReturn( + CompletableFuture.completedFuture(content.getBytes(StandardCharsets.UTF_8))); + } + + private static CompletableFuture failedFuture(Throwable ex) { + CompletableFuture f = new CompletableFuture<>(); + f.completeExceptionally(ex); + return f; + } +} From 426a91f790bdc1aa8474204e42f603c202faf99f Mon Sep 17 00:00:00 2001 From: tiennguyen-onehouse Date: Fri, 24 Apr 2026 11:55:48 -0700 Subject: [PATCH 10/10] [ENG-38901] Address review: propagate manifest read failure, add file count to log - readLatestManifest now only returns empty when _version_ is missing. A successful _version_ read followed by a manifest read failure propagates the exception so the upload retries next cycle. - Added previouslyMirrored.size() to the V2 archived log line for better observability. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../LSMTimelineManifestReader.java | 21 ++++++++++++------- .../TimelineCommitInstantsUploader.java | 5 +++-- .../LSMTimelineManifestReaderTest.java | 17 ++++++++++++++- 3 files changed, 33 insertions(+), 10 deletions(-) diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/LSMTimelineManifestReader.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/LSMTimelineManifestReader.java index 1eb1aadb..8eceb7e0 100644 --- a/lakeview/src/main/java/ai/onehouse/metadata_extractor/LSMTimelineManifestReader.java +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/LSMTimelineManifestReader.java @@ -59,19 +59,26 @@ public CompletableFuture readLatestManifest(String historyDire String versionFileUri = storageUtils.constructFileUri(historyDirectoryUri, VERSION_MARKER_FILE); return asyncStorageClient .readFileAsBytes(versionFileUri) - .thenCompose( - versionBytes -> { - int version = parseVersionFile(versionBytes); - return readManifestForVersion(historyDirectoryUri, version) - .thenApply(files -> ManifestSnapshot.of(version, files)); - }) .exceptionally( throwable -> { + // Only treat as empty when _version_ itself is missing (no archives written yet). log.info( "No V2 archived timeline _version_ file at {} (treating as empty). Reason: {}", versionFileUri, throwable.getMessage()); - return ManifestSnapshot.empty(); + return null; + }) + .thenCompose( + versionBytes -> { + if (versionBytes == null) { + return CompletableFuture.completedFuture(ManifestSnapshot.empty()); + } + int version = parseVersionFile(versionBytes); + // Manifest read failures propagate to the caller so the upload is retried + // on the next sync cycle rather than silently treating a readable _version_ + // as "no archives." + return readManifestForVersion(historyDirectoryUri, version) + .thenApply(files -> ManifestSnapshot.of(version, files)); }); } diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/TimelineCommitInstantsUploader.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/TimelineCommitInstantsUploader.java index 61e5754a..ca25128b 100644 --- a/lakeview/src/main/java/ai/onehouse/metadata_extractor/TimelineCommitInstantsUploader.java +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/TimelineCommitInstantsUploader.java @@ -291,11 +291,12 @@ private CompletableFuture processManifestSnapshot( log.info( "V2 archived: mirroring {} new parquet(s) plus manifest_{} for table {}" - + " (previous mirrored manifest version: {})", + + " (previous manifest version: {}, previous file count: {})", newParquetCount, currentVersion, table, - previousVersion); + previousVersion, + previouslyMirrored.size()); return uploadV2ArchivedFilesInBatches( tableId, table, filesToUpload, checkpoint, currentVersion); diff --git a/lakeview/src/test/java/ai/onehouse/metadata_extractor/LSMTimelineManifestReaderTest.java b/lakeview/src/test/java/ai/onehouse/metadata_extractor/LSMTimelineManifestReaderTest.java index bc1c0c6f..f5501b90 100644 --- a/lakeview/src/test/java/ai/onehouse/metadata_extractor/LSMTimelineManifestReaderTest.java +++ b/lakeview/src/test/java/ai/onehouse/metadata_extractor/LSMTimelineManifestReaderTest.java @@ -1,8 +1,8 @@ package ai.onehouse.metadata_extractor; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.when; import ai.onehouse.metadata_extractor.LSMTimelineManifestReader.ManifestSnapshot; @@ -13,6 +13,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -67,6 +68,20 @@ void testReadLatestManifest_VersionFileMissing() { assertEquals(Collections.emptyList(), snapshot.getParquetFileNames()); } + @Test + void testReadLatestManifest_ManifestReadFailurePropagates() { + // _version_ exists but the manifest it points to is unreadable. This should NOT + // return empty — it should propagate the exception so the caller retries next cycle. + mockReadFile(HISTORY_URI + "_version_", "2"); + when(asyncStorageClient.readFileAsBytes(HISTORY_URI + "manifest_2")) + .thenReturn(failedFuture(new RuntimeException("AccessDenied"))); + + CompletableFuture future = reader.readLatestManifest(HISTORY_URI); + + CompletionException ex = assertThrows(CompletionException.class, future::join); + assertTrue(ex.getCause().getMessage().contains("AccessDenied")); + } + @Test void testReadLatestManifest_EmptyFilesArray() { mockReadFile(HISTORY_URI + "_version_", "1");