Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
import ai.onehouse.config.models.configv1.ParserConfig;
import ai.onehouse.metadata_extractor.ActiveTimelineInstantBatcher;
import ai.onehouse.metadata_extractor.HoodiePropertiesReader;
import ai.onehouse.metadata_extractor.HudiTableFormatDetector;
import ai.onehouse.metadata_extractor.IcebergMetadataUploaderService;
import ai.onehouse.metadata_extractor.IcebergTableFormatDetector;
import ai.onehouse.metadata_extractor.LSMTimelineManifestReader;
import ai.onehouse.metadata_extractor.TableDiscoveryAndUploadJob;
import ai.onehouse.metadata_extractor.TableDiscoveryService;
Expand Down Expand Up @@ -253,7 +256,9 @@ private TableDiscoveryAndUploadJob getTableDiscoveryAndUploadJob(@Nonnull Config
configProvider);

TableDiscoveryService tableDiscoveryService = new TableDiscoveryService(asyncStorageClient, storageUtils,
configProvider, executorService, lakeViewExtractorMetrics);
configProvider, executorService, lakeViewExtractorMetrics,
new HudiTableFormatDetector(),
new IcebergTableFormatDetector(asyncStorageClient, storageUtils));
HoodiePropertiesReader hoodiePropertiesReader = new HoodiePropertiesReader(asyncStorageClient,
lakeViewExtractorMetrics);
OnehouseApiClient onehouseApiClient = new OnehouseApiClient(asyncHttpClientWithRetry, config,
Expand All @@ -267,8 +272,11 @@ presignedUrlFileUploader, onehouseApiClient, storageUtils, executorService, new
lakeViewExtractorMetrics, config, lsmTimelineManifestReader);
TableMetadataUploaderService tableMetadataUploaderService = new TableMetadataUploaderService(hoodiePropertiesReader,
onehouseApiClient, timelineCommitInstantsUploader, lakeViewExtractorMetrics, executorService);
IcebergMetadataUploaderService icebergMetadataUploaderService = new IcebergMetadataUploaderService(
asyncStorageClient, onehouseApiClient, presignedUrlFileUploader, storageUtils, lakeViewExtractorMetrics);

return new TableDiscoveryAndUploadJob(tableDiscoveryService, tableMetadataUploaderService, lakeViewExtractorMetrics, asyncStorageClient);
return new TableDiscoveryAndUploadJob(tableDiscoveryService, tableMetadataUploaderService,
icebergMetadataUploaderService, lakeViewExtractorMetrics, asyncStorageClient);
}

