diff --git a/lakeview-sync-tool/src/main/java/ai/onehouse/lakeview/sync/LakeviewSyncTool.java b/lakeview-sync-tool/src/main/java/ai/onehouse/lakeview/sync/LakeviewSyncTool.java index 146285e3..912f56ad 100644 --- a/lakeview-sync-tool/src/main/java/ai/onehouse/lakeview/sync/LakeviewSyncTool.java +++ b/lakeview-sync-tool/src/main/java/ai/onehouse/lakeview/sync/LakeviewSyncTool.java @@ -14,6 +14,9 @@ import ai.onehouse.config.models.configv1.ParserConfig; import ai.onehouse.metadata_extractor.ActiveTimelineInstantBatcher; import ai.onehouse.metadata_extractor.HoodiePropertiesReader; +import ai.onehouse.metadata_extractor.HudiTableFormatDetector; +import ai.onehouse.metadata_extractor.IcebergMetadataUploaderService; +import ai.onehouse.metadata_extractor.IcebergTableFormatDetector; import ai.onehouse.metadata_extractor.LSMTimelineManifestReader; import ai.onehouse.metadata_extractor.TableDiscoveryAndUploadJob; import ai.onehouse.metadata_extractor.TableDiscoveryService; @@ -253,7 +256,9 @@ private TableDiscoveryAndUploadJob getTableDiscoveryAndUploadJob(@Nonnull Config configProvider); TableDiscoveryService tableDiscoveryService = new TableDiscoveryService(asyncStorageClient, storageUtils, - configProvider, executorService, lakeViewExtractorMetrics); + configProvider, executorService, lakeViewExtractorMetrics, + new HudiTableFormatDetector(), + new IcebergTableFormatDetector(asyncStorageClient, storageUtils)); HoodiePropertiesReader hoodiePropertiesReader = new HoodiePropertiesReader(asyncStorageClient, lakeViewExtractorMetrics); OnehouseApiClient onehouseApiClient = new OnehouseApiClient(asyncHttpClientWithRetry, config, @@ -267,8 +272,11 @@ presignedUrlFileUploader, onehouseApiClient, storageUtils, executorService, new lakeViewExtractorMetrics, config, lsmTimelineManifestReader); TableMetadataUploaderService tableMetadataUploaderService = new TableMetadataUploaderService(hoodiePropertiesReader, onehouseApiClient, timelineCommitInstantsUploader, 
lakeViewExtractorMetrics, executorService); + IcebergMetadataUploaderService icebergMetadataUploaderService = new IcebergMetadataUploaderService( + asyncStorageClient, onehouseApiClient, presignedUrlFileUploader, storageUtils, lakeViewExtractorMetrics); - return new TableDiscoveryAndUploadJob(tableDiscoveryService, tableMetadataUploaderService, lakeViewExtractorMetrics, asyncStorageClient); + return new TableDiscoveryAndUploadJob(tableDiscoveryService, tableMetadataUploaderService, + icebergMetadataUploaderService, lakeViewExtractorMetrics, asyncStorageClient); } private AsyncStorageClient getAsyncStorageClient(@Nonnull Config config, @Nonnull ExecutorService executorService, diff --git a/lakeview/src/main/java/ai/onehouse/api/models/request/InitializeTableMetricsCheckpointRequest.java b/lakeview/src/main/java/ai/onehouse/api/models/request/InitializeTableMetricsCheckpointRequest.java index 70d3debc..faa85477 100644 --- a/lakeview/src/main/java/ai/onehouse/api/models/request/InitializeTableMetricsCheckpointRequest.java +++ b/lakeview/src/main/java/ai/onehouse/api/models/request/InitializeTableMetricsCheckpointRequest.java @@ -21,7 +21,13 @@ public class InitializeTableMetricsCheckpointRequest { public static class InitializeSingleTableMetricsCheckpointRequest { @NonNull String tableId; @NonNull String tableName; + // For Hudi: source of truth for COW vs MOR. For non-Hudi formats it is a meaningless + // placeholder; the server discriminates on tableFormat first. Kept non-null to preserve + // the existing wire contract; defaults to COPY_ON_WRITE for Iceberg tables. @NonNull TableType tableType; + // Physical table format. Absent is interpreted by the server as HUDI for backward compat + // with extractors that haven't been updated. 
+ @Nullable TableFormat tableFormat; String lakeName; String databaseName; String tableBasePath; diff --git a/lakeview/src/main/java/ai/onehouse/api/models/request/TableFormat.java b/lakeview/src/main/java/ai/onehouse/api/models/request/TableFormat.java new file mode 100644 index 00000000..9d682bff --- /dev/null +++ b/lakeview/src/main/java/ai/onehouse/api/models/request/TableFormat.java @@ -0,0 +1,20 @@ +package ai.onehouse.api.models.request; + +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * Physical observed-table format. Wire-serialized to the protobuf enum names defined in + * lake/table.proto's {@code TableFormat} so the control plane's protobuf-java-util JSON parser + * accepts the value on RPCs like {@code InitializeTableMetricsCheckpoint}. Without these + * {@code @JsonProperty} aliases, Jackson would emit the short Java identifier ({@code "ICEBERG"}) + * and the server-side parser — which requires the canonical proto enum name + * ({@code "TABLE_FORMAT_ICEBERG"}) — would silently drop the value and persist the checkpoint + * with the proto enum-zero default ({@code TABLE_FORMAT_INVALID}), routing iceberg uploads + * through the Hudi back-compat fallback. 
+ */ +public enum TableFormat { + @JsonProperty("TABLE_FORMAT_HUDI") + HUDI, + @JsonProperty("TABLE_FORMAT_ICEBERG") + ICEBERG +} diff --git a/lakeview/src/main/java/ai/onehouse/config/models/configv1/Database.java b/lakeview/src/main/java/ai/onehouse/config/models/configv1/Database.java index bd119d68..14fac7fd 100644 --- a/lakeview/src/main/java/ai/onehouse/config/models/configv1/Database.java +++ b/lakeview/src/main/java/ai/onehouse/config/models/configv1/Database.java @@ -1,6 +1,9 @@ package ai.onehouse.config.models.configv1; +import ai.onehouse.api.models.request.TableFormat; import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; import lombok.Builder; import lombok.NonNull; import lombok.Value; @@ -12,4 +15,19 @@ public class Database { String name; @NonNull List basePaths; + /** + * Physical table format of the tables under {@link #basePaths}. Null is interpreted as {@link + * TableFormat#HUDI} for backward compatibility with YAMLs written before Iceberg support + * existed. Customers with a mixed-format warehouse should split into separate {@link Database} + * entries, one per format. + */ + @Nullable TableFormat tableFormat; + /** + * Optional per-table hints keyed on the tableId encoded in {@link #basePaths} (the segment after + * {@code #}). The control plane populates this for tables it has richer information about + * (e.g. Iceberg tables discovered via AWS Glue, where the catalog already knows the current + * {@code metadata.json} URI). Missing entries / missing map are normal — older YAML versions + * predate this field, and tables without hints fall back to plain discovery. 
+ */ + @Nullable Map tableHints; } diff --git a/lakeview/src/main/java/ai/onehouse/config/models/configv1/TableHint.java b/lakeview/src/main/java/ai/onehouse/config/models/configv1/TableHint.java new file mode 100644 index 00000000..b47bcb63 --- /dev/null +++ b/lakeview/src/main/java/ai/onehouse/config/models/configv1/TableHint.java @@ -0,0 +1,24 @@ +package ai.onehouse.config.models.configv1; + +import javax.annotation.Nullable; +import lombok.Builder; +import lombok.Value; +import lombok.extern.jackson.Jacksonized; + +/** + * Optional per-table hints supplied by the control plane in the parser YAML, keyed on tableId in + * {@link Database#getTableHints()}. Hints are forward-compatible: older LakeView versions that + * don't know about a field deserialize the rest fine; new fields default to null. + */ +@Builder +@Value +@Jacksonized +public class TableHint { + /** + * For Iceberg tables registered in an external catalog (e.g. AWS Glue), the URI of the current + * {@code metadata.json} that the catalog points at. When present, the extractor can PUT this + * file directly instead of listing the {@code metadata/} folder to find the latest. Empty / + * null means "no hint — fall back to listing". 
+ */ + @Nullable String metadataLocationHint; +} diff --git a/lakeview/src/main/java/ai/onehouse/constants/MetadataExtractorConstants.java b/lakeview/src/main/java/ai/onehouse/constants/MetadataExtractorConstants.java index 4877f87e..f4e06b87 100644 --- a/lakeview/src/main/java/ai/onehouse/constants/MetadataExtractorConstants.java +++ b/lakeview/src/main/java/ai/onehouse/constants/MetadataExtractorConstants.java @@ -14,6 +14,10 @@ private MetadataExtractorConstants() {} public static final String HOODIE_FOLDER_NAME = ".hoodie"; public static final String ARCHIVED_FOLDER_NAME = "archived"; + public static final String ICEBERG_METADATA_FOLDER_NAME = "metadata"; + public static final String ICEBERG_METADATA_FILE_SUFFIX = ".metadata.json"; + public static final String ICEBERG_VERSION_HINT_FILENAME = "version-hint.text"; + public static final String ICEBERG_HADOOP_METADATA_FILE_PREFIX = "v"; public static final String HOODIE_PROPERTIES_FILE = "hoodie.properties"; public static final String HOODIE_TABLE_NAME_KEY = "hoodie.table.name"; public static final String HOODIE_TABLE_TYPE_KEY = "hoodie.table.type"; diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/HudiTableFormatDetector.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/HudiTableFormatDetector.java new file mode 100644 index 00000000..9b316516 --- /dev/null +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/HudiTableFormatDetector.java @@ -0,0 +1,22 @@ +package ai.onehouse.metadata_extractor; + +import static ai.onehouse.constants.MetadataExtractorConstants.HOODIE_FOLDER_NAME; + +import ai.onehouse.api.models.request.TableFormat; +import ai.onehouse.storage.models.File; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** A directory is a Hudi table root if it contains a {@code .hoodie/} folder. 
*/ +public class HudiTableFormatDetector implements TableFormatDetector { + @Override + public TableFormat format() { + return TableFormat.HUDI; + } + + @Override + public CompletableFuture matches(String path, List listedFiles) { + return CompletableFuture.completedFuture( + listedFiles.stream().anyMatch(file -> file.getFilename().startsWith(HOODIE_FOLDER_NAME))); + } +} diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/IcebergMetadataUploaderService.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/IcebergMetadataUploaderService.java new file mode 100644 index 00000000..605fb19d --- /dev/null +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/IcebergMetadataUploaderService.java @@ -0,0 +1,454 @@ +package ai.onehouse.metadata_extractor; + +import static ai.onehouse.constants.MetadataExtractorConstants.DEFAULT_FILE_UPLOAD_STREAM_BATCH_SIZE; +import static ai.onehouse.constants.MetadataExtractorConstants.ICEBERG_HADOOP_METADATA_FILE_PREFIX; +import static ai.onehouse.constants.MetadataExtractorConstants.ICEBERG_METADATA_FILE_SUFFIX; +import static ai.onehouse.constants.MetadataExtractorConstants.ICEBERG_METADATA_FOLDER_NAME; +import static ai.onehouse.constants.MetadataExtractorConstants.ICEBERG_VERSION_HINT_FILENAME; +import static ai.onehouse.constants.MetadataExtractorConstants.INITIAL_CHECKPOINT; +import static ai.onehouse.metadata_extractor.MetadataExtractorUtils.getMetadataExtractorFailureReason; + +import ai.onehouse.RuntimeModule.TableDiscoveryObjectStorageAsyncClient; +import ai.onehouse.api.OnehouseApiClient; +import ai.onehouse.api.models.request.CommitTimelineType; +import ai.onehouse.api.models.request.GenerateCommitMetadataUploadUrlRequest; +import ai.onehouse.api.models.request.InitializeTableMetricsCheckpointRequest; +import ai.onehouse.api.models.request.InitializeTableMetricsCheckpointRequest.InitializeSingleTableMetricsCheckpointRequest; +import ai.onehouse.api.models.request.TableFormat; +import 
ai.onehouse.api.models.request.TableType; +import ai.onehouse.api.models.request.UploadedFile; +import ai.onehouse.api.models.request.UpsertTableMetricsCheckpointRequest; +import ai.onehouse.api.models.response.GetTableMetricsCheckpointResponse; +import ai.onehouse.constants.MetricsConstants; +import ai.onehouse.metadata_extractor.models.Checkpoint; +import ai.onehouse.metadata_extractor.models.Table; +import ai.onehouse.metrics.LakeViewExtractorMetrics; +import ai.onehouse.storage.AsyncStorageClient; +import ai.onehouse.storage.PresignedUrlFileUploader; +import ai.onehouse.storage.StorageUtils; +import ai.onehouse.storage.models.File; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import com.google.inject.Inject; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import javax.annotation.Nonnull; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; + +/** + * Discovers and uploads Iceberg table metadata pointer files ({@code metadata/*.metadata.json}). + * + *

Iceberg's snapshot-summary contract means a single {@code metadata.json} contains all of: + * cumulative {@code total-records}, {@code total-files-size}, {@code total-data-files}, + * per-snapshot deltas, and {@code snapshot-log[]} history. We therefore upload exactly one file + * per iteration — the latest {@code metadata.json}. The control plane parses snapshot summaries + * from it and writes metrics to OpenSearch. + * + *

Parallel to {@link TableMetadataUploaderService} (the Hudi orchestrator) rather than a + * subclass: Hudi's active/archived timeline distinction, hoodie.properties bootstrap, and LSM + * manifest plumbing don't apply, so a separate service is clearer than a branching megaclass. + * Shared primitives ({@link OnehouseApiClient}, {@link PresignedUrlFileUploader}, {@link + * AsyncStorageClient}) are reused. + * + *

Selecting the current pointer. Three paths, in order of preference: + *

    + *
  1. {@link Table#getMetadataLocationHint()} from the parser YAML (catalog-driven).
  2. {@code metadata/version-hint.text} for Hadoop-catalog tables — its integer content names
 + * the current {@code v{N}.metadata.json} unambiguously.
  3. Numeric-aware filename comparison as a last resort. The leading integer in the filename
 + * (after an optional {@code v} prefix) is treated as the version number. Works for both
 + * {@code v{N}.metadata.json} (Hadoop) and {@code 00000-<uuid>.metadata.json}
 + * (Hive/Glue/Spark) without depending on zero-padding.
+ */ +@Slf4j +public class IcebergMetadataUploaderService { + private final AsyncStorageClient asyncStorageClient; + private final OnehouseApiClient onehouseApiClient; + private final PresignedUrlFileUploader presignedUrlFileUploader; + private final StorageUtils storageUtils; + private final LakeViewExtractorMetrics metrics; + private final ObjectMapper mapper; + + @Inject + public IcebergMetadataUploaderService( + @Nonnull @TableDiscoveryObjectStorageAsyncClient AsyncStorageClient asyncStorageClient, + @Nonnull OnehouseApiClient onehouseApiClient, + @Nonnull PresignedUrlFileUploader presignedUrlFileUploader, + @Nonnull StorageUtils storageUtils, + @Nonnull LakeViewExtractorMetrics metrics) { + this.asyncStorageClient = asyncStorageClient; + this.onehouseApiClient = onehouseApiClient; + this.presignedUrlFileUploader = presignedUrlFileUploader; + this.storageUtils = storageUtils; + this.metrics = metrics; + this.mapper = new ObjectMapper(); + mapper.registerModule(new JavaTimeModule()); + } + + /** + * Uploads the latest metadata.json for each Iceberg table whose pointer file has advanced since + * the last checkpoint. Returns true only if every table either succeeded or was already + * up-to-date. 
+ */ + public CompletableFuture uploadInstantsInTables(Set tables) { + if (tables.isEmpty()) { + return CompletableFuture.completedFuture(true); + } + log.info("Uploading Iceberg metadata for {} table(s)", tables.size()); + List> perTable = + tables.stream().map(this::processTable).collect(Collectors.toList()); + return CompletableFuture.allOf(perTable.toArray(new CompletableFuture[0])) + .thenApply( + ignored -> perTable.stream().map(CompletableFuture::join).allMatch(Boolean.TRUE::equals)); + } + + private CompletableFuture processTable(Table table) { + return onehouseApiClient + .getTableMetricsCheckpoints(Collections.singletonList(table.getTableId())) + .thenComposeAsync( + response -> { + if (response.isFailure()) { + log.error( + "Failed to fetch checkpoint for Iceberg table {} (status {}): {}", + table.getTableId(), + response.getStatusCode(), + response.getCause()); + return CompletableFuture.completedFuture(false); + } + Optional existing = parseCheckpoint(response, table.getTableId()); + if (existing.isPresent()) { + return uploadIfNewMetadataJson(table, existing.get()); + } + return initialiseTable(table) + .thenComposeAsync( + initOk -> { + if (!initOk) { + return CompletableFuture.completedFuture(false); + } + return uploadIfNewMetadataJson(table, INITIAL_CHECKPOINT); + }); + }) + .exceptionally( + e -> { + log.error("Exception processing Iceberg table {}", table.getTableId(), e); + metrics.incrementTableMetadataProcessingFailureCounter( + getMetadataExtractorFailureReason( + e, MetricsConstants.MetadataUploadFailureReasons.UNKNOWN), + String.format("Iceberg upload exception: %s", e.getMessage())); + return false; + }); + } + + private Optional parseCheckpoint( + GetTableMetricsCheckpointResponse response, String tableId) { + return response.getCheckpoints().stream() + .filter(c -> tableId.equals(c.getTableId())) + .findFirst() + .map(GetTableMetricsCheckpointResponse.TableMetadataCheckpoint::getCheckpoint) + .filter(StringUtils::isNotBlank) + .map( + 
json -> { + try { + return mapper.readValue(json, Checkpoint.class); + } catch (JsonProcessingException e) { + log.error("Malformed checkpoint JSON for Iceberg table {}; treating as missing", tableId, e); + return null; + } + }); + } + + private CompletableFuture initialiseTable(Table table) { + String tableName = deriveTableName(table.getAbsoluteTableUri()); + InitializeSingleTableMetricsCheckpointRequest single = + InitializeSingleTableMetricsCheckpointRequest.builder() + .tableId(table.getTableId()) + .tableName(tableName) + // COW is a meaningless placeholder for Iceberg; server discriminates on tableFormat. + .tableType(TableType.COPY_ON_WRITE) + .tableFormat(TableFormat.ICEBERG) + .lakeName(table.getLakeName()) + .databaseName(table.getDatabaseName()) + .tableBasePath(table.getAbsoluteTableUri()) + .build(); + return onehouseApiClient + .initializeTableMetricsCheckpoint( + InitializeTableMetricsCheckpointRequest.builder() + .tables(Collections.singletonList(single)) + .build()) + .thenApply( + initResponse -> { + if (initResponse.isFailure()) { + log.error( + "Failed to initialise Iceberg table {} (status {}): {}", + table.getTableId(), + initResponse.getStatusCode(), + initResponse.getCause()); + return false; + } + return true; + }); + } + + private CompletableFuture uploadIfNewMetadataJson(Table table, Checkpoint checkpoint) { + // Fast path: control plane provided the current metadata.json URI (e.g. from AWS Glue's + // metadata_location parameter). Skip the per-cycle LIST and PUT the file directly. 
+ if (StringUtils.isNotBlank(table.getMetadataLocationHint())) { + String hint = table.getMetadataLocationHint(); + String filename = lastPathSegment(hint); + if (filename.equals(checkpoint.getLastUploadedFile())) { + log.debug( + "Iceberg table {} already up-to-date at {} (from hint)", + table.getTableId(), + filename); + return CompletableFuture.completedFuture(true); + } + File synthetic = + File.builder() + .filename(filename) + .isDirectory(false) + // Hint doesn't carry lastModifiedAt; use now() since the consumer treats this as + // advisory and discriminates by checkpoint batchId for ordering. + .lastModifiedAt(Instant.now()) + .build(); + return uploadAndAdvanceCheckpoint(table, hint, synthetic, checkpoint); + } + + // Fallback path: list metadata/, prefer version-hint.text, else numeric-aware sort. + String metadataDirUri = + storageUtils.constructFileUri(table.getAbsoluteTableUri(), ICEBERG_METADATA_FOLDER_NAME); + return asyncStorageClient + .listAllFilesInDir(metadataDirUri) + .thenComposeAsync( + files -> + resolveLatestMetadataJson(metadataDirUri, files) + .thenComposeAsync( + latestOpt -> { + if (!latestOpt.isPresent()) { + log.error( + "Iceberg table {} has no *.metadata.json under {}", + table.getTableId(), + metadataDirUri); + metrics.incrementTableMetadataProcessingFailureCounter( + MetricsConstants.MetadataUploadFailureReasons.NO_SUCH_KEY, + "Iceberg table missing metadata.json files"); + return CompletableFuture.completedFuture(false); + } + File latest = latestOpt.get(); + if (latest.getFilename().equals(checkpoint.getLastUploadedFile())) { + log.debug( + "Iceberg table {} already up-to-date at {}", + table.getTableId(), + latest.getFilename()); + return CompletableFuture.completedFuture(true); + } + String fileUri = + storageUtils.constructFileUri(metadataDirUri, latest.getFilename()); + return uploadAndAdvanceCheckpoint(table, fileUri, latest, checkpoint); + })); + } + + /** + * Resolves the current {@code metadata.json} given a listing of 
{@code metadata/}. Reads + * {@code version-hint.text} when present and falls back to numeric-aware filename comparison + * otherwise. + */ + private CompletableFuture> resolveLatestMetadataJson( + String metadataDirUri, List files) { + Optional versionHintFile = + files.stream() + .filter(f -> !f.isDirectory() && ICEBERG_VERSION_HINT_FILENAME.equals(f.getFilename())) + .findFirst(); + if (!versionHintFile.isPresent()) { + return CompletableFuture.completedFuture(pickLatestMetadataJson(files)); + } + String hintUri = storageUtils.constructFileUri(metadataDirUri, ICEBERG_VERSION_HINT_FILENAME); + return asyncStorageClient + .readFileAsBytes(hintUri) + .thenApply( + bytes -> { + Optional fromHint = resolveFromVersionHint(bytes, files); + if (fromHint.isPresent()) { + return fromHint; + } + log.warn( + "version-hint.text at {} did not point at an existing metadata.json; falling back to numeric sort", + hintUri); + return pickLatestMetadataJson(files); + }) + .exceptionally( + ex -> { + log.warn( + "Failed to read version-hint.text at {}; falling back to numeric sort", + hintUri, + ex); + return pickLatestMetadataJson(files); + }); + } + + static Optional resolveFromVersionHint(byte[] versionHintBytes, List files) { + String content = new String(versionHintBytes, StandardCharsets.UTF_8).trim(); + long version; + try { + version = Long.parseLong(content); + } catch (NumberFormatException e) { + log.warn("version-hint.text contained non-integer content: '{}'", content); + return Optional.empty(); + } + String target = ICEBERG_HADOOP_METADATA_FILE_PREFIX + version + ICEBERG_METADATA_FILE_SUFFIX; + return files.stream() + .filter(f -> !f.isDirectory() && target.equals(f.getFilename())) + .findFirst(); + } + + private CompletableFuture uploadAndAdvanceCheckpoint( + Table table, String fileUri, File metadataJson, Checkpoint priorCheckpoint) { + return onehouseApiClient + .generateCommitMetadataUploadUrl( + GenerateCommitMetadataUploadUrlRequest.builder() + 
.tableId(table.getTableId()) + .commitInstants(Collections.singletonList(metadataJson.getFilename())) + // ACTIVE is a placeholder; Iceberg has no archived/active distinction. + .commitTimelineType(CommitTimelineType.COMMIT_TIMELINE_TYPE_ACTIVE) + .build()) + .thenComposeAsync( + urlResponse -> { + if (urlResponse.isFailure() || urlResponse.getUploadUrls().isEmpty()) { + log.error( + "Failed to obtain presigned URL for {} (status {}): {}", + metadataJson.getFilename(), + urlResponse.getStatusCode(), + urlResponse.getCause()); + return CompletableFuture.completedFuture(false); + } + String presigned = urlResponse.getUploadUrls().get(0); + return presignedUrlFileUploader + .uploadFileToPresignedUrl(presigned, fileUri, DEFAULT_FILE_UPLOAD_STREAM_BATCH_SIZE) + .thenComposeAsync( + ignored -> { + metrics.incrementMetadataUploadSuccessCounter(); + return advanceCheckpoint(table, metadataJson, priorCheckpoint); + }); + }); + } + + private CompletableFuture advanceCheckpoint( + Table table, File metadataJson, Checkpoint priorCheckpoint) { + Checkpoint next = + priorCheckpoint + .toBuilder() + .batchId(priorCheckpoint.getBatchId() + 1) + .checkpointTimestamp(Instant.now()) + .lastUploadedFile(metadataJson.getFilename()) + // Iceberg has no archived/active distinction; mark archived processed so consumers + // that interpret the legacy field for ordering don't loop. 
+ .archivedCommitsProcessed(true) + .build(); + String checkpointJson; + try { + checkpointJson = mapper.writeValueAsString(next); + } catch (JsonProcessingException e) { + log.error("Failed to serialise Iceberg checkpoint for table {}", table.getTableId(), e); + return CompletableFuture.completedFuture(false); + } + UpsertTableMetricsCheckpointRequest request = + UpsertTableMetricsCheckpointRequest.builder() + .tableId(table.getTableId()) + .checkpoint(checkpointJson) + .filesUploaded(Collections.singletonList(metadataJson.getFilename())) + .uploadedFiles( + Collections.singletonList( + UploadedFile.builder() + .name(metadataJson.getFilename()) + .lastModifiedAt(metadataJson.getLastModifiedAt().toEpochMilli()) + .build())) + .commitTimelineType(CommitTimelineType.COMMIT_TIMELINE_TYPE_ACTIVE) + .build(); + return onehouseApiClient + .upsertTableMetricsCheckpoint(request) + .thenApply( + upsert -> { + if (upsert.isFailure()) { + log.error( + "Failed to upsert checkpoint for Iceberg table {} (status {}): {}", + table.getTableId(), + upsert.getStatusCode(), + upsert.getCause()); + return false; + } + return true; + }); + } + + /** + * Picks the latest {@code *.metadata.json} from a {@code metadata/} listing using numeric-aware + * comparison. The leading integer in the filename (after an optional {@code v} prefix) is the + * version number; ties fall back to lexicographic order on the full filename so the result is + * deterministic. + * + *

Examples: + *

    + *
  • {@code v1.metadata.json}, {@code v10.metadata.json}, {@code v2.metadata.json} →
 + * {@code v10.metadata.json}
  • {@code 00001-a.metadata.json}, {@code 00010-b.metadata.json} →
 + * {@code 00010-b.metadata.json}
+ */ + static Optional pickLatestMetadataJson(List files) { + Comparator byVersionThenName = + Comparator.comparingLong(f -> extractVersionNumber(f.getFilename())) + .thenComparing(File::getFilename); + return files.stream() + .filter(f -> !f.isDirectory()) + .filter(f -> f.getFilename().endsWith(ICEBERG_METADATA_FILE_SUFFIX)) + .max(byVersionThenName); + } + + /** + * Extracts the leading integer "version number" from an Iceberg metadata-json filename. Returns + * {@code -1} when no integer prefix is present (the file still participates in ordering — it + * just loses any tiebreak with numerically-named siblings). + */ + static long extractVersionNumber(String filename) { + String base = + filename.endsWith(ICEBERG_METADATA_FILE_SUFFIX) + ? filename.substring(0, filename.length() - ICEBERG_METADATA_FILE_SUFFIX.length()) + : filename; + int i = 0; + if (i < base.length() && base.charAt(i) == 'v') { + i++; + } + int start = i; + while (i < base.length() && Character.isDigit(base.charAt(i))) { + i++; + } + if (i == start) { + return -1L; + } + try { + return Long.parseLong(base.substring(start, i)); + } catch (NumberFormatException e) { + return -1L; + } + } + + static String lastPathSegment(String uri) { + int idx = uri.lastIndexOf('/'); + return idx < 0 ? uri : uri.substring(idx + 1); + } + + private static String deriveTableName(String basePathUri) { + String trimmed = basePathUri.endsWith("/") ? basePathUri.substring(0, basePathUri.length() - 1) : basePathUri; + int idx = trimmed.lastIndexOf('/'); + return idx < 0 ? 
trimmed : trimmed.substring(idx + 1); + } +} diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/IcebergTableFormatDetector.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/IcebergTableFormatDetector.java new file mode 100644 index 00000000..9c843a56 --- /dev/null +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/IcebergTableFormatDetector.java @@ -0,0 +1,81 @@ +package ai.onehouse.metadata_extractor; + +import static ai.onehouse.constants.MetadataExtractorConstants.ICEBERG_METADATA_FILE_SUFFIX; +import static ai.onehouse.constants.MetadataExtractorConstants.ICEBERG_METADATA_FOLDER_NAME; + +import ai.onehouse.RuntimeModule.TableDiscoveryObjectStorageAsyncClient; +import ai.onehouse.api.models.request.TableFormat; +import ai.onehouse.storage.AsyncStorageClient; +import ai.onehouse.storage.StorageUtils; +import ai.onehouse.storage.models.File; +import com.google.inject.Inject; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import javax.annotation.Nonnull; + +/** + * A directory is an Iceberg table root if it contains a {@code metadata/} sub-directory and + * that sub-directory contains at least one {@code *.metadata.json} pointer file. The pointer-file + * check is what separates a real Iceberg table from any arbitrary folder that happens to have a + * sub-directory named {@code metadata} (a Spark checkpoint dir, a documentation folder, custom + * user layouts, etc.) — that name alone is too generic to be a reliable marker. + * + *

The validating LIST costs one extra storage call per candidate directory during discovery + * (not per upload cycle). Discovery already short-circuits recursion on a match, so the extra + * LIST runs at most once per real table and once per false-positive candidate during a discovery + * pass. + */ +public class IcebergTableFormatDetector implements TableFormatDetector { + private final AsyncStorageClient asyncStorageClient; + private final StorageUtils storageUtils; + + @Inject + public IcebergTableFormatDetector( + @Nonnull @TableDiscoveryObjectStorageAsyncClient AsyncStorageClient asyncStorageClient, + @Nonnull StorageUtils storageUtils) { + this.asyncStorageClient = asyncStorageClient; + this.storageUtils = storageUtils; + } + + @Override + public TableFormat format() { + return TableFormat.ICEBERG; + } + + @Override + public CompletableFuture matches(String path, List listedFiles) { + // S3 ListObjectsV2 surfaces "directories" as CommonPrefixes, which the storage client + // maps to File objects whose filename retains the trailing slash (e.g. "metadata/"), + // since that is exactly the Prefix string S3 returns. Strip a trailing slash before + // comparing so the check works regardless of whether the client preserves it. 
+ boolean hasMetadataDir = + listedFiles.stream() + .anyMatch( + file -> { + if (!file.isDirectory()) { + return false; + } + String name = file.getFilename(); + if (name == null) { + return false; + } + if (name.endsWith("/")) { + name = name.substring(0, name.length() - 1); + } + return ICEBERG_METADATA_FOLDER_NAME.equals(name); + }); + if (!hasMetadataDir) { + return CompletableFuture.completedFuture(false); + } + String metadataDirUri = storageUtils.constructFileUri(path, ICEBERG_METADATA_FOLDER_NAME); + return asyncStorageClient + .listAllFilesInDir(metadataDirUri) + .thenApply( + metadataFiles -> + metadataFiles.stream() + .anyMatch( + f -> + !f.isDirectory() + && f.getFilename().endsWith(ICEBERG_METADATA_FILE_SUFFIX))); + } +} diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableDiscoveryAndUploadJob.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableDiscoveryAndUploadJob.java index 536f99fe..0c1e7e71 100644 --- a/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableDiscoveryAndUploadJob.java +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableDiscoveryAndUploadJob.java @@ -1,5 +1,6 @@ package ai.onehouse.metadata_extractor; +import ai.onehouse.api.models.request.TableFormat; import ai.onehouse.config.models.configv1.MetadataExtractorConfig; import ai.onehouse.constants.MetricsConstants; import ai.onehouse.storage.AsyncStorageClient; @@ -18,13 +19,17 @@ import java.time.Instant; import java.time.ZoneOffset; import java.time.ZonedDateTime; +import java.util.Collections; import java.util.HashSet; +import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import javax.annotation.Nonnull; import lombok.extern.slf4j.Slf4j; @@ -34,6 +39,7 
@@ public class TableDiscoveryAndUploadJob { private final TableDiscoveryService tableDiscoveryService; private final TableMetadataUploaderService tableMetadataUploaderService; + private final IcebergMetadataUploaderService icebergMetadataUploaderService; private final ScheduledExecutorService scheduler; private final Object lock = new Object(); private final LakeViewExtractorMetrics hudiMetadataExtractorMetrics; @@ -47,16 +53,41 @@ public class TableDiscoveryAndUploadJob { public TableDiscoveryAndUploadJob( @Nonnull TableDiscoveryService tableDiscoveryService, @Nonnull TableMetadataUploaderService tableMetadataUploaderService, + @Nonnull IcebergMetadataUploaderService icebergMetadataUploaderService, @Nonnull LakeViewExtractorMetrics hudiMetadataExtractorMetrics, @Nonnull @TableDiscoveryObjectStorageAsyncClient AsyncStorageClient asyncStorageClient) { this.scheduler = getScheduler(); this.tableDiscoveryService = tableDiscoveryService; this.tableMetadataUploaderService = tableMetadataUploaderService; + this.icebergMetadataUploaderService = icebergMetadataUploaderService; this.hudiMetadataExtractorMetrics = hudiMetadataExtractorMetrics; this.firstCronRunStartTime = Instant.now(); this.asyncStorageClient = asyncStorageClient; } + /** + * Routes discovered tables to the appropriate per-format uploader and reduces to a single + * success boolean (all-or-nothing). Hudi and Iceberg tables are uploaded concurrently. + */ + private CompletableFuture dispatchUpload(Set

tables) { + Map> byFormat = + tables.stream() + .collect( + Collectors.groupingBy( + t -> t.getTableFormat() == null ? TableFormat.HUDI : t.getTableFormat(), + Collectors.toSet())); + CompletableFuture hudiFuture = + tableMetadataUploaderService.uploadInstantsInTables( + byFormat.getOrDefault(TableFormat.HUDI, Collections.emptySet())); + CompletableFuture icebergFuture = + icebergMetadataUploaderService.uploadInstantsInTables( + byFormat.getOrDefault(TableFormat.ICEBERG, Collections.emptySet())); + // Treat a null result the same as success — the legacy contract on the Hudi uploader was + // "throw to fail," and downstream callers only inspect exceptions, not the boolean. + return hudiFuture.thenCombine( + icebergFuture, (a, b) -> !Boolean.FALSE.equals(a) && !Boolean.FALSE.equals(b)); + } + /* * runs discovery and upload periodically at fixed intervals in a continuous fashion */ @@ -96,7 +127,7 @@ public void runOnce(Config config, int runCounter) { Boolean isSucceeded = tableDiscoveryService .discoverTables() - .thenCompose(tableMetadataUploaderService::uploadInstantsInTables) + .thenCompose(this::dispatchUpload) .join(); if (Boolean.TRUE.equals(isSucceeded)) { log.info("Run Completed"); @@ -178,8 +209,7 @@ private void processTables(Config config) { log.debug("Uploading table metadata for discovered tables"); hudiMetadataExtractorMetrics.resetTableProcessedGauge(); AtomicBoolean hasError = new AtomicBoolean(false); - tableMetadataUploaderService - .uploadInstantsInTables(tables) + dispatchUpload(tables) .exceptionally( ex -> { log.error("Error uploading instants in tables: ", ex); diff --git a/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableDiscoveryService.java b/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableDiscoveryService.java index 707ed4af..f5f557ea 100644 --- a/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableDiscoveryService.java +++ b/lakeview/src/main/java/ai/onehouse/metadata_extractor/TableDiscoveryService.java @@ 
-1,24 +1,28 @@ package ai.onehouse.metadata_extractor; -import static ai.onehouse.constants.MetadataExtractorConstants.HOODIE_FOLDER_NAME; import static ai.onehouse.metadata_extractor.MetadataExtractorUtils.getMetadataExtractorFailureReason; import static java.util.Collections.emptySet; -import com.google.inject.Inject; -import ai.onehouse.constants.MetricsConstants; +import ai.onehouse.RuntimeModule.TableDiscoveryObjectStorageAsyncClient; +import ai.onehouse.api.models.request.TableFormat; import ai.onehouse.config.ConfigProvider; import ai.onehouse.config.models.configv1.Database; import ai.onehouse.config.models.configv1.MetadataExtractorConfig; import ai.onehouse.config.models.configv1.ParserConfig; +import ai.onehouse.config.models.configv1.TableHint; +import ai.onehouse.constants.MetricsConstants; import ai.onehouse.metadata_extractor.models.Table; import ai.onehouse.metrics.LakeViewExtractorMetrics; import ai.onehouse.storage.AsyncStorageClient; import ai.onehouse.storage.StorageUtils; import ai.onehouse.storage.models.File; -import ai.onehouse.RuntimeModule.TableDiscoveryObjectStorageAsyncClient; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -30,8 +34,10 @@ import org.apache.commons.lang3.tuple.Pair; /* - * Discovers hudi tables by Parsing all folders (including nested folders) in provided base paths - * excluded paths will be skipped. + * Discovers tables by walking all folders (including nested folders) in provided base paths. + * Each {@link Database} in the parser YAML declares its {@code tableFormat}; this service picks + * the single {@link TableFormatDetector} matching that format and applies it to every directory + * under the database's base paths. 
Excluded paths are skipped. */ @Slf4j public class TableDiscoveryService { @@ -41,6 +47,7 @@ public class TableDiscoveryService { private final ExecutorService executorService; private final ConfigProvider configProvider; private final LakeViewExtractorMetrics lakeviewExtractorMetrics; + private final Map detectorsByFormat; @Inject public TableDiscoveryService( @@ -48,12 +55,18 @@ public TableDiscoveryService( @Nonnull StorageUtils storageUtils, @Nonnull ConfigProvider configProvider, @Nonnull ExecutorService executorService, - @Nonnull LakeViewExtractorMetrics lakeviewExtractorMetrics) { + @Nonnull LakeViewExtractorMetrics lakeviewExtractorMetrics, + @Nonnull HudiTableFormatDetector hudiTableFormatDetector, + @Nonnull IcebergTableFormatDetector icebergTableFormatDetector) { this.asyncStorageClient = asyncStorageClient; this.storageUtils = storageUtils; this.executorService = executorService; this.configProvider = configProvider; this.lakeviewExtractorMetrics = lakeviewExtractorMetrics; + this.detectorsByFormat = + ImmutableMap.of( + TableFormat.HUDI, hudiTableFormatDetector, + TableFormat.ICEBERG, icebergTableFormatDetector); } public CompletableFuture> discoverTables() { @@ -64,9 +77,29 @@ public CompletableFuture> discoverTables() { log.info("Starting table discover service, excluding {}", excludedPathPatterns); List>>> pathToDiscoveredTablesFuturePairList = new ArrayList<>(); + // Merge per-database tableHints into a single tableId -> hint map. Hints are optional metadata + // supplied by the control plane (e.g. Iceberg metadata_location) and are looked up after + // discovery, once the tableId is known. 
+ Map tableHintsByTableId = new HashMap<>(); + for (ParserConfig parserConfig : metadataExtractorConfig.getParserConfig()) { + for (Database database : parserConfig.getDatabases()) { + if (database.getTableHints() == null) { + continue; + } + for (Map.Entry entry : database.getTableHints().entrySet()) { + TableHint previous = tableHintsByTableId.put(entry.getKey(), entry.getValue()); + if (previous != null) { + log.warn( + "Duplicate tableHint for tableId {} across databases; later entry wins.", + entry.getKey()); + } + } + } + } for (ParserConfig parserConfig : metadataExtractorConfig.getParserConfig()) { for (Database database : parserConfig.getDatabases()) { + TableFormatDetector detector = detectorFor(database); for (String basePathConfig : database.getBasePaths()) { String basePath = extractBasePath(basePathConfig); @@ -78,7 +111,11 @@ public CompletableFuture> discoverTables() { Pair.of( basePathConfig, discoverTablesInPath( - basePath, parserConfig.getLake(), database.getName(), excludedPathPatterns))); + basePath, + parserConfig.getLake(), + database.getName(), + excludedPathPatterns, + detector))); } } } @@ -106,7 +143,12 @@ public CompletableFuture> discoverTables() { continue; } Table table = discoveredTables.iterator().next(); - table = table.toBuilder().tableId(tableId).build(); + Table.TableBuilder builder = table.toBuilder().tableId(tableId); + TableHint hint = tableHintsByTableId.get(tableId); + if (hint != null && StringUtils.isNotBlank(hint.getMetadataLocationHint())) { + builder.metadataLocationHint(hint.getMetadataLocationHint()); + } + table = builder.build(); discoveredTables = Collections.singleton(table); } @@ -116,6 +158,16 @@ public CompletableFuture> discoverTables() { }); } + private TableFormatDetector detectorFor(Database database) { + TableFormat declared = database.getTableFormat() == null ? 
TableFormat.HUDI : database.getTableFormat(); + TableFormatDetector detector = detectorsByFormat.get(declared); + if (detector == null) { + // Programmer error: a new TableFormat enum value was added without a matching detector. + throw new IllegalStateException("No detector registered for tableFormat=" + declared); + } + return detector; + } + private String extractBasePath(String basePathConfig) { String[] basePathConfigParts = basePathConfig.split(TABLE_ID_SEPARATOR); return basePathConfigParts[0]; @@ -127,54 +179,68 @@ private String extractTableId(String basePathConfig) { } private CompletableFuture> discoverTablesInPath( - String path, String lakeName, String databaseName, List excludedPathPatterns) { + String path, + String lakeName, + String databaseName, + List excludedPathPatterns, + TableFormatDetector detector) { try { log.info(String.format("Discovering tables in %s", path)); return asyncStorageClient .listAllFilesInDir(path) .thenComposeAsync( - listedFiles -> { - Set
tablePaths = ConcurrentHashMap.newKeySet(); - List> recursiveFutures = new ArrayList<>(); - - if (isHudiTableFolder(listedFiles)) { - Table table = - Table.builder() - .absoluteTableUri(path) - .databaseName(databaseName) - .lakeName(lakeName) - .build(); - if (!isExcluded(table.getAbsoluteTableUri(), excludedPathPatterns)) { - tablePaths.add(table); - } - return CompletableFuture.completedFuture(tablePaths); - } - - List directories = - listedFiles.stream().filter(File::isDirectory).collect(Collectors.toList()); - - for (File file : directories) { - String filePath = storageUtils.constructFileUri(path, file.getFilename()); - if (!isExcluded(filePath, excludedPathPatterns)) { - CompletableFuture recursiveFuture = - discoverTablesInPath(filePath, lakeName, databaseName, excludedPathPatterns) - .thenAccept(tablePaths::addAll); - recursiveFutures.add(recursiveFuture); - } - } - - return CompletableFuture.allOf(recursiveFutures.toArray(new CompletableFuture[0])) - .thenApplyAsync(ignored -> tablePaths, executorService); - }, + listedFiles -> + detector + .matches(path, listedFiles) + .thenComposeAsync( + matched -> { + Set
tablePaths = ConcurrentHashMap.newKeySet(); + if (Boolean.TRUE.equals(matched)) { + Table table = + Table.builder() + .absoluteTableUri(path) + .databaseName(databaseName) + .lakeName(lakeName) + .tableFormat(detector.format()) + .build(); + if (!isExcluded(table.getAbsoluteTableUri(), excludedPathPatterns)) { + tablePaths.add(table); + } + return CompletableFuture.completedFuture(tablePaths); + } + + List directories = + listedFiles.stream() + .filter(File::isDirectory) + .collect(Collectors.toList()); + List> recursiveFutures = new ArrayList<>(); + for (File file : directories) { + String filePath = + storageUtils.constructFileUri(path, file.getFilename()); + if (!isExcluded(filePath, excludedPathPatterns)) { + CompletableFuture recursiveFuture = + discoverTablesInPath( + filePath, + lakeName, + databaseName, + excludedPathPatterns, + detector) + .thenAccept(tablePaths::addAll); + recursiveFutures.add(recursiveFuture); + } + } + return CompletableFuture.allOf( + recursiveFutures.toArray(new CompletableFuture[0])) + .thenApplyAsync(ignored -> tablePaths, executorService); + }, + executorService), executorService) .exceptionally( e -> { log.error("Failed to discover tables in path: {}", path, e); lakeviewExtractorMetrics.incrementTableDiscoveryFailureCounter( - getMetadataExtractorFailureReason( - e, - MetricsConstants.MetadataUploadFailureReasons.UNKNOWN) - ); + getMetadataExtractorFailureReason( + e, MetricsConstants.MetadataUploadFailureReasons.UNKNOWN)); return emptySet(); }); } catch (Exception e) { @@ -183,14 +249,6 @@ private CompletableFuture> discoverTablesInPath( } } - /* - * checks the contents of a folder to see if it is a hudi table or not - * a folder is a hudi table if it contains .hoodie folder within it - */ - private static boolean isHudiTableFolder(List listedFiles) { - return listedFiles.stream().anyMatch(file -> file.getFilename().startsWith(HOODIE_FOLDER_NAME)); - } - private boolean isExcluded(String filePath, List excludedPathPatterns) { 
package ai.onehouse.metadata_extractor;

import ai.onehouse.api.models.request.TableFormat;
import ai.onehouse.storage.models.File;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
 * Identifies whether a directory listing represents the root of a table of a particular format.
 *
 * <p>One implementation per supported format (Hudi, Iceberg, ...). Each {@code Database} in the
 * parser YAML declares its {@code tableFormat}, and {@link TableDiscoveryService} picks the
 * single matching detector for that database — detectors are not raced against each other, so
 * there is no ordering trap between formats.
 *
 * <p>{@link #matches} returns a {@link CompletableFuture} because some detectors need a
 * follow-up directory listing to confirm a match (e.g. Iceberg's "must have a {@code
 * *.metadata.json} inside {@code metadata/}" rule). Stateless detectors should return {@link
 * CompletableFuture#completedFuture}.
 */
public interface TableFormatDetector {
  /** Format this detector identifies. */
  TableFormat format();

  /**
   * Returns true if the directory at {@code path} (with its top-level listing already in {@code
   * listedFiles}) is a table root of {@link #format()}. Implementations may issue additional
   * storage I/O to validate the match.
   */
  CompletableFuture<Boolean> matches(String path, List<File> listedFiles);
}
{@link + * ai.onehouse.metadata_extractor.IcebergMetadataUploaderService} uses this to skip the per-cycle + * {@code metadata/} folder LIST when the hint matches the last uploaded file. + */ + private final String metadataLocationHint; } diff --git a/lakeview/src/test/java/ai/onehouse/metadata_extractor/TableDiscoveryAndUploadJobTest.java b/lakeview/src/test/java/ai/onehouse/metadata_extractor/TableDiscoveryAndUploadJobTest.java index 7233b581..bbc25a8f 100644 --- a/lakeview/src/test/java/ai/onehouse/metadata_extractor/TableDiscoveryAndUploadJobTest.java +++ b/lakeview/src/test/java/ai/onehouse/metadata_extractor/TableDiscoveryAndUploadJobTest.java @@ -40,6 +40,7 @@ class TableDiscoveryAndUploadJobTest { @Mock private TableDiscoveryService mockTableDiscoveryService; @Mock private TableMetadataUploaderService mockTableMetadataUploaderService; + @Mock private IcebergMetadataUploaderService mockIcebergMetadataUploaderService; @Mock private ScheduledExecutorService mockScheduler; @@ -56,6 +57,12 @@ class TableDiscoveryAndUploadJobTest { @BeforeEach void setUp(TestInfo info) { + // Existing tests exercise the Hudi path; dispatch routes any Iceberg-format tables (none in + // these tests) to the Iceberg uploader. Default the mock to a successful no-op so the dispatch + // combine doesn't NPE on the (empty Iceberg set) branch. + lenient() + .when(mockIcebergMetadataUploaderService.uploadInstantsInTables(anySet())) + .thenReturn(CompletableFuture.completedFuture(true)); Instant fixedInstant = info.getDisplayName().startsWith("2023") ? 
Instant.parse(info.getDisplayName()) : Instant.now(); try (MockedStatic mockedInstant = @@ -65,6 +72,7 @@ void setUp(TestInfo info) { new TableDiscoveryAndUploadJob( mockTableDiscoveryService, mockTableMetadataUploaderService, + mockIcebergMetadataUploaderService, mockHudiMetadataExtractorMetrics, asyncStorageClient) { @Override diff --git a/lakeview/src/test/java/ai/onehouse/metadata_extractor/TableDiscoveryServiceTest.java b/lakeview/src/test/java/ai/onehouse/metadata_extractor/TableDiscoveryServiceTest.java index 04c6e0ae..65570dc9 100644 --- a/lakeview/src/test/java/ai/onehouse/metadata_extractor/TableDiscoveryServiceTest.java +++ b/lakeview/src/test/java/ai/onehouse/metadata_extractor/TableDiscoveryServiceTest.java @@ -164,7 +164,9 @@ void testDiscoverTablesWithExclusion() throws ExecutionException, InterruptedExc new StorageUtils(), new ConfigProvider(config), ForkJoinPool.commonPool(), - hudiMetadataExtractorMetrics); + hudiMetadataExtractorMetrics, + new HudiTableFormatDetector(), + new IcebergTableFormatDetector(asyncStorageClient, new StorageUtils())); Set

tableSet = tableDiscoveryService.discoverTables().get(); List
expectedResponseSet = @@ -256,7 +258,9 @@ void testCaseWhereMoreThanOneDiscoveredTablesForTableId() { new StorageUtils(), new ConfigProvider(config), ForkJoinPool.commonPool(), - hudiMetadataExtractorMetrics); + hudiMetadataExtractorMetrics, + new HudiTableFormatDetector(), + new IcebergTableFormatDetector(asyncStorageClient, new StorageUtils())); Set
/**
 * Discovery over a database declared as ICEBERG. Two children under the base path:
 * {@code orders/} is a real Iceberg table (metadata/ with a *.metadata.json) and
 * {@code docs/} is a false-positive shape (metadata/ but no .metadata.json inside)
 * that must be skipped. Only the real table should be discovered, tagged ICEBERG.
 */
@Test
void testDiscoverIcebergTableByDeclaredFormat() {
  String basePath = "s3://bucket/iceberg_warehouse/";
  String ordersPath = basePath + "orders/";
  String ordersMetadata = ordersPath + "metadata";
  String docsPath = basePath + "docs/";
  String docsMetadata = docsPath + "metadata";

  // Listing stubs: base dir -> two subdirs; orders has metadata/ + data/; docs has only a
  // metadata/ dir whose contents are not Iceberg metadata.
  when(asyncStorageClient.listAllFilesInDir(basePath))
      .thenReturn(
          CompletableFuture.completedFuture(
              Arrays.asList(generateFileObj("orders/", true), generateFileObj("docs/", true))));
  when(asyncStorageClient.listAllFilesInDir(ordersPath))
      .thenReturn(
          CompletableFuture.completedFuture(
              Arrays.asList(
                  generateFileObj("metadata", true), generateFileObj("data", true))));
  when(asyncStorageClient.listAllFilesInDir(ordersMetadata))
      .thenReturn(
          CompletableFuture.completedFuture(
              Arrays.asList(generateFileObj("v1.metadata.json", false))));
  when(asyncStorageClient.listAllFilesInDir(docsPath))
      .thenReturn(
          CompletableFuture.completedFuture(
              Arrays.asList(generateFileObj("metadata", true))));
  when(asyncStorageClient.listAllFilesInDir(docsMetadata))
      .thenReturn(
          CompletableFuture.completedFuture(
              Arrays.asList(generateFileObj("README.md", false))));

  // Single parser config whose one database explicitly declares the ICEBERG format.
  when(config.getMetadataExtractorConfig()).thenReturn(metadataExtractorConfig);
  when(metadataExtractorConfig.getPathExclusionPatterns()).thenReturn(Optional.of(emptyList()));
  when(metadataExtractorConfig.getParserConfig())
      .thenReturn(
          Collections.singletonList(
              ParserConfig.builder()
                  .lake(LAKE)
                  .databases(
                      Collections.singletonList(
                          Database.builder()
                              .name(DATABASE)
                              .tableFormat(ai.onehouse.api.models.request.TableFormat.ICEBERG)
                              .basePaths(Collections.singletonList(basePath))
                              .build()))
                  .build()));

  tableDiscoveryService =
      new TableDiscoveryService(
          asyncStorageClient,
          new StorageUtils(),
          new ConfigProvider(config),
          ForkJoinPool.commonPool(),
          hudiMetadataExtractorMetrics,
          new HudiTableFormatDetector(),
          new IcebergTableFormatDetector(asyncStorageClient, new StorageUtils()));

  // Exactly one table discovered: the real Iceberg root, carrying its declared format.
  Set<Table> tables = tableDiscoveryService.discoverTables().join();
  assertEquals(1, tables.size());
  Table only = tables.iterator().next();
  assertEquals(ordersPath, only.getAbsoluteTableUri());
  assertEquals(ai.onehouse.api.models.request.TableFormat.ICEBERG, only.getTableFormat());
}
package ai.onehouse.metadata_extractor;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import ai.onehouse.storage.models.File;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
import org.junit.jupiter.api.Test;

/** Unit tests for the static helpers of {@link IcebergMetadataUploaderService}. */
class TestIcebergMetadataUploaderService {

  @Test
  void picksLatestHiveGlueStyleMetadataJsonByNumericPrefix() {
    // Hive/Glue-style names carry a zero-padded numeric prefix before the uuid.
    Optional<File> latest =
        IcebergMetadataUploaderService.pickLatestMetadataJson(
            Arrays.asList(
                jsonFile("00000-abc.metadata.json"),
                jsonFile("00020-xyz.metadata.json"),
                jsonFile("00005-def.metadata.json")));
    assertTrue(latest.isPresent());
    assertEquals("00020-xyz.metadata.json", latest.get().getFilename());
  }

  @Test
  void picksLatestHadoopCatalogMetadataJsonByVersionNumber() {
    // Lexicographic ordering compares character-by-character, so it would rank "v9" above
    // "v10" and "v11" and wrongly pick v9. Numeric comparison on the leading integer
    // correctly returns v11.
    Optional<File> latest =
        IcebergMetadataUploaderService.pickLatestMetadataJson(
            Arrays.asList(
                jsonFile("v1.metadata.json"),
                jsonFile("v2.metadata.json"),
                jsonFile("v9.metadata.json"),
                jsonFile("v10.metadata.json"),
                jsonFile("v11.metadata.json")));
    assertTrue(latest.isPresent());
    assertEquals("v11.metadata.json", latest.get().getFilename());
  }

  @Test
  void ignoresDirectoriesAndUnrelatedFiles() {
    Optional<File> latest =
        IcebergMetadataUploaderService.pickLatestMetadataJson(
            Arrays.asList(
                File.builder().filename("snap-x.avro").isDirectory(false).lastModifiedAt(Instant.EPOCH).build(),
                File.builder().filename("sub").isDirectory(true).lastModifiedAt(Instant.EPOCH).build(),
                jsonFile("00001-a.metadata.json")));
    assertTrue(latest.isPresent());
    assertEquals("00001-a.metadata.json", latest.get().getFilename());
  }

  @Test
  void returnsEmptyWhenNoMetadataJson() {
    assertFalse(
        IcebergMetadataUploaderService.pickLatestMetadataJson(
                Collections.singletonList(
                    File.builder().filename("snap.avro").isDirectory(false).lastModifiedAt(Instant.EPOCH).build()))
            .isPresent());
  }

  @Test
  void extractVersionNumberHandlesAllNamingShapes() {
    assertEquals(10L, IcebergMetadataUploaderService.extractVersionNumber("v10.metadata.json"));
    assertEquals(20L, IcebergMetadataUploaderService.extractVersionNumber("00020-uuid.metadata.json"));
    assertEquals(0L, IcebergMetadataUploaderService.extractVersionNumber("00000-uuid.metadata.json"));
    // Non-numeric filenames still parse to -1 so they don't dominate the sort.
    assertEquals(-1L, IcebergMetadataUploaderService.extractVersionNumber("metadata.json"));
    assertEquals(-1L, IcebergMetadataUploaderService.extractVersionNumber("v.metadata.json"));
  }

  @Test
  void resolveFromVersionHintReturnsTargetWhenPresent() {
    // The hint bytes carry a trailing newline — the resolver must tolerate it.
    Optional<File> resolved =
        IcebergMetadataUploaderService.resolveFromVersionHint(
            "7\n".getBytes(StandardCharsets.UTF_8),
            Arrays.asList(
                jsonFile("v6.metadata.json"),
                jsonFile("v7.metadata.json"),
                jsonFile("v8.metadata.json")));
    assertTrue(resolved.isPresent());
    assertEquals("v7.metadata.json", resolved.get().getFilename());
  }

  @Test
  void resolveFromVersionHintReturnsEmptyWhenTargetMissing() {
    // Hint points at v42 but only v6 / v7 exist — caller will fall back to numeric sort.
    assertFalse(
        IcebergMetadataUploaderService.resolveFromVersionHint(
                "42".getBytes(StandardCharsets.UTF_8),
                Arrays.asList(jsonFile("v6.metadata.json"), jsonFile("v7.metadata.json")))
            .isPresent());
  }

  @Test
  void resolveFromVersionHintReturnsEmptyOnNonIntegerContent() {
    assertFalse(
        IcebergMetadataUploaderService.resolveFromVersionHint(
                "not-an-int".getBytes(StandardCharsets.UTF_8),
                Collections.singletonList(jsonFile("v1.metadata.json")))
            .isPresent());
  }

  @Test
  void lastPathSegmentExtractsFilenameFromUri() {
    assertEquals(
        "00042-uuid.metadata.json",
        IcebergMetadataUploaderService.lastPathSegment(
            "s3://bucket/db/t/metadata/00042-uuid.metadata.json"));
    assertEquals(
        "v3.metadata.json",
        IcebergMetadataUploaderService.lastPathSegment(
            "s3a://bucket/db/t/metadata/v3.metadata.json"));
  }

  @Test
  void lastPathSegmentReturnsInputWhenNoSlash() {
    assertEquals("filename.json", IcebergMetadataUploaderService.lastPathSegment("filename.json"));
  }

  // Builds a plain (non-directory) listing entry with a fixed timestamp.
  private static File jsonFile(String name) {
    return File.builder().filename(name).isDirectory(false).lastModifiedAt(Instant.EPOCH).build();
  }
}
package ai.onehouse.metadata_extractor;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import ai.onehouse.api.models.request.TableFormat;
import ai.onehouse.storage.AsyncStorageClient;
import ai.onehouse.storage.StorageUtils;
import ai.onehouse.storage.models.File;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

/**
 * Unit tests for {@link IcebergTableFormatDetector}. The detector must require both the
 * {@code metadata/} directory marker in the table-root listing AND at least one
 * {@code *.metadata.json} file inside it, and must not issue the validating LIST when the
 * marker is absent.
 */
@ExtendWith(MockitoExtension.class)
class TestIcebergTableFormatDetector {
  private static final String TABLE_PATH = "s3://bucket/db/t";
  private static final String METADATA_PATH = "s3://bucket/db/t/metadata";

  @Mock private AsyncStorageClient asyncStorageClient;
  private IcebergTableFormatDetector detector;

  @BeforeEach
  void setUp() {
    detector = new IcebergTableFormatDetector(asyncStorageClient, new StorageUtils());
  }

  @Test
  void declaresIcebergFormat() {
    assertEquals(TableFormat.ICEBERG, detector.format());
  }

  @Test
  void matchesWhenMetadataDirHasMetadataJson() {
    when(asyncStorageClient.listAllFilesInDir(eq(METADATA_PATH)))
        .thenReturn(
            CompletableFuture.completedFuture(
                Arrays.asList(
                    file("v3.metadata.json", false),
                    file("snap-1.avro", false))));
    assertTrue(
        detector
            .matches(TABLE_PATH, Arrays.asList(file("metadata", true), file("data", true)))
            .join());
  }

  @Test
  void doesNotMatchWhenMetadataDirEmptyOfMetadataJson() {
    // Real false-positive: a folder happens to have a `metadata/` subdir but no Iceberg pointer.
    when(asyncStorageClient.listAllFilesInDir(eq(METADATA_PATH)))
        .thenReturn(
            CompletableFuture.completedFuture(
                Arrays.asList(file("schema.csv", false), file("README.md", false))));
    assertFalse(
        detector
            .matches(TABLE_PATH, Arrays.asList(file("metadata", true), file("data", true)))
            .join());
  }

  @Test
  void doesNotMatchOnFileNamedMetadata() {
    // A non-directory entry named "metadata" must not be confused with the metadata/ folder, and
    // we must not issue the validating LIST when the marker isn't present.
    assertFalse(
        detector.matches(TABLE_PATH, Collections.singletonList(file("metadata", false))).join());
    verify(asyncStorageClient, never()).listAllFilesInDir(eq(METADATA_PATH));
  }

  @Test
  void doesNotMatchOnHudiLayout() {
    // A Hudi-shaped listing (.hoodie + data files) has no metadata/ marker: no match, no LIST.
    assertFalse(
        detector
            .matches(TABLE_PATH, Arrays.asList(file(".hoodie", true), file("part-0.parquet", false)))
            .join());
    verify(asyncStorageClient, never()).listAllFilesInDir(eq(METADATA_PATH));
  }

  @Test
  void matchesWhenMetadataDirEntryHasTrailingSlash() {
    // S3 ListObjectsV2 returns CommonPrefixes as e.g. "metadata/" (with the trailing slash that
    // S3 itself uses). The storage client may surface that filename verbatim, so the detector
    // must accept both "metadata" and "metadata/" as the folder marker.
    when(asyncStorageClient.listAllFilesInDir(eq(METADATA_PATH)))
        .thenReturn(
            CompletableFuture.completedFuture(
                Collections.singletonList(file("00000-uuid.metadata.json", false))));
    assertTrue(
        detector
            .matches(TABLE_PATH, Collections.singletonList(file("metadata/", true)))
            .join());
  }

  // Builds a file/directory listing entry with a fixed timestamp.
  private static File file(String name, boolean dir) {
    return File.builder().filename(name).isDirectory(dir).lastModifiedAt(Instant.EPOCH).build();
  }
}