Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import ai.onehouse.config.models.configv1.ParserConfig;
import ai.onehouse.metadata_extractor.ActiveTimelineInstantBatcher;
import ai.onehouse.metadata_extractor.HoodiePropertiesReader;
import ai.onehouse.metadata_extractor.LSMTimelineManifestReader;
import ai.onehouse.metadata_extractor.TableDiscoveryAndUploadJob;
import ai.onehouse.metadata_extractor.TableDiscoveryService;
import ai.onehouse.metadata_extractor.TableMetadataUploaderService;
Expand Down Expand Up @@ -259,9 +260,11 @@ private TableDiscoveryAndUploadJob getTableDiscoveryAndUploadJob(@Nonnull Config
lakeViewExtractorMetrics);
PresignedUrlFileUploader presignedUrlFileUploader = new PresignedUrlFileUploader(asyncStorageClient,
asyncHttpClientWithRetry, lakeViewExtractorMetrics);
LSMTimelineManifestReader lsmTimelineManifestReader = new LSMTimelineManifestReader(asyncStorageClient,
storageUtils);
TimelineCommitInstantsUploader timelineCommitInstantsUploader = new TimelineCommitInstantsUploader(asyncStorageClient,
presignedUrlFileUploader, onehouseApiClient, storageUtils, executorService, new ActiveTimelineInstantBatcher(config),
lakeViewExtractorMetrics, config);
lakeViewExtractorMetrics, config, lsmTimelineManifestReader);
TableMetadataUploaderService tableMetadataUploaderService = new TableMetadataUploaderService(hoodiePropertiesReader,
onehouseApiClient, timelineCommitInstantsUploader, lakeViewExtractorMetrics, executorService);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,16 @@ private MetadataExtractorConstants() {}
public static final String HOODIE_PROPERTIES_FILE = "hoodie.properties";
public static final String HOODIE_TABLE_NAME_KEY = "hoodie.table.name";
public static final String HOODIE_TABLE_TYPE_KEY = "hoodie.table.type";
public static final String HOODIE_TABLE_VERSION_KEY = "hoodie.table.version";
public static final String HOODIE_TIMELINE_LAYOUT_VERSION_KEY =
"hoodie.timeline.layout.version";
public static final String TIMELINE_FOLDER_NAME = "timeline";
public static final String HISTORY_FOLDER_NAME = "history";
public static final int TIMELINE_LAYOUT_VERSION_V1 = 1;
public static final int TIMELINE_LAYOUT_VERSION_V2 = 2;
public static final int HOODIE_TABLE_VERSION_DEFAULT = 6;
public static final int TIMELINE_LAYOUT_VERSION_DEFAULT = TIMELINE_LAYOUT_VERSION_V1;
public static final String MANIFEST_FILE_PREFIX = "manifest_";

// The default number of instants in one archived commit metadata file is 10
// so we want to ingest 10x active instants than archived instants in one batch
Expand All @@ -39,17 +49,23 @@ private MetadataExtractorConstants() {}
// Default batch size will be 5 MB
public static final int DEFAULT_FILE_UPLOAD_STREAM_BATCH_SIZE =
Integer.parseInt(System.getenv().getOrDefault("FILE_UPLOAD_STREAM_BATCH_SIZE", "5242880"));
public static final String VERSION_MARKER_FILE = "_version_";
public static final Pattern ARCHIVED_COMMIT_INSTANT_PATTERN =
Pattern.compile("\\.commits_\\.archive\\.\\d+_\\d+-\\d+-\\d+");
public static final Pattern ARCHIVED_COMMIT_INSTANT_PATTERN_V2 =
Pattern.compile("\\d+_\\d+_\\d+\\.parquet|manifest_\\d+|" + VERSION_MARKER_FILE);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is too broad. Can we add ^ and $ at begin and end for each pattern ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These patterns are used exclusively with .matches(), which implicitly anchors the match to the entire string (equivalent to ^pattern$). Adding ^ and $ would be redundant. The V2-specific extraction patterns (V2_ARCHIVED_PARQUET_TIMESTAMP_PATTERN and V2_MANIFEST_NUMERIC_PATTERN) already have anchors since they're used with .find().

public static final Pattern ACTIVE_COMMIT_INSTANT_PATTERN =
Pattern.compile("\\d+(\\.[a-z]{1,20}){1,2}");
Pattern.compile("\\d+(_\\d+)?(\\.[a-z]{1,20}){1,2}");
public static final Pattern V1_ARCHIVED_NUMERIC_PATTERN =
Pattern.compile("\\.archive\\.(\\d+)_");
public static final Checkpoint INITIAL_CHECKPOINT =
Checkpoint.builder()
.batchId(0)
.checkpointTimestamp(Instant.EPOCH)
.lastUploadedFile("")
.firstIncompleteCommitFile("")
.archivedCommitsProcessed(false)
.lastArchivedManifestVersion(0)
.build();