private AsyncStorageClient getAsyncStorageClient(@Nonnull Config config, @Nonnull ExecutorService executorService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,13 @@ public class InitializeTableMetricsCheckpointRequest {
public static class InitializeSingleTableMetricsCheckpointRequest {
@NonNull String tableId;
@NonNull String tableName;
// For Hudi: source of truth for COW vs MOR. For non-Hudi formats it is a meaningless
// placeholder; the server discriminates on tableFormat first. Kept non-null to preserve
// the existing wire contract; defaults to COPY_ON_WRITE for Iceberg tables.
@NonNull TableType tableType;
// Physical table format. Absent is interpreted by the server as HUDI for backward compat
// with extractors that haven't been updated.
@Nullable TableFormat tableFormat;
String lakeName;
String databaseName;
String tableBasePath;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package ai.onehouse.api.models.request;

import com.fasterxml.jackson.annotation.JsonProperty;

/**
* Physical observed-table format. Wire-serialized to the protobuf enum names defined in
* lake/table.proto's {@code TableFormat} so the control plane's protobuf-java-util JSON parser
* accepts the value on RPCs like {@code InitializeTableMetricsCheckpoint}. Without these
* {@code @JsonProperty} aliases, Jackson would emit the short Java identifier ({@code "ICEBERG"})
* and the server-side parser — which requires the canonical proto enum name
* ({@code "TABLE_FORMAT_ICEBERG"}) — would silently drop the value and persist the checkpoint
* with the proto enum-zero default ({@code TABLE_FORMAT_INVALID}), routing iceberg uploads
* through the Hudi back-compat fallback.
*/
public enum TableFormat {
// Apache Hudi. Also the value the server falls back to when tableFormat is absent on the
// wire, for backward compatibility with extractors that predate this enum.
@JsonProperty("TABLE_FORMAT_HUDI")
HUDI,
// Apache Iceberg. Must serialize as the canonical proto name (see class javadoc) or the
// server-side parser drops it and the checkpoint lands on the enum-zero default.
@JsonProperty("TABLE_FORMAT_ICEBERG")
ICEBERG
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package ai.onehouse.config.models.configv1;

import ai.onehouse.api.models.request.TableFormat;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import lombok.Builder;
import lombok.NonNull;
import lombok.Value;
Expand All @@ -12,4 +15,19 @@
public class Database {
// Logical database name from the parser YAML; nullable — no validation is applied here.
String name;
// Storage prefixes to scan for table roots. Entries may carry a "#tableId" suffix — that
// tableId is the key into tableHints below.
@NonNull List<String> basePaths;
/**
 * Physical table format of the tables under {@link #basePaths}. Null is interpreted as {@link
 * TableFormat#HUDI} for backward compatibility with YAMLs written before Iceberg support
 * existed. Customers with a mixed-format warehouse should split into separate {@link Database}
 * entries, one per format.
 */
@Nullable TableFormat tableFormat;
/**
 * Optional per-table hints keyed on the tableId encoded in {@link #basePaths} (the segment after
 * {@code #}). The control plane populates this for tables it has richer information about
 * (e.g. Iceberg tables discovered via AWS Glue, where the catalog already knows the current
 * {@code metadata.json} URI). Missing entries / missing map are normal — older YAML versions
 * predate this field, and tables without hints fall back to plain discovery.
 */
@Nullable Map<String, TableHint> tableHints;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package ai.onehouse.config.models.configv1;

import javax.annotation.Nullable;
import lombok.Builder;
import lombok.Value;
import lombok.extern.jackson.Jacksonized;

/**
 * Optional per-table hints supplied by the control plane in the parser YAML, keyed on tableId in
 * {@link Database#getTableHints()}. Hints are forward-compatible: older LakeView versions that
 * don't know about a field deserialize the rest fine; new fields default to null.
 */
@Builder
@Value
// NOTE(review): the forward-compat claim above assumes the parser's ObjectMapper disables
// FAIL_ON_UNKNOWN_PROPERTIES (or this class gains @JsonIgnoreProperties(ignoreUnknown = true)).
// @Jacksonized alone routes deserialization through the Lombok builder but does NOT make Jackson
// ignore unknown JSON fields — confirm against the mapper configuration.
@Jacksonized
public class TableHint {
/**
 * For Iceberg tables registered in an external catalog (e.g. AWS Glue), the URI of the current
 * {@code metadata.json} that the catalog points at. When present, the extractor can PUT this
 * file directly instead of listing the {@code metadata/} folder to find the latest. Empty /
 * null means "no hint — fall back to listing".
 */
@Nullable String metadataLocationHint;
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ private MetadataExtractorConstants() {}

public static final String HOODIE_FOLDER_NAME = ".hoodie";
public static final String ARCHIVED_FOLDER_NAME = "archived";
public static final String ICEBERG_METADATA_FOLDER_NAME = "metadata";
public static final String ICEBERG_METADATA_FILE_SUFFIX = ".metadata.json";
public static final String ICEBERG_VERSION_HINT_FILENAME = "version-hint.text";
public static final String ICEBERG_HADOOP_METADATA_FILE_PREFIX = "v";
public static final String HOODIE_PROPERTIES_FILE = "hoodie.properties";
public static final String HOODIE_TABLE_NAME_KEY = "hoodie.table.name";
public static final String HOODIE_TABLE_TYPE_KEY = "hoodie.table.type";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package ai.onehouse.metadata_extractor;

import static ai.onehouse.constants.MetadataExtractorConstants.HOODIE_FOLDER_NAME;

import ai.onehouse.api.models.request.TableFormat;
import ai.onehouse.storage.models.File;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
 * Detects Hudi table roots: a listed directory is treated as a Hudi table when one of its
 * immediate entries is the {@code .hoodie/} metadata folder.
 *
 * <p>Matching is prefix-based ({@code startsWith}), so storage clients that return the folder
 * name with a trailing slash ({@code ".hoodie/"}) are accepted as well.
 */
public class HudiTableFormatDetector implements TableFormatDetector {
  @Override
  public TableFormat format() {
    return TableFormat.HUDI;
  }

  /**
   * @param path table base path being probed (unused; detection relies on the listing alone)
   * @param listedFiles immediate children of {@code path}
   * @return already-completed future — this detector needs no extra storage calls
   */
  @Override
  public CompletableFuture<Boolean> matches(String path, List<File> listedFiles) {
    boolean foundHoodieFolder = false;
    for (File entry : listedFiles) {
      if (entry.getFilename().startsWith(HOODIE_FOLDER_NAME)) {
        foundHoodieFolder = true;
        break;
      }
    }
    return CompletableFuture.completedFuture(foundHoodieFolder);
  }
}
Loading
Loading