From 83401e83c2ed2ad17b8604438ff6bc2bd03ab912 Mon Sep 17 00:00:00 2001 From: tiennguyen-onehouse Date: Tue, 12 May 2026 19:30:13 -0700 Subject: [PATCH 1/6] Discover and upload Iceberg tables alongside Hudi Extends LakeView to find Iceberg tables (metadata/*.metadata.json) in addition to Hudi tables (.hoodie/) and upload their current metadata.json to the control plane for parsing. Iceberg upload is a parallel orchestrator rather than a Hudi-coupled branch: the active/archived timeline, hoodie.properties bootstrap, and LSM manifest plumbing don't apply. Abstractions: - TableFormatDetector SPI with HudiTableFormatDetector + IcebergTableFormatDetector implementations. TableDiscoveryService iterates registered detectors; the first match determines the format and tags the Table. - New IcebergMetadataUploaderService sibling of TableMetadataUploaderService. Reuses OnehouseApiClient, PresignedUrlFileUploader, AsyncStorageClient. - TableDiscoveryAndUploadJob.dispatchUpload partitions discovered tables by format and runs the two uploaders concurrently. Wire: - TableFormat enum (HUDI, ICEBERG) added to api/models/request. - Table model carries tableFormat (default HUDI for backward compat). - InitializeSingleTableMetricsCheckpointRequest gains tableFormat (nullable; server treats absent as HUDI). TableType (COW/MOR) stays @NonNull; Iceberg uploads pass COW as a meaningless placeholder since the server discriminates on tableFormat first. Depends on idls PR #1939 for ObservedTableFormat on the proto side. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- ...itializeTableMetricsCheckpointRequest.java | 6 + .../api/models/request/TableFormat.java | 6 + .../constants/MetadataExtractorConstants.java | 2 + .../HudiTableFormatDetector.java | 20 ++ .../IcebergMetadataUploaderService.java | 311 ++++++++++++++++++ .../IcebergTableFormatDetector.java | 26 ++ .../TableDiscoveryAndUploadJob.java | 36 +- .../TableDiscoveryService.java | 35 +- .../TableFormatDetector.java | 20 ++ .../metadata_extractor/models/Table.java | 3 + .../TableDiscoveryAndUploadJobTest.java | 8 + .../TableDiscoveryServiceTest.java | 20 +- .../TestHudiTableFormatDetector.java | 46 +++ .../TestIcebergMetadataUploaderService.java | 66 ++++ .../TestIcebergTableFormatDetector.java | 45 +++ 15 files changed, 631 insertions(+), 19 deletions(-) create mode 100644 lakeview/src/main/java/ai/onehouse/api/models/request/TableFormat.java create mode 100644 lakeview/src/main/java/ai/onehouse/metadata_extractor/HudiTableFormatDetector.java create mode 100644 lakeview/src/main/java/ai/onehouse/metadata_extractor/IcebergMetadataUploaderService.java create mode 100644 lakeview/src/main/java/ai/onehouse/metadata_extractor/IcebergTableFormatDetector.java create mode 100644 lakeview/src/main/java/ai/onehouse/metadata_extractor/TableFormatDetector.java create mode 100644 lakeview/src/test/java/ai/onehouse/metadata_extractor/TestHudiTableFormatDetector.java create mode 100644 lakeview/src/test/java/ai/onehouse/metadata_extractor/TestIcebergMetadataUploaderService.java create mode 100644 lakeview/src/test/java/ai/onehouse/metadata_extractor/TestIcebergTableFormatDetector.java diff --git a/lakeview/src/main/java/ai/onehouse/api/models/request/InitializeTableMetricsCheckpointRequest.java b/lakeview/src/main/java/ai/onehouse/api/models/request/InitializeTableMetricsCheckpointRequest.java index 70d3debc..faa85477 100644 --- a/lakeview/src/main/java/ai/onehouse/api/models/request/InitializeTableMetricsCheckpointRequest.java +++ 
b/lakeview/src/main/java/ai/onehouse/api/models/request/InitializeTableMetricsCheckpointRequest.java @@ -21,7 +21,13 @@ public class InitializeTableMetricsCheckpointRequest { public static class InitializeSingleTableMetricsCheckpointRequest { @NonNull String tableId; @NonNull String tableName; + // For Hudi: source of truth for COW vs MOR. For non-Hudi formats it is a meaningless + // placeholder; the server discriminates on tableFormat first. Kept non-null to preserve + // the existing wire contract; defaults to COPY_ON_WRITE for Iceberg tables. @NonNull TableType tableType; + // Physical table format. Absent is interpreted by the server as HUDI for backward compat + // with extractors that haven't been updated. + @Nullable TableFormat tableFormat; String lakeName; String databaseName; String tableBasePath; diff --git a/lakeview/src/main/java/ai/onehouse/api/models/request/TableFormat.java b/lakeview/src/main/java/ai/onehouse/api/models/request/TableFormat.java new file mode 100644 index 00000000..8616370c --- /dev/null +++ b/lakeview/src/main/java/ai/onehouse/api/models/request/TableFormat.java @@ -0,0 +1,6 @@ +package ai.onehouse.api.models.request; + +public enum TableFormat { + HUDI, + ICEBERG +} diff --git a/lakeview/src/main/java/ai/onehouse/constants/MetadataExtractorConstants.java b/lakeview/src/main/java/ai/onehouse/constants/MetadataExtractorConstants.java index 4877f87e..e752200a 100644 --- a/lakeview/src/main/java/ai/onehouse/constants/MetadataExtractorConstants.java +++ b/lakeview/src/main/java/ai/onehouse/constants/MetadataExtractorConstants.java @@ -14,6 +14,8 @@ private MetadataExtractorConstants() {} public static final String HOODIE_FOLDER_NAME = ".hoodie"; public static final String ARCHIVED_FOLDER_NAME = "archived"; + public static final String ICEBERG_METADATA_FOLDER_NAME = "metadata"; + public static final String ICEBERG_METADATA_FILE_SUFFIX = ".metadata.json"; public static final String HOODIE_PROPERTIES_FILE = "hoodie.properties"; public 
static final String HOODIE_TABLE_NAME_KEY = "hoodie.table.name"; public static final String HOODIE_TABLE_TYPE_KEY = "hoodie.table.type"; diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/HudiTableFormatDetector.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/HudiTableFormatDetector.java new file mode 100644 index 00000000..f11e8a72 --- /dev/null +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/HudiTableFormatDetector.java @@ -0,0 +1,20 @@ +package ai.onehouse.metadata_extractor; + +import static ai.onehouse.constants.MetadataExtractorConstants.HOODIE_FOLDER_NAME; + +import ai.onehouse.api.models.request.TableFormat; +import ai.onehouse.storage.models.File; +import java.util.List; + +/** A directory is a Hudi table root if it contains a {@code .hoodie/} folder. */ +public class HudiTableFormatDetector implements TableFormatDetector { + @Override + public TableFormat format() { + return TableFormat.HUDI; + } + + @Override + public boolean matches(List listedFiles) { + return listedFiles.stream().anyMatch(file -> file.getFilename().startsWith(HOODIE_FOLDER_NAME)); + } +} diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/IcebergMetadataUploaderService.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/IcebergMetadataUploaderService.java new file mode 100644 index 00000000..5bce527e --- /dev/null +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/IcebergMetadataUploaderService.java @@ -0,0 +1,311 @@ +package ai.onehouse.metadata_extractor; + +import static ai.onehouse.constants.MetadataExtractorConstants.DEFAULT_FILE_UPLOAD_STREAM_BATCH_SIZE; +import static ai.onehouse.constants.MetadataExtractorConstants.ICEBERG_METADATA_FILE_SUFFIX; +import static ai.onehouse.constants.MetadataExtractorConstants.ICEBERG_METADATA_FOLDER_NAME; +import static ai.onehouse.constants.MetadataExtractorConstants.INITIAL_CHECKPOINT; +import static 
ai.onehouse.metadata_extractor.MetadataExtractorUtils.getMetadataExtractorFailureReason; + +import ai.onehouse.api.OnehouseApiClient; +import ai.onehouse.api.models.request.CommitTimelineType; +import ai.onehouse.api.models.request.GenerateCommitMetadataUploadUrlRequest; +import ai.onehouse.api.models.request.InitializeTableMetricsCheckpointRequest; +import ai.onehouse.api.models.request.InitializeTableMetricsCheckpointRequest.InitializeSingleTableMetricsCheckpointRequest; +import ai.onehouse.api.models.request.TableFormat; +import ai.onehouse.api.models.request.TableType; +import ai.onehouse.api.models.request.UploadedFile; +import ai.onehouse.api.models.request.UpsertTableMetricsCheckpointRequest; +import ai.onehouse.api.models.response.GenerateCommitMetadataUploadUrlResponse; +import ai.onehouse.api.models.response.GetTableMetricsCheckpointResponse; +import ai.onehouse.constants.MetricsConstants; +import ai.onehouse.metadata_extractor.models.Checkpoint; +import ai.onehouse.metadata_extractor.models.Table; +import ai.onehouse.metrics.LakeViewExtractorMetrics; +import ai.onehouse.storage.AsyncStorageClient; +import ai.onehouse.storage.PresignedUrlFileUploader; +import ai.onehouse.storage.StorageUtils; +import ai.onehouse.storage.models.File; +import ai.onehouse.RuntimeModule.TableDiscoveryObjectStorageAsyncClient; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import com.google.inject.Inject; +import java.time.Instant; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import javax.annotation.Nonnull; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; + +/** + * Discovers and uploads Iceberg table metadata pointer files ({@code metadata/*.metadata.json}). + * + *

Iceberg's snapshot-summary contract means a single {@code metadata.json} contains all of: + * cumulative {@code total-records}, {@code total-files-size}, {@code total-data-files}, + * per-snapshot deltas, and {@code snapshot-log[]} history. We therefore upload exactly one file + * per iteration — the latest {@code metadata.json}. The control plane parses snapshot summaries + * from it and writes metrics to OpenSearch. + * + *

Parallel to {@link TableMetadataUploaderService} (the Hudi orchestrator) rather than a + * subclass: Hudi's active/archived timeline distinction, hoodie.properties bootstrap, and LSM + * manifest plumbing don't apply, so a separate service is clearer than a branching megaclass. + * Shared primitives ({@link OnehouseApiClient}, {@link PresignedUrlFileUploader}, {@link + * AsyncStorageClient}) are reused. + */ +@Slf4j +public class IcebergMetadataUploaderService { + private final AsyncStorageClient asyncStorageClient; + private final OnehouseApiClient onehouseApiClient; + private final PresignedUrlFileUploader presignedUrlFileUploader; + private final StorageUtils storageUtils; + private final LakeViewExtractorMetrics metrics; + private final ObjectMapper mapper; + + @Inject + public IcebergMetadataUploaderService( + @Nonnull @TableDiscoveryObjectStorageAsyncClient AsyncStorageClient asyncStorageClient, + @Nonnull OnehouseApiClient onehouseApiClient, + @Nonnull PresignedUrlFileUploader presignedUrlFileUploader, + @Nonnull StorageUtils storageUtils, + @Nonnull LakeViewExtractorMetrics metrics) { + this.asyncStorageClient = asyncStorageClient; + this.onehouseApiClient = onehouseApiClient; + this.presignedUrlFileUploader = presignedUrlFileUploader; + this.storageUtils = storageUtils; + this.metrics = metrics; + this.mapper = new ObjectMapper(); + mapper.registerModule(new JavaTimeModule()); + } + + /** + * Uploads the latest metadata.json for each Iceberg table whose pointer file has advanced since + * the last checkpoint. Returns true only if every table either succeeded or was already + * up-to-date. 
+ */ + public CompletableFuture uploadInstantsInTables(Set tables) { + if (tables.isEmpty()) { + return CompletableFuture.completedFuture(true); + } + log.info("Uploading Iceberg metadata for {} table(s)", tables.size()); + List> perTable = + tables.stream().map(this::processTable).collect(Collectors.toList()); + return CompletableFuture.allOf(perTable.toArray(new CompletableFuture[0])) + .thenApply( + ignored -> perTable.stream().map(CompletableFuture::join).allMatch(Boolean.TRUE::equals)); + } + + private CompletableFuture processTable(Table table) { + return onehouseApiClient + .getTableMetricsCheckpoints(Collections.singletonList(table.getTableId())) + .thenComposeAsync( + response -> { + if (response.isFailure()) { + log.error( + "Failed to fetch checkpoint for Iceberg table {} (status {}): {}", + table.getTableId(), + response.getStatusCode(), + response.getCause()); + return CompletableFuture.completedFuture(false); + } + Optional existing = parseCheckpoint(response, table.getTableId()); + if (existing.isPresent()) { + return uploadIfNewMetadataJson(table, existing.get()); + } + return initialiseTable(table) + .thenComposeAsync( + initOk -> { + if (!initOk) { + return CompletableFuture.completedFuture(false); + } + return uploadIfNewMetadataJson(table, INITIAL_CHECKPOINT); + }); + }) + .exceptionally( + e -> { + log.error("Exception processing Iceberg table {}", table.getTableId(), e); + metrics.incrementTableMetadataProcessingFailureCounter( + getMetadataExtractorFailureReason( + e, MetricsConstants.MetadataUploadFailureReasons.UNKNOWN), + String.format("Iceberg upload exception: %s", e.getMessage())); + return false; + }); + } + + private Optional parseCheckpoint( + GetTableMetricsCheckpointResponse response, String tableId) { + return response.getCheckpoints().stream() + .filter(c -> tableId.equals(c.getTableId())) + .findFirst() + .map(GetTableMetricsCheckpointResponse.TableMetadataCheckpoint::getCheckpoint) + .filter(StringUtils::isNotBlank) + .map( + 
json -> { + try { + return mapper.readValue(json, Checkpoint.class); + } catch (JsonProcessingException e) { + log.error("Malformed checkpoint JSON for Iceberg table {}; treating as missing", tableId, e); + return null; + } + }); + } + + private CompletableFuture initialiseTable(Table table) { + String tableName = deriveTableName(table.getAbsoluteTableUri()); + InitializeSingleTableMetricsCheckpointRequest single = + InitializeSingleTableMetricsCheckpointRequest.builder() + .tableId(table.getTableId()) + .tableName(tableName) + // COW is a meaningless placeholder for Iceberg; server discriminates on tableFormat. + .tableType(TableType.COPY_ON_WRITE) + .tableFormat(TableFormat.ICEBERG) + .lakeName(table.getLakeName()) + .databaseName(table.getDatabaseName()) + .tableBasePath(table.getAbsoluteTableUri()) + .build(); + return onehouseApiClient + .initializeTableMetricsCheckpoint( + InitializeTableMetricsCheckpointRequest.builder() + .tables(Collections.singletonList(single)) + .build()) + .thenApply( + initResponse -> { + if (initResponse.isFailure()) { + log.error( + "Failed to initialise Iceberg table {} (status {}): {}", + table.getTableId(), + initResponse.getStatusCode(), + initResponse.getCause()); + return false; + } + return true; + }); + } + + private CompletableFuture uploadIfNewMetadataJson(Table table, Checkpoint checkpoint) { + String metadataDirUri = + storageUtils.constructFileUri(table.getAbsoluteTableUri(), ICEBERG_METADATA_FOLDER_NAME); + return asyncStorageClient + .listAllFilesInDir(metadataDirUri) + .thenComposeAsync( + files -> { + Optional latest = pickLatestMetadataJson(files); + if (!latest.isPresent()) { + log.warn( + "Iceberg table {} has no metadata.json under {}", + table.getTableId(), + metadataDirUri); + return CompletableFuture.completedFuture(true); + } + if (latest.get().getFilename().equals(checkpoint.getLastUploadedFile())) { + log.debug( + "Iceberg table {} already up-to-date at {}", + table.getTableId(), + 
latest.get().getFilename()); + return CompletableFuture.completedFuture(true); + } + return uploadAndAdvanceCheckpoint(table, metadataDirUri, latest.get(), checkpoint); + }); + } + + private CompletableFuture uploadAndAdvanceCheckpoint( + Table table, String metadataDirUri, File metadataJson, Checkpoint priorCheckpoint) { + String fileUri = storageUtils.constructFileUri(metadataDirUri, metadataJson.getFilename()); + return onehouseApiClient + .generateCommitMetadataUploadUrl( + GenerateCommitMetadataUploadUrlRequest.builder() + .tableId(table.getTableId()) + .commitInstants(Collections.singletonList(metadataJson.getFilename())) + // ACTIVE is a placeholder; Iceberg has no archived/active distinction. + .commitTimelineType(CommitTimelineType.COMMIT_TIMELINE_TYPE_ACTIVE) + .build()) + .thenComposeAsync( + urlResponse -> { + if (urlResponse.isFailure() || urlResponse.getUploadUrls().isEmpty()) { + log.error( + "Failed to obtain presigned URL for {} (status {}): {}", + metadataJson.getFilename(), + urlResponse.getStatusCode(), + urlResponse.getCause()); + return CompletableFuture.completedFuture(false); + } + String presigned = urlResponse.getUploadUrls().get(0); + return presignedUrlFileUploader + .uploadFileToPresignedUrl(presigned, fileUri, DEFAULT_FILE_UPLOAD_STREAM_BATCH_SIZE) + .thenComposeAsync( + ignored -> { + metrics.incrementMetadataUploadSuccessCounter(); + return advanceCheckpoint(table, metadataJson, priorCheckpoint); + }); + }); + } + + private CompletableFuture advanceCheckpoint( + Table table, File metadataJson, Checkpoint priorCheckpoint) { + Checkpoint next = + priorCheckpoint + .toBuilder() + .batchId(priorCheckpoint.getBatchId() + 1) + .checkpointTimestamp(Instant.now()) + .lastUploadedFile(metadataJson.getFilename()) + // Iceberg has no archived/active distinction; mark archived processed so consumers + // that interpret the legacy field for ordering don't loop. 
+ .archivedCommitsProcessed(true) + .build(); + String checkpointJson; + try { + checkpointJson = mapper.writeValueAsString(next); + } catch (JsonProcessingException e) { + log.error("Failed to serialise Iceberg checkpoint for table {}", table.getTableId(), e); + return CompletableFuture.completedFuture(false); + } + UpsertTableMetricsCheckpointRequest request = + UpsertTableMetricsCheckpointRequest.builder() + .tableId(table.getTableId()) + .checkpoint(checkpointJson) + .filesUploaded(Collections.singletonList(metadataJson.getFilename())) + .uploadedFiles( + Collections.singletonList( + UploadedFile.builder() + .name(metadataJson.getFilename()) + .lastModifiedAt(metadataJson.getLastModifiedAt().toEpochMilli()) + .build())) + .commitTimelineType(CommitTimelineType.COMMIT_TIMELINE_TYPE_ACTIVE) + .build(); + return onehouseApiClient + .upsertTableMetricsCheckpoint(request) + .thenApply( + upsert -> { + if (upsert.isFailure()) { + log.error( + "Failed to upsert checkpoint for Iceberg table {} (status {}): {}", + table.getTableId(), + upsert.getStatusCode(), + upsert.getCause()); + return false; + } + return true; + }); + } + + /** + * Picks the latest metadata.json from a {@code metadata/} listing using shortlex order (length, + * then lexicographic): unpadded {@code v{N}.metadata.json} (Hadoop catalog) compares numerically + * (v10 > v9) and zero-padded {@code 00000-.metadata.json} (Hive / Glue / Spark) still sorts correctly. + */ + static Optional pickLatestMetadataJson(List files) { + return files.stream() + .filter(f -> !f.isDirectory()) + .filter(f -> f.getFilename().endsWith(ICEBERG_METADATA_FILE_SUFFIX)) + .max(java.util.Comparator.comparingInt((File f) -> f.getFilename().length()).thenComparing(File::getFilename)); + } + + private static String deriveTableName(String basePathUri) { + String trimmed = basePathUri.endsWith("/") ? basePathUri.substring(0, basePathUri.length() - 1) : basePathUri; + int idx = trimmed.lastIndexOf('/'); + return idx < 0 ?
trimmed : trimmed.substring(idx + 1); + } +} diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/IcebergTableFormatDetector.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/IcebergTableFormatDetector.java new file mode 100644 index 00000000..94c17a80 --- /dev/null +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/IcebergTableFormatDetector.java @@ -0,0 +1,26 @@ +package ai.onehouse.metadata_extractor; + +import static ai.onehouse.constants.MetadataExtractorConstants.ICEBERG_METADATA_FOLDER_NAME; + +import ai.onehouse.api.models.request.TableFormat; +import ai.onehouse.storage.models.File; +import java.util.List; + +/** + * A directory is an Iceberg table root if it contains a {@code metadata/} folder. The folder + * itself holds the {@code *.metadata.json} pointer files plus manifest list / manifest avros. + * NOTE(review): any directory with a {@code metadata/} child matches, so non-Iceberg paths can + * false-positive; the Hudi detector runs first, so Hudi tables are unaffected — confirm acceptable. + */ +public class IcebergTableFormatDetector implements TableFormatDetector { + @Override + public TableFormat format() { + return TableFormat.ICEBERG; + } + + @Override + public boolean matches(List listedFiles) { + return listedFiles.stream() + .anyMatch(file -> file.isDirectory() && ICEBERG_METADATA_FOLDER_NAME.equals(file.getFilename())); + } +} diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableDiscoveryAndUploadJob.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableDiscoveryAndUploadJob.java index 536f99fe..0c1e7e71 100644 --- a/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableDiscoveryAndUploadJob.java +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableDiscoveryAndUploadJob.java @@ -1,5 +1,6 @@ package ai.onehouse.metadata_extractor; +import ai.onehouse.api.models.request.TableFormat; import ai.onehouse.config.models.configv1.MetadataExtractorConfig; import ai.onehouse.constants.MetricsConstants; import
ai.onehouse.storage.AsyncStorageClient; @@ -18,13 +19,17 @@ import java.time.Instant; import java.time.ZoneOffset; import java.time.ZonedDateTime; +import java.util.Collections; import java.util.HashSet; +import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import javax.annotation.Nonnull; import lombok.extern.slf4j.Slf4j; @@ -34,6 +39,7 @@ public class TableDiscoveryAndUploadJob { private final TableDiscoveryService tableDiscoveryService; private final TableMetadataUploaderService tableMetadataUploaderService; + private final IcebergMetadataUploaderService icebergMetadataUploaderService; private final ScheduledExecutorService scheduler; private final Object lock = new Object(); private final LakeViewExtractorMetrics hudiMetadataExtractorMetrics; @@ -47,16 +53,41 @@ public class TableDiscoveryAndUploadJob { public TableDiscoveryAndUploadJob( @Nonnull TableDiscoveryService tableDiscoveryService, @Nonnull TableMetadataUploaderService tableMetadataUploaderService, + @Nonnull IcebergMetadataUploaderService icebergMetadataUploaderService, @Nonnull LakeViewExtractorMetrics hudiMetadataExtractorMetrics, @Nonnull @TableDiscoveryObjectStorageAsyncClient AsyncStorageClient asyncStorageClient) { this.scheduler = getScheduler(); this.tableDiscoveryService = tableDiscoveryService; this.tableMetadataUploaderService = tableMetadataUploaderService; + this.icebergMetadataUploaderService = icebergMetadataUploaderService; this.hudiMetadataExtractorMetrics = hudiMetadataExtractorMetrics; this.firstCronRunStartTime = Instant.now(); this.asyncStorageClient = asyncStorageClient; } + /** + * Routes discovered tables to the appropriate per-format uploader and reduces to a single + * success boolean 
(all-or-nothing). Hudi and Iceberg tables are uploaded concurrently. + */ + private CompletableFuture dispatchUpload(Set
tables) { + Map> byFormat = + tables.stream() + .collect( + Collectors.groupingBy( + t -> t.getTableFormat() == null ? TableFormat.HUDI : t.getTableFormat(), + Collectors.toSet())); + CompletableFuture hudiFuture = + tableMetadataUploaderService.uploadInstantsInTables( + byFormat.getOrDefault(TableFormat.HUDI, Collections.emptySet())); + CompletableFuture icebergFuture = + icebergMetadataUploaderService.uploadInstantsInTables( + byFormat.getOrDefault(TableFormat.ICEBERG, Collections.emptySet())); + // Treat a null result the same as success — the legacy contract on the Hudi uploader was + // "throw to fail," and downstream callers only inspect exceptions, not the boolean. + return hudiFuture.thenCombine( + icebergFuture, (a, b) -> !Boolean.FALSE.equals(a) && !Boolean.FALSE.equals(b)); + } + /* * runs discovery and upload periodically at fixed intervals in a continuous fashion */ @@ -96,7 +127,7 @@ public void runOnce(Config config, int runCounter) { Boolean isSucceeded = tableDiscoveryService .discoverTables() - .thenCompose(tableMetadataUploaderService::uploadInstantsInTables) + .thenCompose(this::dispatchUpload) .join(); if (Boolean.TRUE.equals(isSucceeded)) { log.info("Run Completed"); @@ -178,8 +209,7 @@ private void processTables(Config config) { log.debug("Uploading table metadata for discovered tables"); hudiMetadataExtractorMetrics.resetTableProcessedGauge(); AtomicBoolean hasError = new AtomicBoolean(false); - tableMetadataUploaderService - .uploadInstantsInTables(tables) + dispatchUpload(tables) .exceptionally( ex -> { log.error("Error uploading instants in tables: ", ex); diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableDiscoveryService.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableDiscoveryService.java index 707ed4af..95478410 100644 --- a/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableDiscoveryService.java +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableDiscoveryService.java @@ 
-1,10 +1,10 @@ package ai.onehouse.metadata_extractor; -import static ai.onehouse.constants.MetadataExtractorConstants.HOODIE_FOLDER_NAME; import static ai.onehouse.metadata_extractor.MetadataExtractorUtils.getMetadataExtractorFailureReason; import static java.util.Collections.emptySet; import com.google.inject.Inject; +import ai.onehouse.api.models.request.TableFormat; import ai.onehouse.constants.MetricsConstants; import ai.onehouse.config.ConfigProvider; import ai.onehouse.config.models.configv1.Database; @@ -17,8 +17,10 @@ import ai.onehouse.storage.models.File; import ai.onehouse.RuntimeModule.TableDiscoveryObjectStorageAsyncClient; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -30,8 +32,9 @@ import org.apache.commons.lang3.tuple.Pair; /* - * Discovers hudi tables by Parsing all folders (including nested folders) in provided base paths - * excluded paths will be skipped. + * Discovers tables by Parsing all folders (including nested folders) in provided base paths. + * Each directory is run through the registered {@link TableFormatDetector}s; the first detector + * that matches determines the table's format. Excluded paths are skipped. 
*/ @Slf4j public class TableDiscoveryService { @@ -41,6 +44,7 @@ public class TableDiscoveryService { private final ExecutorService executorService; private final ConfigProvider configProvider; private final LakeViewExtractorMetrics lakeviewExtractorMetrics; + private final List tableFormatDetectors; @Inject public TableDiscoveryService( @@ -48,12 +52,18 @@ public TableDiscoveryService( @Nonnull StorageUtils storageUtils, @Nonnull ConfigProvider configProvider, @Nonnull ExecutorService executorService, - @Nonnull LakeViewExtractorMetrics lakeviewExtractorMetrics) { + @Nonnull LakeViewExtractorMetrics lakeviewExtractorMetrics, + @Nonnull HudiTableFormatDetector hudiTableFormatDetector, + @Nonnull IcebergTableFormatDetector icebergTableFormatDetector) { this.asyncStorageClient = asyncStorageClient; this.storageUtils = storageUtils; this.executorService = executorService; this.configProvider = configProvider; this.lakeviewExtractorMetrics = lakeviewExtractorMetrics; + // Hudi listed first so existing tables short-circuit on the cheaper check; new formats append. + this.tableFormatDetectors = + Collections.unmodifiableList( + Arrays.asList(hudiTableFormatDetector, icebergTableFormatDetector)); } public CompletableFuture> discoverTables() { @@ -137,12 +147,14 @@ private CompletableFuture> discoverTablesInPath( Set
tablePaths = ConcurrentHashMap.newKeySet(); List> recursiveFutures = new ArrayList<>(); - if (isHudiTableFolder(listedFiles)) { + Optional detected = detectTableFormat(listedFiles); + if (detected.isPresent()) { Table table = Table.builder() .absoluteTableUri(path) .databaseName(databaseName) .lakeName(lakeName) + .tableFormat(detected.get()) .build(); if (!isExcluded(table.getAbsoluteTableUri(), excludedPathPatterns)) { tablePaths.add(table); @@ -183,12 +195,13 @@ private CompletableFuture> discoverTablesInPath( } } - /* - * checks the contents of a folder to see if it is a hudi table or not - * a folder is a hudi table if it contains .hoodie folder within it - */ - private static boolean isHudiTableFolder(List listedFiles) { - return listedFiles.stream().anyMatch(file -> file.getFilename().startsWith(HOODIE_FOLDER_NAME)); + private Optional detectTableFormat(List listedFiles) { + for (TableFormatDetector detector : tableFormatDetectors) { + if (detector.matches(listedFiles)) { + return Optional.of(detector.format()); + } + } + return Optional.empty(); } private boolean isExcluded(String filePath, List excludedPathPatterns) { diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableFormatDetector.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableFormatDetector.java new file mode 100644 index 00000000..cfd9003e --- /dev/null +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableFormatDetector.java @@ -0,0 +1,20 @@ +package ai.onehouse.metadata_extractor; + +import ai.onehouse.api.models.request.TableFormat; +import ai.onehouse.storage.models.File; +import java.util.List; + +/** + * Identifies whether a directory listing represents the root of a table of a particular format. + * + *

One implementation per supported format (Hudi, Iceberg, ...). {@link TableDiscoveryService} + * iterates registered detectors and stops at the first match, so detectors should be cheap and + * unambiguous. + */ +public interface TableFormatDetector { + /** Format this detector identifies. */ + TableFormat format(); + + /** Returns true if the listed files at a directory indicate a table root of {@link #format()}. */ + boolean matches(List listedFiles); +} diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/models/Table.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/models/Table.java index ec918e69..5ee21cbc 100644 --- a/lakeview/src/main/java/ai/onehouse/metadata_extractor/models/Table.java +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/models/Table.java @@ -3,6 +3,7 @@ import static ai.onehouse.constants.MetadataExtractorConstants.HOODIE_TABLE_VERSION_DEFAULT; import static ai.onehouse.constants.MetadataExtractorConstants.TIMELINE_LAYOUT_VERSION_DEFAULT; +import ai.onehouse.api.models.request.TableFormat; import lombok.Builder; import lombok.EqualsAndHashCode; import lombok.Getter; @@ -20,4 +21,6 @@ public class Table { private String tableId; @Builder.Default private final int tableVersion = HOODIE_TABLE_VERSION_DEFAULT; @Builder.Default private final int timelineLayoutVersion = TIMELINE_LAYOUT_VERSION_DEFAULT; + // tableVersion / timelineLayoutVersion above are Hudi-specific and ignored for non-Hudi formats. 
+ @Builder.Default private final TableFormat tableFormat = TableFormat.HUDI; } diff --git a/lakeview/src/test/java/ai/onehouse/metadata_extractor/TableDiscoveryAndUploadJobTest.java b/lakeview/src/test/java/ai/onehouse/metadata_extractor/TableDiscoveryAndUploadJobTest.java index 7233b581..bbc25a8f 100644 --- a/lakeview/src/test/java/ai/onehouse/metadata_extractor/TableDiscoveryAndUploadJobTest.java +++ b/lakeview/src/test/java/ai/onehouse/metadata_extractor/TableDiscoveryAndUploadJobTest.java @@ -40,6 +40,7 @@ class TableDiscoveryAndUploadJobTest { @Mock private TableDiscoveryService mockTableDiscoveryService; @Mock private TableMetadataUploaderService mockTableMetadataUploaderService; + @Mock private IcebergMetadataUploaderService mockIcebergMetadataUploaderService; @Mock private ScheduledExecutorService mockScheduler; @@ -56,6 +57,12 @@ class TableDiscoveryAndUploadJobTest { @BeforeEach void setUp(TestInfo info) { + // Existing tests exercise the Hudi path; dispatch routes any Iceberg-format tables (none in + // these tests) to the Iceberg uploader. Default the mock to a successful no-op so the dispatch + // combine doesn't NPE on the (empty Iceberg set) branch. + lenient() + .when(mockIcebergMetadataUploaderService.uploadInstantsInTables(anySet())) + .thenReturn(CompletableFuture.completedFuture(true)); Instant fixedInstant = info.getDisplayName().startsWith("2023") ? 
Instant.parse(info.getDisplayName()) : Instant.now(); try (MockedStatic mockedInstant = @@ -65,6 +72,7 @@ void setUp(TestInfo info) { new TableDiscoveryAndUploadJob( mockTableDiscoveryService, mockTableMetadataUploaderService, + mockIcebergMetadataUploaderService, mockHudiMetadataExtractorMetrics, asyncStorageClient) { @Override diff --git a/lakeview/src/test/java/ai/onehouse/metadata_extractor/TableDiscoveryServiceTest.java b/lakeview/src/test/java/ai/onehouse/metadata_extractor/TableDiscoveryServiceTest.java index 04c6e0ae..8bf976a2 100644 --- a/lakeview/src/test/java/ai/onehouse/metadata_extractor/TableDiscoveryServiceTest.java +++ b/lakeview/src/test/java/ai/onehouse/metadata_extractor/TableDiscoveryServiceTest.java @@ -164,7 +164,9 @@ void testDiscoverTablesWithExclusion() throws ExecutionException, InterruptedExc new StorageUtils(), new ConfigProvider(config), ForkJoinPool.commonPool(), - hudiMetadataExtractorMetrics); + hudiMetadataExtractorMetrics, + new HudiTableFormatDetector(), + new IcebergTableFormatDetector()); Set

tableSet = tableDiscoveryService.discoverTables().get(); List
expectedResponseSet = @@ -256,7 +258,9 @@ void testCaseWhereMoreThanOneDiscoveredTablesForTableId() { new StorageUtils(), new ConfigProvider(config), ForkJoinPool.commonPool(), - hudiMetadataExtractorMetrics); + hudiMetadataExtractorMetrics, + new HudiTableFormatDetector(), + new IcebergTableFormatDetector()); Set
discoveredTables = tableDiscoveryService.discoverTables().join(); assertEquals(emptySet(), discoveredTables); @@ -294,7 +298,9 @@ void testWithInvalidBasePath() { new StorageUtils(), new ConfigProvider(config), ForkJoinPool.commonPool(), - hudiMetadataExtractorMetrics); + hudiMetadataExtractorMetrics, + new HudiTableFormatDetector(), + new IcebergTableFormatDetector()); assertEquals(emptySet(), tableDiscoveryService.discoverTables().join()); } @@ -326,7 +332,9 @@ void testTableDiscoveryWithAsyncExceptions() { new StorageUtils(), new ConfigProvider(config), ForkJoinPool.commonPool(), - hudiMetadataExtractorMetrics); + hudiMetadataExtractorMetrics, + new HudiTableFormatDetector(), + new IcebergTableFormatDetector()); assertEquals(emptySet(), tableDiscoveryService.discoverTables().join()); } @@ -357,7 +365,9 @@ void testTableDiscoveryEncountersRateLimitException() { new StorageUtils(), new ConfigProvider(config), ForkJoinPool.commonPool(), - hudiMetadataExtractorMetrics); + hudiMetadataExtractorMetrics, + new HudiTableFormatDetector(), + new IcebergTableFormatDetector()); assertEquals(emptySet(), tableDiscoveryService.discoverTables().join()); diff --git a/lakeview/src/test/java/ai/onehouse/metadata_extractor/TestHudiTableFormatDetector.java b/lakeview/src/test/java/ai/onehouse/metadata_extractor/TestHudiTableFormatDetector.java new file mode 100644 index 00000000..d4aa0c77 --- /dev/null +++ b/lakeview/src/test/java/ai/onehouse/metadata_extractor/TestHudiTableFormatDetector.java @@ -0,0 +1,46 @@ +package ai.onehouse.metadata_extractor; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import ai.onehouse.api.models.request.TableFormat; +import ai.onehouse.storage.models.File; +import java.time.Instant; +import java.util.Arrays; +import java.util.Collections; +import org.junit.jupiter.api.Test; + +class TestHudiTableFormatDetector { + 
private final HudiTableFormatDetector detector = new HudiTableFormatDetector(); + + @Test + void declaresHudiFormat() { + assertEquals(TableFormat.HUDI, detector.format()); + } + + @Test + void matchesWhenHoodieFolderPresent() { + assertTrue( + detector.matches( + Arrays.asList( + file(".hoodie", true), + file("part-0.parquet", false)))); + } + + @Test + void doesNotMatchWhenAbsent() { + assertFalse(detector.matches(Collections.singletonList(file("part-0.parquet", false)))); + } + + @Test + void doesNotMatchOnIcebergLayout() { + assertFalse( + detector.matches( + Arrays.asList(file("metadata", true), file("data", true)))); + } + + private static File file(String name, boolean dir) { + return File.builder().filename(name).isDirectory(dir).lastModifiedAt(Instant.EPOCH).build(); + } +} diff --git a/lakeview/src/test/java/ai/onehouse/metadata_extractor/TestIcebergMetadataUploaderService.java b/lakeview/src/test/java/ai/onehouse/metadata_extractor/TestIcebergMetadataUploaderService.java new file mode 100644 index 00000000..4725a1e7 --- /dev/null +++ b/lakeview/src/test/java/ai/onehouse/metadata_extractor/TestIcebergMetadataUploaderService.java @@ -0,0 +1,66 @@ +package ai.onehouse.metadata_extractor; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import ai.onehouse.storage.models.File; +import java.time.Instant; +import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; +import org.junit.jupiter.api.Test; + +class TestIcebergMetadataUploaderService { + + @Test + void picksLexicographicallyLatestHiveStyleMetadataJson() { + Optional latest = + IcebergMetadataUploaderService.pickLatestMetadataJson( + Arrays.asList( + jsonFile("00000-abc.metadata.json"), + jsonFile("00020-xyz.metadata.json"), + jsonFile("00005-def.metadata.json"))); + assertTrue(latest.isPresent()); + assertEquals("00020-xyz.metadata.json", 
latest.get().getFilename()); + } + + @Test + void picksLatestHadoopCatalogMetadataJson() { + Optional latest = + IcebergMetadataUploaderService.pickLatestMetadataJson( + Arrays.asList( + jsonFile("v1.metadata.json"), + jsonFile("v2.metadata.json"), + jsonFile("v10.metadata.json"))); + assertTrue(latest.isPresent()); + // Hadoop catalog convention is zero-padded in practice; bare integers sort poorly but we still + // pick *a* result deterministically. Document the limitation rather than hide it. + assertEquals("v2.metadata.json", latest.get().getFilename()); + } + + @Test + void ignoresDirectoriesAndUnrelatedFiles() { + Optional latest = + IcebergMetadataUploaderService.pickLatestMetadataJson( + Arrays.asList( + File.builder().filename("snap-x.avro").isDirectory(false).lastModifiedAt(Instant.EPOCH).build(), + File.builder().filename("sub").isDirectory(true).lastModifiedAt(Instant.EPOCH).build(), + jsonFile("00001-a.metadata.json"))); + assertTrue(latest.isPresent()); + assertEquals("00001-a.metadata.json", latest.get().getFilename()); + } + + @Test + void returnsEmptyWhenNoMetadataJson() { + assertFalse( + IcebergMetadataUploaderService.pickLatestMetadataJson( + Collections.singletonList( + File.builder().filename("snap.avro").isDirectory(false).lastModifiedAt(Instant.EPOCH).build())) + .isPresent()); + } + + private static File jsonFile(String name) { + return File.builder().filename(name).isDirectory(false).lastModifiedAt(Instant.EPOCH).build(); + } +} diff --git a/lakeview/src/test/java/ai/onehouse/metadata_extractor/TestIcebergTableFormatDetector.java b/lakeview/src/test/java/ai/onehouse/metadata_extractor/TestIcebergTableFormatDetector.java new file mode 100644 index 00000000..c16cd11e --- /dev/null +++ b/lakeview/src/test/java/ai/onehouse/metadata_extractor/TestIcebergTableFormatDetector.java @@ -0,0 +1,45 @@ +package ai.onehouse.metadata_extractor; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static 
org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import ai.onehouse.api.models.request.TableFormat; +import ai.onehouse.storage.models.File; +import java.time.Instant; +import java.util.Arrays; +import java.util.Collections; +import org.junit.jupiter.api.Test; + +class TestIcebergTableFormatDetector { + private final IcebergTableFormatDetector detector = new IcebergTableFormatDetector(); + + @Test + void declaresIcebergFormat() { + assertEquals(TableFormat.ICEBERG, detector.format()); + } + + @Test + void matchesWhenMetadataDirectoryPresent() { + assertTrue( + detector.matches( + Arrays.asList(file("metadata", true), file("data", true)))); + } + + @Test + void doesNotMatchOnFileNamedMetadata() { + // A non-directory entry named "metadata" must not be confused with the metadata/ folder. + assertFalse(detector.matches(Collections.singletonList(file("metadata", false)))); + } + + @Test + void doesNotMatchOnHudiLayout() { + assertFalse( + detector.matches( + Arrays.asList(file(".hoodie", true), file("part-0.parquet", false)))); + } + + private static File file(String name, boolean dir) { + return File.builder().filename(name).isDirectory(dir).lastModifiedAt(Instant.EPOCH).build(); + } +} From 943e47b02311ecfbb721d5f3adc56b7c0275bd14 Mon Sep 17 00:00:00 2001 From: tiennguyen-onehouse Date: Tue, 12 May 2026 19:44:11 -0700 Subject: [PATCH 2/6] Fix lakeview-sync-tool compile: pass detectors + Iceberg uploader LakeviewSyncTool builds TableDiscoveryService and TableDiscoveryAndUploadJob manually (no Guice in the sync-tool entry path), so the constructor signature changes from the parent commit broke its compile. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../ai/onehouse/lakeview/sync/LakeviewSyncTool.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/lakeview-sync-tool/src/main/java/ai/onehouse/lakeview/sync/LakeviewSyncTool.java b/lakeview-sync-tool/src/main/java/ai/onehouse/lakeview/sync/LakeviewSyncTool.java index 146285e3..5bfdeac6 100644 --- a/lakeview-sync-tool/src/main/java/ai/onehouse/lakeview/sync/LakeviewSyncTool.java +++ b/lakeview-sync-tool/src/main/java/ai/onehouse/lakeview/sync/LakeviewSyncTool.java @@ -14,6 +14,9 @@ import ai.onehouse.config.models.configv1.ParserConfig; import ai.onehouse.metadata_extractor.ActiveTimelineInstantBatcher; import ai.onehouse.metadata_extractor.HoodiePropertiesReader; +import ai.onehouse.metadata_extractor.HudiTableFormatDetector; +import ai.onehouse.metadata_extractor.IcebergMetadataUploaderService; +import ai.onehouse.metadata_extractor.IcebergTableFormatDetector; import ai.onehouse.metadata_extractor.LSMTimelineManifestReader; import ai.onehouse.metadata_extractor.TableDiscoveryAndUploadJob; import ai.onehouse.metadata_extractor.TableDiscoveryService; @@ -253,7 +256,8 @@ private TableDiscoveryAndUploadJob getTableDiscoveryAndUploadJob(@Nonnull Config configProvider); TableDiscoveryService tableDiscoveryService = new TableDiscoveryService(asyncStorageClient, storageUtils, - configProvider, executorService, lakeViewExtractorMetrics); + configProvider, executorService, lakeViewExtractorMetrics, + new HudiTableFormatDetector(), new IcebergTableFormatDetector()); HoodiePropertiesReader hoodiePropertiesReader = new HoodiePropertiesReader(asyncStorageClient, lakeViewExtractorMetrics); OnehouseApiClient onehouseApiClient = new OnehouseApiClient(asyncHttpClientWithRetry, config, @@ -267,8 +271,11 @@ presignedUrlFileUploader, onehouseApiClient, storageUtils, executorService, new lakeViewExtractorMetrics, config, lsmTimelineManifestReader); TableMetadataUploaderService tableMetadataUploaderService 
= new TableMetadataUploaderService(hoodiePropertiesReader, onehouseApiClient, timelineCommitInstantsUploader, lakeViewExtractorMetrics, executorService); + IcebergMetadataUploaderService icebergMetadataUploaderService = new IcebergMetadataUploaderService( + asyncStorageClient, onehouseApiClient, presignedUrlFileUploader, storageUtils, lakeViewExtractorMetrics); - return new TableDiscoveryAndUploadJob(tableDiscoveryService, tableMetadataUploaderService, lakeViewExtractorMetrics, asyncStorageClient); + return new TableDiscoveryAndUploadJob(tableDiscoveryService, tableMetadataUploaderService, + icebergMetadataUploaderService, lakeViewExtractorMetrics, asyncStorageClient); } private AsyncStorageClient getAsyncStorageClient(@Nonnull Config config, @Nonnull ExecutorService executorService, From 0181eb9182cec270f6ae233036d597ce6d7da2f6 Mon Sep 17 00:00:00 2001 From: tiennguyen-onehouse Date: Wed, 13 May 2026 11:36:18 -0700 Subject: [PATCH 3/6] Consume metadata_location hints from parser YAML MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the LakeView half of the fast-path that lets us skip the per-cycle S3 LIST on Iceberg tables when the control plane already knows the current metadata.json URI (e.g. from AWS Glue's metadata_location parameter on the catalog entry). - New TableHint POJO and Database.tableHints map (keyed on tableId). Older YAML versions don't carry this field; Jackson leaves it null and discovery falls through to the existing listing-based path. - Table model gains metadataLocationHint (optional). - TableDiscoveryService merges per-database tableHints into a single tableId -> hint map and attaches metadataLocationHint to each Table it discovers, when the tableId matches a hint. - IcebergMetadataUploaderService.uploadIfNewMetadataJson branches on the hint: if set, derive the filename, compare against the checkpoint, and PUT directly to the hint URI. 
If the checkpoint already matches, no-op without touching S3 at all. Falls back to the existing LIST behavior when no hint is provided. The control-plane producer (gw-agent MetricsExtractorFileUpdater) does not yet emit the new field into the YAML — that needs a lakeview-config artifact version bump on the gateway-controller side. Once that lands, the fast-path activates automatically; until then, tableHints stays null and behavior is unchanged. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../config/models/configv1/Database.java | 10 ++++++ .../config/models/configv1/TableHint.java | 24 +++++++++++++ .../IcebergMetadataUploaderService.java | 36 +++++++++++++++++-- .../TableDiscoveryService.java | 19 +++++++++- .../metadata_extractor/models/Table.java | 7 ++++ .../TestIcebergMetadataUploaderService.java | 17 +++++++++ 6 files changed, 109 insertions(+), 4 deletions(-) create mode 100644 lakeview/src/main/java/ai/onehouse/config/models/configv1/TableHint.java diff --git a/lakeview/src/main/java/ai/onehouse/config/models/configv1/Database.java b/lakeview/src/main/java/ai/onehouse/config/models/configv1/Database.java index bd119d68..bf160f58 100644 --- a/lakeview/src/main/java/ai/onehouse/config/models/configv1/Database.java +++ b/lakeview/src/main/java/ai/onehouse/config/models/configv1/Database.java @@ -1,6 +1,8 @@ package ai.onehouse.config.models.configv1; import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; import lombok.Builder; import lombok.NonNull; import lombok.Value; @@ -12,4 +14,12 @@ public class Database { String name; @NonNull List basePaths; + /** + * Optional per-table hints keyed on the tableId encoded in {@link #basePaths} (the segment after + * {@code #}). The control plane populates this for tables it has richer information about + * (e.g. Iceberg tables discovered via AWS Glue, where the catalog already knows the current + * {@code metadata.json} URI). 
Missing entries / missing map are normal — older YAML versions + * predate this field, and tables without hints fall back to plain discovery. + */ + @Nullable Map tableHints; } diff --git a/lakeview/src/main/java/ai/onehouse/config/models/configv1/TableHint.java b/lakeview/src/main/java/ai/onehouse/config/models/configv1/TableHint.java new file mode 100644 index 00000000..b47bcb63 --- /dev/null +++ b/lakeview/src/main/java/ai/onehouse/config/models/configv1/TableHint.java @@ -0,0 +1,24 @@ +package ai.onehouse.config.models.configv1; + +import javax.annotation.Nullable; +import lombok.Builder; +import lombok.Value; +import lombok.extern.jackson.Jacksonized; + +/** + * Optional per-table hints supplied by the control plane in the parser YAML, keyed on tableId in + * {@link Database#getTableHints()}. Hints are forward-compatible: older LakeView versions that + * don't know about a field deserialize the rest fine; new fields default to null. + */ +@Builder +@Value +@Jacksonized +public class TableHint { + /** + * For Iceberg tables registered in an external catalog (e.g. AWS Glue), the URI of the current + * {@code metadata.json} that the catalog points at. When present, the extractor can PUT this + * file directly instead of listing the {@code metadata/} folder to find the latest. Empty / + * null means "no hint — fall back to listing". 
+ */ + @Nullable String metadataLocationHint; +} diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/IcebergMetadataUploaderService.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/IcebergMetadataUploaderService.java index 5bce527e..adbca419 100644 --- a/lakeview/src/main/java/ai/onehouse/metadata_extractor/IcebergMetadataUploaderService.java +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/IcebergMetadataUploaderService.java @@ -186,6 +186,30 @@ private CompletableFuture initialiseTable(Table table) { } private CompletableFuture uploadIfNewMetadataJson(Table table, Checkpoint checkpoint) { + // Fast path: control plane provided the current metadata.json URI (e.g. from AWS Glue's + // metadata_location parameter). Skip the per-cycle LIST and PUT the file directly. + if (StringUtils.isNotBlank(table.getMetadataLocationHint())) { + String hint = table.getMetadataLocationHint(); + String filename = lastPathSegment(hint); + if (filename.equals(checkpoint.getLastUploadedFile())) { + log.debug( + "Iceberg table {} already up-to-date at {} (from hint)", + table.getTableId(), + filename); + return CompletableFuture.completedFuture(true); + } + File synthetic = + File.builder() + .filename(filename) + .isDirectory(false) + // Hint doesn't carry lastModifiedAt; use now() since it's only used for ordering + // on the consumer side and is not load-bearing for correctness. + .lastModifiedAt(Instant.now()) + .build(); + return uploadAndAdvanceCheckpoint(table, hint, synthetic, checkpoint); + } + + // Fallback path: list metadata/ and lex-sort. 
String metadataDirUri = storageUtils.constructFileUri(table.getAbsoluteTableUri(), ICEBERG_METADATA_FOLDER_NAME); return asyncStorageClient @@ -207,13 +231,14 @@ private CompletableFuture uploadIfNewMetadataJson(Table table, Checkpoi latest.get().getFilename()); return CompletableFuture.completedFuture(true); } - return uploadAndAdvanceCheckpoint(table, metadataDirUri, latest.get(), checkpoint); + String fileUri = + storageUtils.constructFileUri(metadataDirUri, latest.get().getFilename()); + return uploadAndAdvanceCheckpoint(table, fileUri, latest.get(), checkpoint); }); } private CompletableFuture uploadAndAdvanceCheckpoint( - Table table, String metadataDirUri, File metadataJson, Checkpoint priorCheckpoint) { - String fileUri = storageUtils.constructFileUri(metadataDirUri, metadataJson.getFilename()); + Table table, String fileUri, File metadataJson, Checkpoint priorCheckpoint) { return onehouseApiClient .generateCommitMetadataUploadUrl( GenerateCommitMetadataUploadUrlRequest.builder() @@ -303,6 +328,11 @@ static Optional pickLatestMetadataJson(List files) { .max((a, b) -> a.getFilename().compareTo(b.getFilename())); } + static String lastPathSegment(String uri) { + int idx = uri.lastIndexOf('/'); + return idx < 0 ? uri : uri.substring(idx + 1); + } + private static String deriveTableName(String basePathUri) { String trimmed = basePathUri.endsWith("/") ? 
basePathUri.substring(0, basePathUri.length() - 1) : basePathUri; int idx = trimmed.lastIndexOf('/'); diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableDiscoveryService.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableDiscoveryService.java index 95478410..ea16199d 100644 --- a/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableDiscoveryService.java +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableDiscoveryService.java @@ -10,6 +10,7 @@ import ai.onehouse.config.models.configv1.Database; import ai.onehouse.config.models.configv1.MetadataExtractorConfig; import ai.onehouse.config.models.configv1.ParserConfig; +import ai.onehouse.config.models.configv1.TableHint; import ai.onehouse.metadata_extractor.models.Table; import ai.onehouse.metrics.LakeViewExtractorMetrics; import ai.onehouse.storage.AsyncStorageClient; @@ -74,6 +75,17 @@ public CompletableFuture> discoverTables() { log.info("Starting table discover service, excluding {}", excludedPathPatterns); List>>> pathToDiscoveredTablesFuturePairList = new ArrayList<>(); + // Merge per-database tableHints into a single tableId -> hint map. Hints are + // optional metadata supplied by the control plane (e.g. Iceberg metadata_location) + // and are looked up after discovery, once the tableId is known. 
+ java.util.Map tableHintsByTableId = new java.util.HashMap<>(); + for (ParserConfig parserConfig : metadataExtractorConfig.getParserConfig()) { + for (Database database : parserConfig.getDatabases()) { + if (database.getTableHints() != null) { + tableHintsByTableId.putAll(database.getTableHints()); + } + } + } for (ParserConfig parserConfig : metadataExtractorConfig.getParserConfig()) { for (Database database : parserConfig.getDatabases()) { @@ -116,7 +128,12 @@ public CompletableFuture> discoverTables() { continue; } Table table = discoveredTables.iterator().next(); - table = table.toBuilder().tableId(tableId).build(); + Table.TableBuilder builder = table.toBuilder().tableId(tableId); + TableHint hint = tableHintsByTableId.get(tableId); + if (hint != null && StringUtils.isNotBlank(hint.getMetadataLocationHint())) { + builder.metadataLocationHint(hint.getMetadataLocationHint()); + } + table = builder.build(); discoveredTables = Collections.singleton(table); } diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/models/Table.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/models/Table.java index 5ee21cbc..4c6b1bcb 100644 --- a/lakeview/src/main/java/ai/onehouse/metadata_extractor/models/Table.java +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/models/Table.java @@ -23,4 +23,11 @@ public class Table { @Builder.Default private final int timelineLayoutVersion = TIMELINE_LAYOUT_VERSION_DEFAULT; // tableVersion / timelineLayoutVersion above are Hudi-specific and ignored for non-Hudi formats. @Builder.Default private final TableFormat tableFormat = TableFormat.HUDI; + /** + * Optional URI of the current Iceberg {@code metadata.json}, supplied by the control plane via + * the parser YAML's table hints. Null when no hint was provided. {@link + * ai.onehouse.metadata_extractor.IcebergMetadataUploaderService} uses this to skip the per-cycle + * {@code metadata/} folder LIST when the hint matches the last uploaded file. 
+ */ + private final String metadataLocationHint; } diff --git a/lakeview/src/test/java/ai/onehouse/metadata_extractor/TestIcebergMetadataUploaderService.java b/lakeview/src/test/java/ai/onehouse/metadata_extractor/TestIcebergMetadataUploaderService.java index 4725a1e7..3ed24bde 100644 --- a/lakeview/src/test/java/ai/onehouse/metadata_extractor/TestIcebergMetadataUploaderService.java +++ b/lakeview/src/test/java/ai/onehouse/metadata_extractor/TestIcebergMetadataUploaderService.java @@ -60,6 +60,23 @@ void returnsEmptyWhenNoMetadataJson() { .isPresent()); } + @Test + void lastPathSegmentExtractsFilenameFromUri() { + assertEquals( + "00042-uuid.metadata.json", + IcebergMetadataUploaderService.lastPathSegment( + "s3://bucket/db/t/metadata/00042-uuid.metadata.json")); + assertEquals( + "v3.metadata.json", + IcebergMetadataUploaderService.lastPathSegment( + "s3a://bucket/db/t/metadata/v3.metadata.json")); + } + + @Test + void lastPathSegmentReturnsInputWhenNoSlash() { + assertEquals("filename.json", IcebergMetadataUploaderService.lastPathSegment("filename.json")); + } + private static File jsonFile(String name) { return File.builder().filename(name).isDirectory(false).lastModifiedAt(Instant.EPOCH).build(); } From 383b3321f457038f481492db86f2fb3299cee626 Mon Sep 17 00:00:00 2001 From: tiennguyen-onehouse Date: Wed, 13 May 2026 13:25:41 -0700 Subject: [PATCH 4/6] Make Iceberg discovery declarative and fix latest-metadata.json selection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Concern 2: the previous IcebergTableFormatDetector matched any directory containing a sub-directory named "metadata", which produced false positives across customer warehouses (Spark checkpoint dirs, custom layouts, schema folders, etc.). 
Replace the SPI ordering with per-Database declarative routing — each Database in the parser YAML now declares its tableFormat (default HUDI for backward compat), and TableDiscoveryService picks the single matching detector for that database. The Iceberg detector also becomes strict: requires metadata/ AND at least one *.metadata.json inside it (one extra LIST during discovery only, not per upload cycle). Concern 1: pickLatestMetadataJson lex-sorted filenames, so for the Hadoop catalog naming v{N}.metadata.json (unpadded), v2.metadata.json beat v10.metadata.json — a stale pointer. Resolve in three tiers: catalog hint, then metadata/version-hint.text when present (canonical Iceberg lookup), then numeric-aware sort on the leading integer. Empty metadata/ now increments a NO_SUCH_KEY failure counter instead of silently returning true, so phantom tables surface in dashboards. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../lakeview/sync/LakeviewSyncTool.java | 3 +- .../config/models/configv1/Database.java | 8 + .../constants/MetadataExtractorConstants.java | 2 + .../HudiTableFormatDetector.java | 6 +- .../IcebergMetadataUploaderService.java | 167 +++++++++++++++--- .../IcebergTableFormatDetector.java | 55 +++++- .../TableDiscoveryService.java | 166 +++++++++-------- .../TableFormatDetector.java | 21 ++- .../TableDiscoveryServiceTest.java | 76 +++++++- .../TestHudiTableFormatDetector.java | 20 ++- .../TestIcebergMetadataUploaderService.java | 57 +++++- .../TestIcebergTableFormatDetector.java | 62 ++++++- 12 files changed, 506 insertions(+), 137 deletions(-) diff --git a/lakeview-sync-tool/src/main/java/ai/onehouse/lakeview/sync/LakeviewSyncTool.java b/lakeview-sync-tool/src/main/java/ai/onehouse/lakeview/sync/LakeviewSyncTool.java index 5bfdeac6..912f56ad 100644 --- a/lakeview-sync-tool/src/main/java/ai/onehouse/lakeview/sync/LakeviewSyncTool.java +++ b/lakeview-sync-tool/src/main/java/ai/onehouse/lakeview/sync/LakeviewSyncTool.java @@ -257,7 +257,8 @@ private 
TableDiscoveryAndUploadJob getTableDiscoveryAndUploadJob(@Nonnull Config TableDiscoveryService tableDiscoveryService = new TableDiscoveryService(asyncStorageClient, storageUtils, configProvider, executorService, lakeViewExtractorMetrics, - new HudiTableFormatDetector(), new IcebergTableFormatDetector()); + new HudiTableFormatDetector(), + new IcebergTableFormatDetector(asyncStorageClient, storageUtils)); HoodiePropertiesReader hoodiePropertiesReader = new HoodiePropertiesReader(asyncStorageClient, lakeViewExtractorMetrics); OnehouseApiClient onehouseApiClient = new OnehouseApiClient(asyncHttpClientWithRetry, config, diff --git a/lakeview/src/main/java/ai/onehouse/config/models/configv1/Database.java b/lakeview/src/main/java/ai/onehouse/config/models/configv1/Database.java index bf160f58..14fac7fd 100644 --- a/lakeview/src/main/java/ai/onehouse/config/models/configv1/Database.java +++ b/lakeview/src/main/java/ai/onehouse/config/models/configv1/Database.java @@ -1,5 +1,6 @@ package ai.onehouse.config.models.configv1; +import ai.onehouse.api.models.request.TableFormat; import java.util.List; import java.util.Map; import javax.annotation.Nullable; @@ -14,6 +15,13 @@ public class Database { String name; @NonNull List basePaths; + /** + * Physical table format of the tables under {@link #basePaths}. Null is interpreted as {@link + * TableFormat#HUDI} for backward compatibility with YAMLs written before Iceberg support + * existed. Customers with a mixed-format warehouse should split into separate {@link Database} + * entries, one per format. + */ + @Nullable TableFormat tableFormat; /** * Optional per-table hints keyed on the tableId encoded in {@link #basePaths} (the segment after * {@code #}). 
The control plane populates this for tables it has richer information about diff --git a/lakeview/src/main/java/ai/onehouse/constants/MetadataExtractorConstants.java b/lakeview/src/main/java/ai/onehouse/constants/MetadataExtractorConstants.java index e752200a..f4e06b87 100644 --- a/lakeview/src/main/java/ai/onehouse/constants/MetadataExtractorConstants.java +++ b/lakeview/src/main/java/ai/onehouse/constants/MetadataExtractorConstants.java @@ -16,6 +16,8 @@ private MetadataExtractorConstants() {} public static final String ARCHIVED_FOLDER_NAME = "archived"; public static final String ICEBERG_METADATA_FOLDER_NAME = "metadata"; public static final String ICEBERG_METADATA_FILE_SUFFIX = ".metadata.json"; + public static final String ICEBERG_VERSION_HINT_FILENAME = "version-hint.text"; + public static final String ICEBERG_HADOOP_METADATA_FILE_PREFIX = "v"; public static final String HOODIE_PROPERTIES_FILE = "hoodie.properties"; public static final String HOODIE_TABLE_NAME_KEY = "hoodie.table.name"; public static final String HOODIE_TABLE_TYPE_KEY = "hoodie.table.type"; diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/HudiTableFormatDetector.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/HudiTableFormatDetector.java index f11e8a72..9b316516 100644 --- a/lakeview/src/main/java/ai/onehouse/metadata_extractor/HudiTableFormatDetector.java +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/HudiTableFormatDetector.java @@ -5,6 +5,7 @@ import ai.onehouse.api.models.request.TableFormat; import ai.onehouse.storage.models.File; import java.util.List; +import java.util.concurrent.CompletableFuture; /** A directory is a Hudi table root if it contains a {@code .hoodie/} folder. 
*/ public class HudiTableFormatDetector implements TableFormatDetector { @@ -14,7 +15,8 @@ public TableFormat format() { } @Override - public boolean matches(List listedFiles) { - return listedFiles.stream().anyMatch(file -> file.getFilename().startsWith(HOODIE_FOLDER_NAME)); + public CompletableFuture matches(String path, List listedFiles) { + return CompletableFuture.completedFuture( + listedFiles.stream().anyMatch(file -> file.getFilename().startsWith(HOODIE_FOLDER_NAME))); } } diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/IcebergMetadataUploaderService.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/IcebergMetadataUploaderService.java index adbca419..605fb19d 100644 --- a/lakeview/src/main/java/ai/onehouse/metadata_extractor/IcebergMetadataUploaderService.java +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/IcebergMetadataUploaderService.java @@ -1,11 +1,14 @@ package ai.onehouse.metadata_extractor; import static ai.onehouse.constants.MetadataExtractorConstants.DEFAULT_FILE_UPLOAD_STREAM_BATCH_SIZE; +import static ai.onehouse.constants.MetadataExtractorConstants.ICEBERG_HADOOP_METADATA_FILE_PREFIX; import static ai.onehouse.constants.MetadataExtractorConstants.ICEBERG_METADATA_FILE_SUFFIX; import static ai.onehouse.constants.MetadataExtractorConstants.ICEBERG_METADATA_FOLDER_NAME; +import static ai.onehouse.constants.MetadataExtractorConstants.ICEBERG_VERSION_HINT_FILENAME; import static ai.onehouse.constants.MetadataExtractorConstants.INITIAL_CHECKPOINT; import static ai.onehouse.metadata_extractor.MetadataExtractorUtils.getMetadataExtractorFailureReason; +import ai.onehouse.RuntimeModule.TableDiscoveryObjectStorageAsyncClient; import ai.onehouse.api.OnehouseApiClient; import ai.onehouse.api.models.request.CommitTimelineType; import ai.onehouse.api.models.request.GenerateCommitMetadataUploadUrlRequest; @@ -15,7 +18,6 @@ import ai.onehouse.api.models.request.TableType; import 
ai.onehouse.api.models.request.UploadedFile; import ai.onehouse.api.models.request.UpsertTableMetricsCheckpointRequest; -import ai.onehouse.api.models.response.GenerateCommitMetadataUploadUrlResponse; import ai.onehouse.api.models.response.GetTableMetricsCheckpointResponse; import ai.onehouse.constants.MetricsConstants; import ai.onehouse.metadata_extractor.models.Checkpoint; @@ -25,13 +27,14 @@ import ai.onehouse.storage.PresignedUrlFileUploader; import ai.onehouse.storage.StorageUtils; import ai.onehouse.storage.models.File; -import ai.onehouse.RuntimeModule.TableDiscoveryObjectStorageAsyncClient; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.google.inject.Inject; +import java.nio.charset.StandardCharsets; import java.time.Instant; import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Optional; import java.util.Set; @@ -55,6 +58,17 @@ * manifest plumbing don't apply, so a separate service is clearer than a branching megaclass. * Shared primitives ({@link OnehouseApiClient}, {@link PresignedUrlFileUploader}, {@link * AsyncStorageClient}) are reused. + * + *

Selecting the current pointer. Three paths, in order of preference: + *

    + *
  1. {@link Table#getMetadataLocationHint()} from the parser YAML (catalog-driven).
  2. + *
  3. {@code metadata/version-hint.text} for Hadoop-catalog tables — its integer content names + * the current {@code v{N}.metadata.json} unambiguously.
  4. + *
  5. Numeric-aware filename comparison as a last resort. The leading integer in the filename + * (after an optional {@code v} prefix) is treated as the version number. Works for both + * {@code v{N}.metadata.json} (Hadoop) and {@code 00000-.metadata.json} + * (Hive/Glue/Spark) without depending on zero-padding.
  6. + *
*/ @Slf4j public class IcebergMetadataUploaderService { @@ -202,41 +216,100 @@ private CompletableFuture uploadIfNewMetadataJson(Table table, Checkpoi File.builder() .filename(filename) .isDirectory(false) - // Hint doesn't carry lastModifiedAt; use now() since it's only used for ordering - // on the consumer side and is not load-bearing for correctness. + // Hint doesn't carry lastModifiedAt; use now() since the consumer treats this as + // advisory and discriminates by checkpoint batchId for ordering. .lastModifiedAt(Instant.now()) .build(); return uploadAndAdvanceCheckpoint(table, hint, synthetic, checkpoint); } - // Fallback path: list metadata/ and lex-sort. + // Fallback path: list metadata/, prefer version-hint.text, else numeric-aware sort. String metadataDirUri = storageUtils.constructFileUri(table.getAbsoluteTableUri(), ICEBERG_METADATA_FOLDER_NAME); return asyncStorageClient .listAllFilesInDir(metadataDirUri) .thenComposeAsync( - files -> { - Optional latest = pickLatestMetadataJson(files); - if (!latest.isPresent()) { - log.warn( - "Iceberg table {} has no metadata.json under {}", - table.getTableId(), - metadataDirUri); - return CompletableFuture.completedFuture(true); - } - if (latest.get().getFilename().equals(checkpoint.getLastUploadedFile())) { - log.debug( - "Iceberg table {} already up-to-date at {}", - table.getTableId(), - latest.get().getFilename()); - return CompletableFuture.completedFuture(true); + files -> + resolveLatestMetadataJson(metadataDirUri, files) + .thenComposeAsync( + latestOpt -> { + if (!latestOpt.isPresent()) { + log.error( + "Iceberg table {} has no *.metadata.json under {}", + table.getTableId(), + metadataDirUri); + metrics.incrementTableMetadataProcessingFailureCounter( + MetricsConstants.MetadataUploadFailureReasons.NO_SUCH_KEY, + "Iceberg table missing metadata.json files"); + return CompletableFuture.completedFuture(false); + } + File latest = latestOpt.get(); + if 
(latest.getFilename().equals(checkpoint.getLastUploadedFile())) { + log.debug( + "Iceberg table {} already up-to-date at {}", + table.getTableId(), + latest.getFilename()); + return CompletableFuture.completedFuture(true); + } + String fileUri = + storageUtils.constructFileUri(metadataDirUri, latest.getFilename()); + return uploadAndAdvanceCheckpoint(table, fileUri, latest, checkpoint); + })); + } + + /** + * Resolves the current {@code metadata.json} given a listing of {@code metadata/}. Reads + * {@code version-hint.text} when present and falls back to numeric-aware filename comparison + * otherwise. + */ + private CompletableFuture> resolveLatestMetadataJson( + String metadataDirUri, List files) { + Optional versionHintFile = + files.stream() + .filter(f -> !f.isDirectory() && ICEBERG_VERSION_HINT_FILENAME.equals(f.getFilename())) + .findFirst(); + if (!versionHintFile.isPresent()) { + return CompletableFuture.completedFuture(pickLatestMetadataJson(files)); + } + String hintUri = storageUtils.constructFileUri(metadataDirUri, ICEBERG_VERSION_HINT_FILENAME); + return asyncStorageClient + .readFileAsBytes(hintUri) + .thenApply( + bytes -> { + Optional fromHint = resolveFromVersionHint(bytes, files); + if (fromHint.isPresent()) { + return fromHint; } - String fileUri = - storageUtils.constructFileUri(metadataDirUri, latest.get().getFilename()); - return uploadAndAdvanceCheckpoint(table, fileUri, latest.get(), checkpoint); + log.warn( + "version-hint.text at {} did not point at an existing metadata.json; falling back to numeric sort", + hintUri); + return pickLatestMetadataJson(files); + }) + .exceptionally( + ex -> { + log.warn( + "Failed to read version-hint.text at {}; falling back to numeric sort", + hintUri, + ex); + return pickLatestMetadataJson(files); }); } + static Optional resolveFromVersionHint(byte[] versionHintBytes, List files) { + String content = new String(versionHintBytes, StandardCharsets.UTF_8).trim(); + long version; + try { + version = 
Long.parseLong(content); + } catch (NumberFormatException e) { + log.warn("version-hint.text contained non-integer content: '{}'", content); + return Optional.empty(); + } + String target = ICEBERG_HADOOP_METADATA_FILE_PREFIX + version + ICEBERG_METADATA_FILE_SUFFIX; + return files.stream() + .filter(f -> !f.isDirectory() && target.equals(f.getFilename())) + .findFirst(); + } + private CompletableFuture uploadAndAdvanceCheckpoint( Table table, String fileUri, File metadataJson, Checkpoint priorCheckpoint) { return onehouseApiClient @@ -317,15 +390,55 @@ private CompletableFuture advanceCheckpoint( } /** - * Picks the latest metadata.json from a {@code metadata/} listing. Both naming conventions in - * use today — {@code v{N}.metadata.json} (Hadoop catalog) and {@code 00000-.metadata.json} - * (Hive / Glue / Spark) — sort to the same answer lexicographically. + * Picks the latest {@code *.metadata.json} from a {@code metadata/} listing using numeric-aware + * comparison. The leading integer in the filename (after an optional {@code v} prefix) is the + * version number; ties fall back to lexicographic order on the full filename so the result is + * deterministic. + * + *

Examples: + *

    + *
  • {@code v1.metadata.json}, {@code v10.metadata.json}, {@code v2.metadata.json} → + * {@code v10.metadata.json}
  • + *
  • {@code 00001-a.metadata.json}, {@code 00010-b.metadata.json} → + * {@code 00010-b.metadata.json}
  • + *
*/ static Optional pickLatestMetadataJson(List files) { + Comparator byVersionThenName = + Comparator.comparingLong(f -> extractVersionNumber(f.getFilename())) + .thenComparing(File::getFilename); return files.stream() .filter(f -> !f.isDirectory()) .filter(f -> f.getFilename().endsWith(ICEBERG_METADATA_FILE_SUFFIX)) - .max((a, b) -> a.getFilename().compareTo(b.getFilename())); + .max(byVersionThenName); + } + + /** + * Extracts the leading integer "version number" from an Iceberg metadata-json filename. Returns + * {@code -1} when no integer prefix is present (the file still participates in ordering — it + * just loses any tiebreak with numerically-named siblings). + */ + static long extractVersionNumber(String filename) { + String base = + filename.endsWith(ICEBERG_METADATA_FILE_SUFFIX) + ? filename.substring(0, filename.length() - ICEBERG_METADATA_FILE_SUFFIX.length()) + : filename; + int i = 0; + if (i < base.length() && base.charAt(i) == 'v') { + i++; + } + int start = i; + while (i < base.length() && Character.isDigit(base.charAt(i))) { + i++; + } + if (i == start) { + return -1L; + } + try { + return Long.parseLong(base.substring(start, i)); + } catch (NumberFormatException e) { + return -1L; + } } static String lastPathSegment(String uri) { diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/IcebergTableFormatDetector.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/IcebergTableFormatDetector.java index 94c17a80..8e6e2ee2 100644 --- a/lakeview/src/main/java/ai/onehouse/metadata_extractor/IcebergTableFormatDetector.java +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/IcebergTableFormatDetector.java @@ -1,26 +1,67 @@ package ai.onehouse.metadata_extractor; +import static ai.onehouse.constants.MetadataExtractorConstants.ICEBERG_METADATA_FILE_SUFFIX; import static ai.onehouse.constants.MetadataExtractorConstants.ICEBERG_METADATA_FOLDER_NAME; +import ai.onehouse.RuntimeModule.TableDiscoveryObjectStorageAsyncClient; import 
ai.onehouse.api.models.request.TableFormat; +import ai.onehouse.storage.AsyncStorageClient; +import ai.onehouse.storage.StorageUtils; import ai.onehouse.storage.models.File; +import com.google.inject.Inject; import java.util.List; +import java.util.concurrent.CompletableFuture; +import javax.annotation.Nonnull; /** - * A directory is an Iceberg table root if it contains a {@code metadata/} folder. The folder - * itself holds the {@code *.metadata.json} pointer files plus manifest list / manifest avros; - * presence alone is sufficient to identify the table root since Iceberg always emits {@code - * metadata/} on the first commit. + * A directory is an Iceberg table root if it contains a {@code metadata/} sub-directory and + * that sub-directory contains at least one {@code *.metadata.json} pointer file. The pointer-file + * check is what separates a real Iceberg table from any arbitrary folder that happens to have a + * sub-directory named {@code metadata} (a Spark checkpoint dir, a documentation folder, custom + * user layouts, etc.) — that name alone is too generic to be a reliable marker. + * + *

The validating LIST costs one extra storage call per candidate directory during discovery + * (not per upload cycle). Discovery already short-circuits recursion on a match, so the extra + * LIST runs at most once per real table and once per false-positive candidate during a discovery + * pass. */ public class IcebergTableFormatDetector implements TableFormatDetector { + private final AsyncStorageClient asyncStorageClient; + private final StorageUtils storageUtils; + + @Inject + public IcebergTableFormatDetector( + @Nonnull @TableDiscoveryObjectStorageAsyncClient AsyncStorageClient asyncStorageClient, + @Nonnull StorageUtils storageUtils) { + this.asyncStorageClient = asyncStorageClient; + this.storageUtils = storageUtils; + } + @Override public TableFormat format() { return TableFormat.ICEBERG; } @Override - public boolean matches(List listedFiles) { - return listedFiles.stream() - .anyMatch(file -> file.isDirectory() && ICEBERG_METADATA_FOLDER_NAME.equals(file.getFilename())); + public CompletableFuture matches(String path, List listedFiles) { + boolean hasMetadataDir = + listedFiles.stream() + .anyMatch( + file -> + file.isDirectory() + && ICEBERG_METADATA_FOLDER_NAME.equals(file.getFilename())); + if (!hasMetadataDir) { + return CompletableFuture.completedFuture(false); + } + String metadataDirUri = storageUtils.constructFileUri(path, ICEBERG_METADATA_FOLDER_NAME); + return asyncStorageClient + .listAllFilesInDir(metadataDirUri) + .thenApply( + metadataFiles -> + metadataFiles.stream() + .anyMatch( + f -> + !f.isDirectory() + && f.getFilename().endsWith(ICEBERG_METADATA_FILE_SUFFIX))); } } diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableDiscoveryService.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableDiscoveryService.java index ea16199d..f5f557ea 100644 --- a/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableDiscoveryService.java +++ 
b/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableDiscoveryService.java @@ -3,25 +3,26 @@ import static ai.onehouse.metadata_extractor.MetadataExtractorUtils.getMetadataExtractorFailureReason; import static java.util.Collections.emptySet; -import com.google.inject.Inject; +import ai.onehouse.RuntimeModule.TableDiscoveryObjectStorageAsyncClient; import ai.onehouse.api.models.request.TableFormat; -import ai.onehouse.constants.MetricsConstants; import ai.onehouse.config.ConfigProvider; import ai.onehouse.config.models.configv1.Database; import ai.onehouse.config.models.configv1.MetadataExtractorConfig; import ai.onehouse.config.models.configv1.ParserConfig; import ai.onehouse.config.models.configv1.TableHint; +import ai.onehouse.constants.MetricsConstants; import ai.onehouse.metadata_extractor.models.Table; import ai.onehouse.metrics.LakeViewExtractorMetrics; import ai.onehouse.storage.AsyncStorageClient; import ai.onehouse.storage.StorageUtils; import ai.onehouse.storage.models.File; -import ai.onehouse.RuntimeModule.TableDiscoveryObjectStorageAsyncClient; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; -import java.util.Optional; +import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -33,9 +34,10 @@ import org.apache.commons.lang3.tuple.Pair; /* - * Discovers tables by Parsing all folders (including nested folders) in provided base paths. - * Each directory is run through the registered {@link TableFormatDetector}s; the first detector - * that matches determines the table's format. Excluded paths are skipped. + * Discovers tables by walking all folders (including nested folders) in provided base paths. 
+ * Each {@link Database} in the parser YAML declares its {@code tableFormat}; this service picks + * the single {@link TableFormatDetector} matching that format and applies it to every directory + * under the database's base paths. Excluded paths are skipped. */ @Slf4j public class TableDiscoveryService { @@ -45,7 +47,7 @@ public class TableDiscoveryService { private final ExecutorService executorService; private final ConfigProvider configProvider; private final LakeViewExtractorMetrics lakeviewExtractorMetrics; - private final List tableFormatDetectors; + private final Map detectorsByFormat; @Inject public TableDiscoveryService( @@ -61,10 +63,10 @@ public TableDiscoveryService( this.executorService = executorService; this.configProvider = configProvider; this.lakeviewExtractorMetrics = lakeviewExtractorMetrics; - // Hudi listed first so existing tables short-circuit on the cheaper check; new formats append. - this.tableFormatDetectors = - Collections.unmodifiableList( - Arrays.asList(hudiTableFormatDetector, icebergTableFormatDetector)); + this.detectorsByFormat = + ImmutableMap.of( + TableFormat.HUDI, hudiTableFormatDetector, + TableFormat.ICEBERG, icebergTableFormatDetector); } public CompletableFuture> discoverTables() { @@ -75,20 +77,29 @@ public CompletableFuture> discoverTables() { log.info("Starting table discover service, excluding {}", excludedPathPatterns); List>>> pathToDiscoveredTablesFuturePairList = new ArrayList<>(); - // Merge per-database tableHints into a single tableId -> hint map. Hints are - // optional metadata supplied by the control plane (e.g. Iceberg metadata_location) - // and are looked up after discovery, once the tableId is known. - java.util.Map tableHintsByTableId = new java.util.HashMap<>(); + // Merge per-database tableHints into a single tableId -> hint map. Hints are optional metadata + // supplied by the control plane (e.g. Iceberg metadata_location) and are looked up after + // discovery, once the tableId is known. 
+ Map tableHintsByTableId = new HashMap<>(); for (ParserConfig parserConfig : metadataExtractorConfig.getParserConfig()) { for (Database database : parserConfig.getDatabases()) { - if (database.getTableHints() != null) { - tableHintsByTableId.putAll(database.getTableHints()); + if (database.getTableHints() == null) { + continue; + } + for (Map.Entry entry : database.getTableHints().entrySet()) { + TableHint previous = tableHintsByTableId.put(entry.getKey(), entry.getValue()); + if (previous != null) { + log.warn( + "Duplicate tableHint for tableId {} across databases; later entry wins.", + entry.getKey()); + } } } } for (ParserConfig parserConfig : metadataExtractorConfig.getParserConfig()) { for (Database database : parserConfig.getDatabases()) { + TableFormatDetector detector = detectorFor(database); for (String basePathConfig : database.getBasePaths()) { String basePath = extractBasePath(basePathConfig); @@ -100,7 +111,11 @@ public CompletableFuture> discoverTables() { Pair.of( basePathConfig, discoverTablesInPath( - basePath, parserConfig.getLake(), database.getName(), excludedPathPatterns))); + basePath, + parserConfig.getLake(), + database.getName(), + excludedPathPatterns, + detector))); } } } @@ -143,6 +158,16 @@ public CompletableFuture> discoverTables() { }); } + private TableFormatDetector detectorFor(Database database) { + TableFormat declared = database.getTableFormat() == null ? TableFormat.HUDI : database.getTableFormat(); + TableFormatDetector detector = detectorsByFormat.get(declared); + if (detector == null) { + // Programmer error: a new TableFormat enum value was added without a matching detector. 
+ throw new IllegalStateException("No detector registered for tableFormat=" + declared); + } + return detector; + } + private String extractBasePath(String basePathConfig) { String[] basePathConfigParts = basePathConfig.split(TABLE_ID_SEPARATOR); return basePathConfigParts[0]; @@ -154,56 +179,68 @@ private String extractTableId(String basePathConfig) { } private CompletableFuture> discoverTablesInPath( - String path, String lakeName, String databaseName, List excludedPathPatterns) { + String path, + String lakeName, + String databaseName, + List excludedPathPatterns, + TableFormatDetector detector) { try { log.info(String.format("Discovering tables in %s", path)); return asyncStorageClient .listAllFilesInDir(path) .thenComposeAsync( - listedFiles -> { - Set

tablePaths = ConcurrentHashMap.newKeySet(); - List> recursiveFutures = new ArrayList<>(); - - Optional detected = detectTableFormat(listedFiles); - if (detected.isPresent()) { - Table table = - Table.builder() - .absoluteTableUri(path) - .databaseName(databaseName) - .lakeName(lakeName) - .tableFormat(detected.get()) - .build(); - if (!isExcluded(table.getAbsoluteTableUri(), excludedPathPatterns)) { - tablePaths.add(table); - } - return CompletableFuture.completedFuture(tablePaths); - } - - List directories = - listedFiles.stream().filter(File::isDirectory).collect(Collectors.toList()); - - for (File file : directories) { - String filePath = storageUtils.constructFileUri(path, file.getFilename()); - if (!isExcluded(filePath, excludedPathPatterns)) { - CompletableFuture recursiveFuture = - discoverTablesInPath(filePath, lakeName, databaseName, excludedPathPatterns) - .thenAccept(tablePaths::addAll); - recursiveFutures.add(recursiveFuture); - } - } - - return CompletableFuture.allOf(recursiveFutures.toArray(new CompletableFuture[0])) - .thenApplyAsync(ignored -> tablePaths, executorService); - }, + listedFiles -> + detector + .matches(path, listedFiles) + .thenComposeAsync( + matched -> { + Set
tablePaths = ConcurrentHashMap.newKeySet(); + if (Boolean.TRUE.equals(matched)) { + Table table = + Table.builder() + .absoluteTableUri(path) + .databaseName(databaseName) + .lakeName(lakeName) + .tableFormat(detector.format()) + .build(); + if (!isExcluded(table.getAbsoluteTableUri(), excludedPathPatterns)) { + tablePaths.add(table); + } + return CompletableFuture.completedFuture(tablePaths); + } + + List directories = + listedFiles.stream() + .filter(File::isDirectory) + .collect(Collectors.toList()); + List> recursiveFutures = new ArrayList<>(); + for (File file : directories) { + String filePath = + storageUtils.constructFileUri(path, file.getFilename()); + if (!isExcluded(filePath, excludedPathPatterns)) { + CompletableFuture recursiveFuture = + discoverTablesInPath( + filePath, + lakeName, + databaseName, + excludedPathPatterns, + detector) + .thenAccept(tablePaths::addAll); + recursiveFutures.add(recursiveFuture); + } + } + return CompletableFuture.allOf( + recursiveFutures.toArray(new CompletableFuture[0])) + .thenApplyAsync(ignored -> tablePaths, executorService); + }, + executorService), executorService) .exceptionally( e -> { log.error("Failed to discover tables in path: {}", path, e); lakeviewExtractorMetrics.incrementTableDiscoveryFailureCounter( - getMetadataExtractorFailureReason( - e, - MetricsConstants.MetadataUploadFailureReasons.UNKNOWN) - ); + getMetadataExtractorFailureReason( + e, MetricsConstants.MetadataUploadFailureReasons.UNKNOWN)); return emptySet(); }); } catch (Exception e) { @@ -212,15 +249,6 @@ private CompletableFuture> discoverTablesInPath( } } - private Optional detectTableFormat(List listedFiles) { - for (TableFormatDetector detector : tableFormatDetectors) { - if (detector.matches(listedFiles)) { - return Optional.of(detector.format()); - } - } - return Optional.empty(); - } - private boolean isExcluded(String filePath, List excludedPathPatterns) { return excludedPathPatterns.stream().anyMatch(filePath::matches); } diff --git 
a/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableFormatDetector.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableFormatDetector.java index cfd9003e..3bb4e696 100644 --- a/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableFormatDetector.java +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableFormatDetector.java @@ -3,18 +3,29 @@ import ai.onehouse.api.models.request.TableFormat; import ai.onehouse.storage.models.File; import java.util.List; +import java.util.concurrent.CompletableFuture; /** * Identifies whether a directory listing represents the root of a table of a particular format. * - *

One implementation per supported format (Hudi, Iceberg, ...). {@link TableDiscoveryService} - * iterates registered detectors and stops at the first match, so detectors should be cheap and - * unambiguous. + *

One implementation per supported format (Hudi, Iceberg, ...). Each {@code Database} in the + * parser YAML declares its {@code tableFormat}, and {@link TableDiscoveryService} picks the + * single matching detector for that database — detectors are not raced against each other, so + * there is no ordering trap. + * + *

{@link #matches} returns a {@link CompletableFuture} because some detectors need a follow-up + * directory listing to confirm a match (e.g. Iceberg's "must have a {@code *.metadata.json} + * inside {@code metadata/}" rule). Stateless detectors should return {@link + * CompletableFuture#completedFuture}. */ public interface TableFormatDetector { /** Format this detector identifies. */ TableFormat format(); - /** Returns true if the listed files at a directory indicate a table root of {@link #format()}. */ - boolean matches(List listedFiles); + /** + * Returns true if the directory at {@code path} (with its top-level listing already in {@code + * listedFiles}) is a table root of {@link #format()}. Implementations may issue additional + * storage I/O to validate. + */ + CompletableFuture matches(String path, List listedFiles); } diff --git a/lakeview/src/test/java/ai/onehouse/metadata_extractor/TableDiscoveryServiceTest.java b/lakeview/src/test/java/ai/onehouse/metadata_extractor/TableDiscoveryServiceTest.java index 8bf976a2..65570dc9 100644 --- a/lakeview/src/test/java/ai/onehouse/metadata_extractor/TableDiscoveryServiceTest.java +++ b/lakeview/src/test/java/ai/onehouse/metadata_extractor/TableDiscoveryServiceTest.java @@ -166,7 +166,7 @@ void testDiscoverTablesWithExclusion() throws ExecutionException, InterruptedExc ForkJoinPool.commonPool(), hudiMetadataExtractorMetrics, new HudiTableFormatDetector(), - new IcebergTableFormatDetector()); + new IcebergTableFormatDetector(asyncStorageClient, new StorageUtils())); Set

tableSet = tableDiscoveryService.discoverTables().get(); List
expectedResponseSet = @@ -260,7 +260,7 @@ void testCaseWhereMoreThanOneDiscoveredTablesForTableId() { ForkJoinPool.commonPool(), hudiMetadataExtractorMetrics, new HudiTableFormatDetector(), - new IcebergTableFormatDetector()); + new IcebergTableFormatDetector(asyncStorageClient, new StorageUtils())); Set
discoveredTables = tableDiscoveryService.discoverTables().join(); assertEquals(emptySet(), discoveredTables); @@ -300,7 +300,7 @@ void testWithInvalidBasePath() { ForkJoinPool.commonPool(), hudiMetadataExtractorMetrics, new HudiTableFormatDetector(), - new IcebergTableFormatDetector()); + new IcebergTableFormatDetector(s3AsyncStorageClient, new StorageUtils())); assertEquals(emptySet(), tableDiscoveryService.discoverTables().join()); } @@ -334,7 +334,7 @@ void testTableDiscoveryWithAsyncExceptions() { ForkJoinPool.commonPool(), hudiMetadataExtractorMetrics, new HudiTableFormatDetector(), - new IcebergTableFormatDetector()); + new IcebergTableFormatDetector(asyncStorageClient, new StorageUtils())); assertEquals(emptySet(), tableDiscoveryService.discoverTables().join()); } @@ -367,7 +367,7 @@ void testTableDiscoveryEncountersRateLimitException() { ForkJoinPool.commonPool(), hudiMetadataExtractorMetrics, new HudiTableFormatDetector(), - new IcebergTableFormatDetector()); + new IcebergTableFormatDetector(asyncStorageClient, new StorageUtils())); assertEquals(emptySet(), tableDiscoveryService.discoverTables().join()); @@ -377,6 +377,72 @@ void testTableDiscoveryEncountersRateLimitException() { MetricsConstants.MetadataUploadFailureReasons.RATE_LIMITING); } + @Test + void testDiscoverIcebergTableByDeclaredFormat() { + // Database declared as ICEBERG. Two children: `orders/` is a real Iceberg table (metadata/ + // with a *.metadata.json), `docs/` is a false-positive shape (metadata/ but no .metadata.json + // inside) and must be skipped. 
+ String basePath = "s3://bucket/iceberg_warehouse/"; + String ordersPath = basePath + "orders/"; + String ordersMetadata = ordersPath + "metadata"; + String docsPath = basePath + "docs/"; + String docsMetadata = docsPath + "metadata"; + + when(asyncStorageClient.listAllFilesInDir(basePath)) + .thenReturn( + CompletableFuture.completedFuture( + Arrays.asList(generateFileObj("orders/", true), generateFileObj("docs/", true)))); + when(asyncStorageClient.listAllFilesInDir(ordersPath)) + .thenReturn( + CompletableFuture.completedFuture( + Arrays.asList( + generateFileObj("metadata", true), generateFileObj("data", true)))); + when(asyncStorageClient.listAllFilesInDir(ordersMetadata)) + .thenReturn( + CompletableFuture.completedFuture( + Arrays.asList(generateFileObj("v1.metadata.json", false)))); + when(asyncStorageClient.listAllFilesInDir(docsPath)) + .thenReturn( + CompletableFuture.completedFuture( + Arrays.asList(generateFileObj("metadata", true)))); + when(asyncStorageClient.listAllFilesInDir(docsMetadata)) + .thenReturn( + CompletableFuture.completedFuture( + Arrays.asList(generateFileObj("README.md", false)))); + + when(config.getMetadataExtractorConfig()).thenReturn(metadataExtractorConfig); + when(metadataExtractorConfig.getPathExclusionPatterns()).thenReturn(Optional.of(emptyList())); + when(metadataExtractorConfig.getParserConfig()) + .thenReturn( + Collections.singletonList( + ParserConfig.builder() + .lake(LAKE) + .databases( + Collections.singletonList( + Database.builder() + .name(DATABASE) + .tableFormat(ai.onehouse.api.models.request.TableFormat.ICEBERG) + .basePaths(Collections.singletonList(basePath)) + .build())) + .build())); + + tableDiscoveryService = + new TableDiscoveryService( + asyncStorageClient, + new StorageUtils(), + new ConfigProvider(config), + ForkJoinPool.commonPool(), + hudiMetadataExtractorMetrics, + new HudiTableFormatDetector(), + new IcebergTableFormatDetector(asyncStorageClient, new StorageUtils())); + + Set
tables = tableDiscoveryService.discoverTables().join(); + assertEquals(1, tables.size()); + Table only = tables.iterator().next(); + assertEquals(ordersPath, only.getAbsoluteTableUri()); + assertEquals(ai.onehouse.api.models.request.TableFormat.ICEBERG, only.getTableFormat()); + } + private File generateFileObj(String fileName, boolean isDirectory) { return File.builder() .filename(fileName) diff --git a/lakeview/src/test/java/ai/onehouse/metadata_extractor/TestHudiTableFormatDetector.java b/lakeview/src/test/java/ai/onehouse/metadata_extractor/TestHudiTableFormatDetector.java index d4aa0c77..cc313a45 100644 --- a/lakeview/src/test/java/ai/onehouse/metadata_extractor/TestHudiTableFormatDetector.java +++ b/lakeview/src/test/java/ai/onehouse/metadata_extractor/TestHudiTableFormatDetector.java @@ -12,6 +12,7 @@ import org.junit.jupiter.api.Test; class TestHudiTableFormatDetector { + private static final String ANY_PATH = "s3://bucket/db/t/"; private final HudiTableFormatDetector detector = new HudiTableFormatDetector(); @Test @@ -22,22 +23,27 @@ void declaresHudiFormat() { @Test void matchesWhenHoodieFolderPresent() { assertTrue( - detector.matches( - Arrays.asList( - file(".hoodie", true), - file("part-0.parquet", false)))); + detector + .matches( + ANY_PATH, + Arrays.asList(file(".hoodie", true), file("part-0.parquet", false))) + .join()); } @Test void doesNotMatchWhenAbsent() { - assertFalse(detector.matches(Collections.singletonList(file("part-0.parquet", false)))); + assertFalse( + detector + .matches(ANY_PATH, Collections.singletonList(file("part-0.parquet", false))) + .join()); } @Test void doesNotMatchOnIcebergLayout() { assertFalse( - detector.matches( - Arrays.asList(file("metadata", true), file("data", true)))); + detector + .matches(ANY_PATH, Arrays.asList(file("metadata", true), file("data", true))) + .join()); } private static File file(String name, boolean dir) { diff --git 
a/lakeview/src/test/java/ai/onehouse/metadata_extractor/TestIcebergMetadataUploaderService.java b/lakeview/src/test/java/ai/onehouse/metadata_extractor/TestIcebergMetadataUploaderService.java index 3ed24bde..fbe12346 100644 --- a/lakeview/src/test/java/ai/onehouse/metadata_extractor/TestIcebergMetadataUploaderService.java +++ b/lakeview/src/test/java/ai/onehouse/metadata_extractor/TestIcebergMetadataUploaderService.java @@ -5,6 +5,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import ai.onehouse.storage.models.File; +import java.nio.charset.StandardCharsets; import java.time.Instant; import java.util.Arrays; import java.util.Collections; @@ -14,7 +15,7 @@ class TestIcebergMetadataUploaderService { @Test - void picksLexicographicallyLatestHiveStyleMetadataJson() { + void picksLatestHiveGlueStyleMetadataJsonByNumericPrefix() { Optional latest = IcebergMetadataUploaderService.pickLatestMetadataJson( Arrays.asList( @@ -26,17 +27,19 @@ void picksLexicographicallyLatestHiveStyleMetadataJson() { } @Test - void picksLatestHadoopCatalogMetadataJson() { + void picksLatestHadoopCatalogMetadataJsonByVersionNumber() { + // v10 is numerically newer than v2 — lex-sort would pick v2, which would be wrong. Numeric + // comparison on the leading integer correctly returns v10. Optional latest = IcebergMetadataUploaderService.pickLatestMetadataJson( Arrays.asList( jsonFile("v1.metadata.json"), jsonFile("v2.metadata.json"), - jsonFile("v10.metadata.json"))); + jsonFile("v9.metadata.json"), + jsonFile("v10.metadata.json"), + jsonFile("v11.metadata.json"))); assertTrue(latest.isPresent()); - // Hadoop catalog convention is zero-padded in practice; bare integers sort poorly but we still - // pick *a* result deterministically. Document the limitation rather than hide it. 
- assertEquals("v2.metadata.json", latest.get().getFilename()); + assertEquals("v11.metadata.json", latest.get().getFilename()); } @Test @@ -60,6 +63,48 @@ void returnsEmptyWhenNoMetadataJson() { .isPresent()); } + @Test + void extractVersionNumberHandlesAllNamingShapes() { + assertEquals(10L, IcebergMetadataUploaderService.extractVersionNumber("v10.metadata.json")); + assertEquals(20L, IcebergMetadataUploaderService.extractVersionNumber("00020-uuid.metadata.json")); + assertEquals(0L, IcebergMetadataUploaderService.extractVersionNumber("00000-uuid.metadata.json")); + // Non-numeric filenames still parse to -1 so they don't dominate the sort. + assertEquals(-1L, IcebergMetadataUploaderService.extractVersionNumber("metadata.json")); + assertEquals(-1L, IcebergMetadataUploaderService.extractVersionNumber("v.metadata.json")); + } + + @Test + void resolveFromVersionHintReturnsTargetWhenPresent() { + Optional resolved = + IcebergMetadataUploaderService.resolveFromVersionHint( + "7\n".getBytes(StandardCharsets.UTF_8), + Arrays.asList( + jsonFile("v6.metadata.json"), + jsonFile("v7.metadata.json"), + jsonFile("v8.metadata.json"))); + assertTrue(resolved.isPresent()); + assertEquals("v7.metadata.json", resolved.get().getFilename()); + } + + @Test + void resolveFromVersionHintReturnsEmptyWhenTargetMissing() { + // Hint points at v42 but only v6 / v7 exist — caller will fall back to numeric sort. 
+ assertFalse( + IcebergMetadataUploaderService.resolveFromVersionHint( + "42".getBytes(StandardCharsets.UTF_8), + Arrays.asList(jsonFile("v6.metadata.json"), jsonFile("v7.metadata.json"))) + .isPresent()); + } + + @Test + void resolveFromVersionHintReturnsEmptyOnNonIntegerContent() { + assertFalse( + IcebergMetadataUploaderService.resolveFromVersionHint( + "not-an-int".getBytes(StandardCharsets.UTF_8), + Collections.singletonList(jsonFile("v1.metadata.json"))) + .isPresent()); + } + @Test void lastPathSegmentExtractsFilenameFromUri() { assertEquals( diff --git a/lakeview/src/test/java/ai/onehouse/metadata_extractor/TestIcebergTableFormatDetector.java b/lakeview/src/test/java/ai/onehouse/metadata_extractor/TestIcebergTableFormatDetector.java index c16cd11e..99025df9 100644 --- a/lakeview/src/test/java/ai/onehouse/metadata_extractor/TestIcebergTableFormatDetector.java +++ b/lakeview/src/test/java/ai/onehouse/metadata_extractor/TestIcebergTableFormatDetector.java @@ -3,16 +3,37 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import ai.onehouse.api.models.request.TableFormat; +import ai.onehouse.storage.AsyncStorageClient; +import ai.onehouse.storage.StorageUtils; import ai.onehouse.storage.models.File; import java.time.Instant; import java.util.Arrays; import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +@ExtendWith(MockitoExtension.class) class TestIcebergTableFormatDetector { - private final IcebergTableFormatDetector detector = new 
IcebergTableFormatDetector(); + private static final String TABLE_PATH = "s3://bucket/db/t"; + private static final String METADATA_PATH = "s3://bucket/db/t/metadata"; + + @Mock private AsyncStorageClient asyncStorageClient; + private IcebergTableFormatDetector detector; + + @BeforeEach + void setUp() { + detector = new IcebergTableFormatDetector(asyncStorageClient, new StorageUtils()); + } @Test void declaresIcebergFormat() { @@ -20,23 +41,48 @@ void declaresIcebergFormat() { } @Test - void matchesWhenMetadataDirectoryPresent() { + void matchesWhenMetadataDirHasMetadataJson() { + when(asyncStorageClient.listAllFilesInDir(eq(METADATA_PATH))) + .thenReturn( + CompletableFuture.completedFuture( + Arrays.asList( + file("v3.metadata.json", false), + file("snap-1.avro", false)))); assertTrue( - detector.matches( - Arrays.asList(file("metadata", true), file("data", true)))); + detector + .matches(TABLE_PATH, Arrays.asList(file("metadata", true), file("data", true))) + .join()); + } + + @Test + void doesNotMatchWhenMetadataDirEmptyOfMetadataJson() { + // Real false-positive: a folder happens to have a `metadata/` subdir but no Iceberg pointer. + when(asyncStorageClient.listAllFilesInDir(eq(METADATA_PATH))) + .thenReturn( + CompletableFuture.completedFuture( + Arrays.asList(file("schema.csv", false), file("README.md", false)))); + assertFalse( + detector + .matches(TABLE_PATH, Arrays.asList(file("metadata", true), file("data", true))) + .join()); } @Test void doesNotMatchOnFileNamedMetadata() { - // A non-directory entry named "metadata" must not be confused with the metadata/ folder. - assertFalse(detector.matches(Collections.singletonList(file("metadata", false)))); + // A non-directory entry named "metadata" must not be confused with the metadata/ folder, and + // we must not issue the validating LIST when the marker isn't present. 
+ assertFalse( + detector.matches(TABLE_PATH, Collections.singletonList(file("metadata", false))).join()); + verify(asyncStorageClient, never()).listAllFilesInDir(eq(METADATA_PATH)); } @Test void doesNotMatchOnHudiLayout() { assertFalse( - detector.matches( - Arrays.asList(file(".hoodie", true), file("part-0.parquet", false)))); + detector + .matches(TABLE_PATH, Arrays.asList(file(".hoodie", true), file("part-0.parquet", false))) + .join()); + verify(asyncStorageClient, never()).listAllFilesInDir(eq(METADATA_PATH)); } private static File file(String name, boolean dir) { From eb3b21dd11cb973bdfb536213e42de7dc8a333eb Mon Sep 17 00:00:00 2001 From: tiennguyen-onehouse Date: Wed, 13 May 2026 16:36:47 -0700 Subject: [PATCH 5/6] Tolerate trailing slash on metadata/ CommonPrefix in Iceberg detector MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit S3 ListObjectsV2 surfaces "subdirectories" as CommonPrefixes whose Prefix string carries a trailing slash (e.g. "metadata/"). The storage client preserves that in File.filename, so the exact-string check "metadata".equals(filename) fails and the detector returns false even though the table layout is a valid Iceberg root. Concrete repro in staging (Testing-Acme org, table s3://acme-data-2/iceberg_tables/iceberg_table_test_may13/): - S3 list of the table base returns a single CommonPrefix "metadata/" - The agent emits tableFormat=ICEBERG + metadataLocationHint in the extractor YAML, the v2 lake-view image parses it fine, the discovery walks the base path - Detector returns false at the base, discoverTablesInPath recurses into metadata/, no Iceberg Table is ever emitted, dispatchUpload is called with an empty Iceberg set, IcebergMetadataUploaderService early-returns, nothing reaches the temp bucket Strip a single trailing slash before comparing, so both "metadata" and "metadata/" are accepted. 
Hudi's detector dodges this naturally via startsWith(".hoodie"); the Iceberg one switched to equals() and lost the same tolerance. Add a unit test that hands the detector a File with filename="metadata/" — the shape S3 actually produces — which fails on main and passes here. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../IcebergTableFormatDetector.java | 20 ++++++++++++++++--- .../TestIcebergTableFormatDetector.java | 15 ++++++++++++++ 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/IcebergTableFormatDetector.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/IcebergTableFormatDetector.java index 8e6e2ee2..9c843a56 100644 --- a/lakeview/src/main/java/ai/onehouse/metadata_extractor/IcebergTableFormatDetector.java +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/IcebergTableFormatDetector.java @@ -44,12 +44,26 @@ public TableFormat format() { @Override public CompletableFuture matches(String path, List listedFiles) { + // S3 ListObjectsV2 surfaces "directories" as CommonPrefixes, which the storage client + // maps to File objects whose filename retains the trailing slash (e.g. "metadata/"), + // since that is exactly the Prefix string S3 returns. Strip a trailing slash before + // comparing so the check works regardless of whether the client preserves it. 
boolean hasMetadataDir = listedFiles.stream() .anyMatch( - file -> - file.isDirectory() - && ICEBERG_METADATA_FOLDER_NAME.equals(file.getFilename())); + file -> { + if (!file.isDirectory()) { + return false; + } + String name = file.getFilename(); + if (name == null) { + return false; + } + if (name.endsWith("/")) { + name = name.substring(0, name.length() - 1); + } + return ICEBERG_METADATA_FOLDER_NAME.equals(name); + }); if (!hasMetadataDir) { return CompletableFuture.completedFuture(false); } diff --git a/lakeview/src/test/java/ai/onehouse/metadata_extractor/TestIcebergTableFormatDetector.java b/lakeview/src/test/java/ai/onehouse/metadata_extractor/TestIcebergTableFormatDetector.java index 99025df9..cd5665bc 100644 --- a/lakeview/src/test/java/ai/onehouse/metadata_extractor/TestIcebergTableFormatDetector.java +++ b/lakeview/src/test/java/ai/onehouse/metadata_extractor/TestIcebergTableFormatDetector.java @@ -85,6 +85,21 @@ void doesNotMatchOnHudiLayout() { verify(asyncStorageClient, never()).listAllFilesInDir(eq(METADATA_PATH)); } + @Test + void matchesWhenMetadataDirEntryHasTrailingSlash() { + // S3 ListObjectsV2 returns CommonPrefixes as e.g. "metadata/" (with the trailing slash that + // S3 itself uses). The storage client may surface that filename verbatim, so the detector + // must accept both "metadata" and "metadata/" as the folder marker. 
+ when(asyncStorageClient.listAllFilesInDir(eq(METADATA_PATH))) + .thenReturn( + CompletableFuture.completedFuture( + Collections.singletonList(file("00000-uuid.metadata.json", false)))); + assertTrue( + detector + .matches(TABLE_PATH, Collections.singletonList(file("metadata/", true))) + .join()); + } + private static File file(String name, boolean dir) { return File.builder().filename(name).isDirectory(dir).lastModifiedAt(Instant.EPOCH).build(); } From db3b76422cf7cbced0e0a0f43770151ed2243360 Mon Sep 17 00:00:00 2001 From: Dharmender Sheshma Date: Thu, 14 May 2026 14:23:39 +0530 Subject: [PATCH 6/6] Align TableFormat JSON wire shape with control-plane proto enum LakeView's InitializeTableMetricsCheckpoint request sends tableFormat as a Jackson-serialized enum value. Without an explicit @JsonProperty the wire string was the short Java identifier ("HUDI" / "ICEBERG"), which protobuf-java-util's JSON parser on external-api cannot map onto lake.TableFormat (the proto enum uses the canonical names "TABLE_FORMAT_HUDI" / "TABLE_FORMAT_ICEBERG"). The mismatch is silent: the parser falls back to enum-zero (TABLE_FORMAT_INVALID), the checkpoint is persisted without the field, and GenerateCommitMetadata UploadUrlHandler routes the table through the Hudi back-compat path. For Hudi this happens to be correct so nothing surfaces; for Iceberg the filename regex rejects the metadata.json with 400. Annotate the enum values so the JSON wire string matches the proto enum name on both sides of the contract. Java callsites remain unchanged. 
--- .../onehouse/api/models/request/TableFormat.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/lakeview/src/main/java/ai/onehouse/api/models/request/TableFormat.java b/lakeview/src/main/java/ai/onehouse/api/models/request/TableFormat.java index 8616370c..9d682bff 100644 --- a/lakeview/src/main/java/ai/onehouse/api/models/request/TableFormat.java +++ b/lakeview/src/main/java/ai/onehouse/api/models/request/TableFormat.java @@ -1,6 +1,20 @@ package ai.onehouse.api.models.request; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * Physical observed-table format. Wire-serialized to the protobuf enum names defined in + * lake/table.proto's {@code TableFormat} so the control plane's protobuf-java-util JSON parser + * accepts the value on RPCs like {@code InitializeTableMetricsCheckpoint}. Without these + * {@code @JsonProperty} aliases, Jackson would emit the short Java identifier ({@code "ICEBERG"}) + * and the server-side parser — which requires the canonical proto enum name + * ({@code "TABLE_FORMAT_ICEBERG"}) — would silently drop the value and persist the checkpoint + * with the proto enum-zero default ({@code TABLE_FORMAT_INVALID}), routing iceberg uploads + * through the Hudi back-compat fallback. + */ public enum TableFormat { + @JsonProperty("TABLE_FORMAT_HUDI") HUDI, + @JsonProperty("TABLE_FORMAT_ICEBERG") ICEBERG }