// hardcoding last modified at to prevent this from causing issues with our checkpoint logic
Expand All @@ -73,5 +89,7 @@ private MetadataExtractorConstants() {}
"restore",
"clean",
"compaction",
"replacecommit");
"replacecommit",
"clustering",
"logcompaction");
}
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,10 @@ private List<File> sortAndFilterInstants(List<File> instants) {
private List<File> sortAndFilterInstants(List<File> instants, Instant lastModifiedFilter) {
return instants.stream()
.filter(this::filterFile)
.collect(Collectors.groupingBy(file -> file.getFilename().split("\\.", 3)[0]))
.collect(Collectors.groupingBy(file -> {
String rawKey = file.getFilename().split("\\.", 3)[0];
return rawKey.contains("_") ? rawKey.split("_")[0] : rawKey;
}))
.values()
.stream()
.filter(
Expand Down Expand Up @@ -264,6 +267,17 @@ static boolean areRelatedSavepointOrRollbackInstants(
static ActiveTimelineInstant getActiveTimeLineInstant(String instant) {
String[] parts = instant.split("\\.", 3);

// V9 completed instants embed completion time after an underscore in the leading token,
// e.g. "20260204053206256_20260204053210895". Split it out so callers see the request
// timestamp and the optional completion timestamp separately.
String timestamp = parts[0];
String completionTime = null;
if (timestamp.contains("_")) {
String[] tsParts = timestamp.split("_", 2);
timestamp = tsParts[0];
completionTime = tsParts[1];
}

String action;
String state;
// For commit action, metadata file in inflight state is in the format of XYZ.inflight
Expand All @@ -274,13 +288,21 @@ static ActiveTimelineInstant getActiveTimeLineInstant(String instant) {
action = parts[1];
state = parts.length == 3 ? parts[2] : "completed";
}
return ActiveTimelineInstant.builder().timestamp(parts[0]).action(action).state(state).build();
return ActiveTimelineInstant.builder()
.timestamp(timestamp)
.completionTime(completionTime)
.action(action)
.state(state)
.build();
}

// Parsed representation of an active-timeline instant filename: the request timestamp,
// an optional completion timestamp, the action token, and the lifecycle state
// (e.g. "completed" when the filename carries no explicit state suffix).
@Builder
@Getter
static class ActiveTimelineInstant {
// Request timestamp token taken from the leading segment of the instant filename.
private final String timestamp;
// Only populated for V9 (table version >= 8) completed instants, which embed completion time
// alongside the request timestamp in the filename. Null for V1-V8 instants.
private final String completionTime;
// Action segment of the filename (second dot-separated part, e.g. "commit").
private final String action;
// Third dot-separated part when present; defaults to "completed" otherwise.
private final String state;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

import static ai.onehouse.constants.MetadataExtractorConstants.HOODIE_TABLE_NAME_KEY;
import static ai.onehouse.constants.MetadataExtractorConstants.HOODIE_TABLE_TYPE_KEY;
import static ai.onehouse.constants.MetadataExtractorConstants.HOODIE_TABLE_VERSION_DEFAULT;
import static ai.onehouse.constants.MetadataExtractorConstants.HOODIE_TABLE_VERSION_KEY;
import static ai.onehouse.constants.MetadataExtractorConstants.HOODIE_TIMELINE_LAYOUT_VERSION_KEY;
import static ai.onehouse.constants.MetadataExtractorConstants.TIMELINE_LAYOUT_VERSION_DEFAULT;
import static ai.onehouse.metadata_extractor.MetadataExtractorUtils.getMetadataExtractorFailureReason;

import com.google.inject.Inject;
Expand Down Expand Up @@ -42,9 +46,18 @@ public CompletableFuture<ParsedHudiProperties> readHoodieProperties(String path)
} catch (IOException e) {
throw new RuntimeException("Failed to load properties file", e);
}
int tableVersion = Integer.parseInt(
properties.getProperty(
HOODIE_TABLE_VERSION_KEY, String.valueOf(HOODIE_TABLE_VERSION_DEFAULT)));
int timelineLayoutVersion = Integer.parseInt(
properties.getProperty(
HOODIE_TIMELINE_LAYOUT_VERSION_KEY,
String.valueOf(TIMELINE_LAYOUT_VERSION_DEFAULT)));
return ParsedHudiProperties.builder()
.tableName(properties.getProperty(HOODIE_TABLE_NAME_KEY))
.tableType(TableType.valueOf(properties.getProperty(HOODIE_TABLE_TYPE_KEY)))
.tableVersion(tableVersion)
.timelineLayoutVersion(timelineLayoutVersion)
.build();
})
.exceptionally(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package ai.onehouse.metadata_extractor;

import static ai.onehouse.constants.MetadataExtractorConstants.MANIFEST_FILE_PREFIX;
import static ai.onehouse.constants.MetadataExtractorConstants.VERSION_MARKER_FILE;

import ai.onehouse.RuntimeModule.TableMetadataUploadObjectStorageAsyncClient;
import ai.onehouse.storage.AsyncStorageClient;
import ai.onehouse.storage.StorageUtils;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;

/**
* Reads the Hudi LSM (V2) archived timeline manifest layout that lives under {@code
* .hoodie/timeline/history/}. The layout, written by Hudi 1.x, is:
*
* <pre>
* _version_ # text file containing the latest manifest version
* manifest_N # JSON: { "files": [{ "fileName": "...", "fileLen": N }] }
* {minInstant}_{maxInstant}_{level}.parquet
* </pre>
*
* <p>The manifest is the source of truth for which parquet files are valid in the current
* snapshot. Compaction rewrites multiple low-level files into a single higher-level file, leaving
* orphaned data files on storage; only files referenced by the latest manifest should be treated
* as live. LakeView mirrors files to the backend rather than parsing them, so it only needs the
* filenames the manifest lists - no Hudi dependency required.
*/
@Slf4j
public class LSMTimelineManifestReader {
private final AsyncStorageClient asyncStorageClient;
private final StorageUtils storageUtils;
private final ObjectMapper mapper = new ObjectMapper();

@Inject
public LSMTimelineManifestReader(
@Nonnull @TableMetadataUploadObjectStorageAsyncClient AsyncStorageClient asyncStorageClient,
@Nonnull StorageUtils storageUtils) {
this.asyncStorageClient = asyncStorageClient;
this.storageUtils = storageUtils;
}

/**
* Reads {@code _version_} and the corresponding {@code manifest_N} from the given history
* directory. Returns {@link ManifestSnapshot#empty()} when the {@code _version_} file does not
* exist (no archives have been written yet).
*/
public CompletableFuture<ManifestSnapshot> readLatestManifest(String historyDirectoryUri) {
String versionFileUri = storageUtils.constructFileUri(historyDirectoryUri, VERSION_MARKER_FILE);
return asyncStorageClient
.readFileAsBytes(versionFileUri)
.exceptionally(
throwable -> {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only treat as empty if version itself was missing.
A manifest read failure after a successful version read should propagate to trigger a retry on the next cycle.

// Only treat as empty when _version_ itself is missing (no archives written yet).
log.info(
"No V2 archived timeline _version_ file at {} (treating as empty). Reason: {}",
versionFileUri,
throwable.getMessage());
return null;
})
.thenCompose(
versionBytes -> {
if (versionBytes == null) {
return CompletableFuture.completedFuture(ManifestSnapshot.empty());
}
int version = parseVersionFile(versionBytes);
// Manifest read failures propagate to the caller so the upload is retried
// on the next sync cycle rather than silently treating a readable _version_
// as "no archives."
return readManifestForVersion(historyDirectoryUri, version)
.thenApply(files -> ManifestSnapshot.of(version, files));
});
}

/**
* Reads {@code manifest_N} for the given version and returns the parquet filenames it
* references. Returns an empty list if the manifest cannot be read - callers treat this as
* "previous snapshot is gone, fall back to bootstrap" rather than a hard failure.
*/
public CompletableFuture<List<String>> readManifestFileNames(
String historyDirectoryUri, int version) {
return readManifestForVersion(historyDirectoryUri, version)
.exceptionally(
throwable -> {
log.warn(
"Failed to read manifest_{} from {}; treating as missing. Reason: {}",
version,
historyDirectoryUri,
throwable.getMessage());
return Collections.emptyList();
});
}

private CompletableFuture<List<String>> readManifestForVersion(
String historyDirectoryUri, int version) {
String manifestUri =
storageUtils.constructFileUri(historyDirectoryUri, MANIFEST_FILE_PREFIX + version);
return asyncStorageClient
.readFileAsBytes(manifestUri)
.thenApply(
bytes -> {
try {
return parseManifestFileNames(bytes);
} catch (IOException e) {
throw new UncheckedIOException(
"Failed to parse LSM manifest at " + manifestUri, e);
}
});
}

private static int parseVersionFile(byte[] bytes) {
String contents = new String(bytes, StandardCharsets.UTF_8).trim();
return Integer.parseInt(contents);
}

private List<String> parseManifestFileNames(byte[] bytes) throws IOException {
JsonNode root = mapper.readTree(bytes);
JsonNode files = root.get("files");
List<String> result = new ArrayList<>();
if (files != null && files.isArray()) {
for (JsonNode entry : files) {
JsonNode name = entry.get("fileName");
if (name != null && !name.asText().isEmpty()) {
result.add(name.asText());
}
}
}
return result;
}

/** Snapshot of the latest LSM manifest: version number plus the parquet filenames it lists. */
@Value(staticConstructor = "of")
public static class ManifestSnapshot {
int version;
@Nonnull List<String> parquetFileNames;

public static ManifestSnapshot empty() {
return ManifestSnapshot.of(0, Collections.emptyList());
}

public boolean isEmpty() {
return version == 0;
}
}
}
Loading
Loading