> hudiTableSchemaOpt)
+ {
+ this.tableHandle = requireNonNull(tableHandle, "table is null");
this.dataColumns = requireNonNull(dataColumns, "dataColumns is null");
this.partitionColumns = requireNonNull(partitionColumns, "partitionColumns is null");
this.tableParameters = requireNonNull(tableParameters, "tableParameters is null");
- this.tupleDomain = requireNonNull(tupleDomain, "tupleDomain is null");
+ this.regularPredicates = requireNonNull(regularPredicates, "regularPredicates is null");
+ this.partitionPredicates = requireNonNull(partitionPredicates, "partitionPredicates is null");
+ this.hudiTableSchemaOpt = requireNonNull(hudiTableSchemaOpt, "hudiTableSchemaOpt is null");
+ }
+
+ /**
+ * Builds a lazily-parsed Avro schema from the given schema string.
+ *
+ * Returns {@code Optional.empty()} if the input string is null/empty
+ * or if parsing the schema fails.
+ */
+ private static Optional> buildTableSchema(String tableSchemaStr)
+ {
+ if (StringUtils.isNullOrEmpty(tableSchemaStr)) {
+ return Optional.empty();
+ }
+
+ try {
+ Lazy lazySchema = Lazy.lazily(() -> new Schema.Parser().parse(tableSchemaStr));
+ return Optional.of(lazySchema);
+ }
+ catch (Exception e) {
+ log.warn(e, "Failed to parse table schema: %s", tableSchemaStr);
+ return Optional.empty();
+ }
}
@JsonProperty
- public HudiTableHandle getTable()
+ public HudiTableHandle getTableHandle()
{
- return table;
+ return tableHandle;
}
@JsonProperty
@@ -75,9 +121,30 @@ public Map getTableParameters()
}
@JsonProperty
- public TupleDomain getTupleDomain()
+ public TupleDomain getRegularPredicates()
+ {
+ return regularPredicates;
+ }
+
+ @JsonProperty
+ public TupleDomain getPartitionPredicates()
+ {
+ return partitionPredicates;
+ }
+
+ @JsonProperty
+ public String getTableSchemaStr()
+ {
+ return hudiTableSchemaOpt
+ .map(Lazy::get)
+ .map(Schema::toString)
+ .orElse("");
+ }
+
+ @JsonIgnore
+ public Schema getTableSchema()
{
- return tupleDomain;
+ return hudiTableSchemaOpt.map(Lazy::get).orElse(null);
}
@Override
@@ -90,19 +157,20 @@ public boolean equals(Object o)
return false;
}
HudiTableLayoutHandle that = (HudiTableLayoutHandle) o;
- return Objects.equals(table, that.table) &&
- Objects.equals(tupleDomain, that.tupleDomain);
+ return Objects.equals(tableHandle, that.tableHandle) &&
+ Objects.equals(regularPredicates, that.regularPredicates) &&
+ Objects.equals(partitionPredicates, that.partitionPredicates);
}
@Override
public int hashCode()
{
- return Objects.hash(table, tupleDomain);
+ return Objects.hash(tableHandle, regularPredicates, partitionPredicates);
}
@Override
public String toString()
{
- return table.toString();
+ return tableHandle.toString();
}
}
diff --git a/presto-hudi/src/main/java/com/facebook/presto/hudi/partition/HiveStylePartitionValueExtractor.java b/presto-hudi/src/main/java/com/facebook/presto/hudi/partition/HiveStylePartitionValueExtractor.java
new file mode 100644
index 0000000000000..cffa3cce37239
--- /dev/null
+++ b/presto-hudi/src/main/java/com/facebook/presto/hudi/partition/HiveStylePartitionValueExtractor.java
@@ -0,0 +1,24 @@
+package com.facebook.presto.hudi.partition;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Extractor for Hive Style Partitioned tables, when the partition folders are key value pairs.
+ *
+ * This implementation extracts the partition value of yyyy-mm-dd from the path of type datestr=yyyy-mm-dd.
+ */
+public class HiveStylePartitionValueExtractor implements PartitionValueExtractor {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public List extractPartitionValuesInPath(String partitionPath) {
+ // partition path is expected to be in this format partition_key=partition_value.
+ String[] splits = partitionPath.split("=");
+ if (splits.length != 2) {
+ throw new IllegalArgumentException(
+ "Partition path " + partitionPath + " is not in the form partition_key=partition_value.");
+ }
+ return Collections.singletonList(splits[1]);
+ }
+}
\ No newline at end of file
diff --git a/presto-hudi/src/main/java/com/facebook/presto/hudi/partition/MultiPartKeysValueExtractor.java b/presto-hudi/src/main/java/com/facebook/presto/hudi/partition/MultiPartKeysValueExtractor.java
new file mode 100644
index 0000000000000..9058fe4fd9f8f
--- /dev/null
+++ b/presto-hudi/src/main/java/com/facebook/presto/hudi/partition/MultiPartKeysValueExtractor.java
@@ -0,0 +1,32 @@
+package com.facebook.presto.hudi.partition;
+
+import org.apache.hudi.common.util.ValidationUtils;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Partition Key extractor treating each value delimited by slash as separate key.
+ */
+public class MultiPartKeysValueExtractor implements PartitionValueExtractor {
+
+ @Override
+ public List extractPartitionValuesInPath(String partitionPath) {
+ // If the partitionPath is empty string( which means none-partition table), the partition values
+ // should be empty list.
+ if (partitionPath.isEmpty()) {
+ return Collections.emptyList();
+ }
+ String[] splits = partitionPath.split("/");
+ return Arrays.stream(splits).map(s -> {
+ if (s.contains("=")) {
+ String[] moreSplit = s.split("=");
+ ValidationUtils.checkArgument(moreSplit.length == 2, "Partition Field (" + s + ") not in expected format");
+ return moreSplit[1];
+ }
+ return s;
+ }).collect(Collectors.toList());
+ }
+}
\ No newline at end of file
diff --git a/presto-hudi/src/main/java/com/facebook/presto/hudi/partition/NonPartitionedExtractor.java b/presto-hudi/src/main/java/com/facebook/presto/hudi/partition/NonPartitionedExtractor.java
new file mode 100644
index 0000000000000..e4ddbd656eea3
--- /dev/null
+++ b/presto-hudi/src/main/java/com/facebook/presto/hudi/partition/NonPartitionedExtractor.java
@@ -0,0 +1,15 @@
+package com.facebook.presto.hudi.partition;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Extractor for Non-partitioned hive tables.
+ */
+public class NonPartitionedExtractor implements PartitionValueExtractor {
+
+ @Override
+ public List extractPartitionValuesInPath(String partitionPath) {
+ return new ArrayList<>();
+ }
+}
diff --git a/presto-hudi/src/main/java/com/facebook/presto/hudi/partition/PartitionValueExtractor.java b/presto-hudi/src/main/java/com/facebook/presto/hudi/partition/PartitionValueExtractor.java
new file mode 100644
index 0000000000000..fc0e0338a5da1
--- /dev/null
+++ b/presto-hudi/src/main/java/com/facebook/presto/hudi/partition/PartitionValueExtractor.java
@@ -0,0 +1,16 @@
+package com.facebook.presto.hudi.partition;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * HDFS Path contain hive partition values for the keys it is partitioned on. This mapping is not straight forward and
+ * requires a pluggable implementation to extract the partition value from HDFS path.
+ *
+ * e.g. Hive table partitioned by datestr=yyyy-mm-dd and hdfs path /app/hoodie/dataset1/YYYY=[yyyy]/MM=[mm]/DD=[dd]
+ */
+public interface PartitionValueExtractor extends Serializable
+{
+
+ List extractPartitionValuesInPath(String partitionPath);
+}
diff --git a/presto-hudi/src/main/java/com/facebook/presto/hudi/partition/SinglePartPartitionValueExtractor.java b/presto-hudi/src/main/java/com/facebook/presto/hudi/partition/SinglePartPartitionValueExtractor.java
new file mode 100644
index 0000000000000..ced4b6e387575
--- /dev/null
+++ b/presto-hudi/src/main/java/com/facebook/presto/hudi/partition/SinglePartPartitionValueExtractor.java
@@ -0,0 +1,19 @@
+package com.facebook.presto.hudi.partition;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Extractor for a partition path from a single column.
+ *
+ * This implementation extracts the partition value from the partition path as a single part
+ * even if the relative partition path contains slashes, e.g., the `TimestampBasedKeyGenerator`
+ * transforms the timestamp column into the partition path in the format of "yyyyMM/dd/HH".
+ * The slash (`/`) is replaced with dash (`-`), e.g., `202210/01/20` -> `202210-01-20`.
+ */
+public class SinglePartPartitionValueExtractor implements PartitionValueExtractor {
+ @Override
+ public List extractPartitionValuesInPath(String partitionPath) {
+ return Collections.singletonList(partitionPath.replace('/', '-'));
+ }
+}
diff --git a/presto-hudi/src/main/java/com/facebook/presto/hudi/query/HudiDirectoryLister.java b/presto-hudi/src/main/java/com/facebook/presto/hudi/query/HudiDirectoryLister.java
new file mode 100644
index 0000000000000..3ffa424eee7d6
--- /dev/null
+++ b/presto-hudi/src/main/java/com/facebook/presto/hudi/query/HudiDirectoryLister.java
@@ -0,0 +1,13 @@
+package com.facebook.presto.hudi.query;
+
+import com.facebook.presto.hudi.HudiPartition;
+import org.apache.hudi.common.model.FileSlice;
+
+import java.io.Closeable;
+import java.util.stream.Stream;
+
+public interface HudiDirectoryLister
+ extends Closeable
+{
+ Stream listStatus(HudiPartition hudiPartition, boolean useIndex);
+}
diff --git a/presto-hudi/src/main/java/com/facebook/presto/hudi/query/HudiSnapshotDirectoryLister.java b/presto-hudi/src/main/java/com/facebook/presto/hudi/query/HudiSnapshotDirectoryLister.java
new file mode 100644
index 0000000000000..362811857f92e
--- /dev/null
+++ b/presto-hudi/src/main/java/com/facebook/presto/hudi/query/HudiSnapshotDirectoryLister.java
@@ -0,0 +1,88 @@
+package com.facebook.presto.hudi.query;
+
+import com.facebook.airlift.log.Logger;
+import com.facebook.presto.hudi.HudiPartition;
+import com.facebook.presto.hudi.HudiTableHandle;
+import com.facebook.presto.hudi.HudiTableLayoutHandle;
+import com.facebook.presto.hudi.query.index.HudiIndexSupport;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.SchemaTableName;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.util.Lazy;
+
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import static com.facebook.presto.hudi.HudiSessionProperties.isHudiMetadataTableEnabled;
+import static com.facebook.presto.hudi.query.index.IndexSupportFactory.createIndexSupport;
+import static org.apache.hudi.common.table.view.FileSystemViewManager.createInMemoryFileSystemViewWithTimeline;
+
+public class HudiSnapshotDirectoryLister
+ implements HudiDirectoryLister
+{
+ private static final Logger log = Logger.get(HudiSnapshotDirectoryLister.class);
+ private final HudiTableHandle tableHandle;
+ private final Lazy lazyFileSystemView;
+ private final Optional indexSupportOpt;
+
+ public HudiSnapshotDirectoryLister(
+ ConnectorSession session,
+ HudiTableLayoutHandle layoutHandle,
+ boolean enableMetadataTable,
+ Lazy lazyTableMetadata)
+ {
+ this.tableHandle = layoutHandle.getTableHandle();
+ SchemaTableName schemaTableName = tableHandle.getSchemaTableName();
+ this.lazyFileSystemView = Lazy.lazily(() -> {
+ HoodieTimer timer = HoodieTimer.start();
+ HoodieTableMetaClient metaClient = layoutHandle.getTableHandle().getMetaClient();
+ HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().enable(isHudiMetadataTableEnabled(session)).build();
+ HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+ HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(metaClient.getStorageConf());
+ HoodieTableFileSystemView fsView = createInMemoryFileSystemViewWithTimeline(engineContext, metaClient, metadataConfig, timeline);
+ log.info("Created file system view of table %s in %s ms", schemaTableName, timer.endTimer());
+ return fsView;
+ });
+
+ Lazy lazyMetaClient = Lazy.lazily(tableHandle::getMetaClient);
+ this.indexSupportOpt = enableMetadataTable ?
+ createIndexSupport(layoutHandle, lazyMetaClient, lazyTableMetadata, layoutHandle.getRegularPredicates(), session) : Optional.empty();
+ }
+
+ @Override
+ public Stream listStatus(HudiPartition hudiPartition, boolean useIndex)
+ {
+ HoodieTimer timer = HoodieTimer.start();
+ Stream slices = lazyFileSystemView.get().getLatestFileSlicesBeforeOrOn(
+ hudiPartition.getRelativePartitionPath(),
+ tableHandle.getLatestCommitTime(),
+ false);
+
+ if (!useIndex) {
+ return slices;
+ }
+
+ Stream fileSlices = slices
+ .filter(slice -> indexSupportOpt
+ .map(indexSupport -> !indexSupport.shouldSkipFileSlice(slice))
+ .orElse(true));
+ log.info("Listed partition [%s] on table %s.%s in %s ms",
+ hudiPartition, tableHandle.getSchemaName(), tableHandle.getTableName(), timer.endTimer());
+ return fileSlices;
+ }
+
+ @Override
+ public void close()
+ {
+ if (!lazyFileSystemView.get().isClosed()) {
+ lazyFileSystemView.get().close();
+ }
+ }
+}
diff --git a/presto-hudi/src/main/java/com/facebook/presto/hudi/query/index/HudiBaseIndexSupport.java b/presto-hudi/src/main/java/com/facebook/presto/hudi/query/index/HudiBaseIndexSupport.java
new file mode 100644
index 0000000000000..3c2738996fc91
--- /dev/null
+++ b/presto-hudi/src/main/java/com/facebook/presto/hudi/query/index/HudiBaseIndexSupport.java
@@ -0,0 +1,53 @@
+package com.facebook.presto.hudi.query.index;
+
+import com.facebook.airlift.log.Logger;
+import com.facebook.presto.spi.SchemaTableName;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.util.Lazy;
+
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+public abstract class HudiBaseIndexSupport
+ implements HudiIndexSupport
+{
+ private final Logger log;
+ protected final SchemaTableName schemaTableName;
+ protected final Lazy lazyMetaClient;
+
+ public HudiBaseIndexSupport(Logger log, SchemaTableName schemaTableName, Lazy lazyMetaClient)
+ {
+ this.log = requireNonNull(log, "log is null");
+ this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
+ this.lazyMetaClient = requireNonNull(lazyMetaClient, "metaClient is null");
+ }
+
+ public void printDebugMessage(Map> candidateFileSlices, Map> inputFileSlices, long lookupDurationMs)
+ {
+ if (log.isDebugEnabled()) {
+ int candidateFileSize = candidateFileSlices.values().stream().mapToInt(List::size).sum();
+ int totalFiles = inputFileSlices.values().stream().mapToInt(List::size).sum();
+ double skippingPercent = totalFiles == 0 ? 0.0d : (totalFiles - candidateFileSize) / (totalFiles * 1.0d);
+
+ log.info("Total files: %s; files after data skipping: %s; skipping percent %s; time taken: %s ms; table name: %s",
+ totalFiles,
+ candidateFileSize,
+ skippingPercent,
+ lookupDurationMs,
+ schemaTableName);
+ }
+ }
+
+ protected Map getAllIndexDefinitions()
+ {
+ if (lazyMetaClient.get().getIndexMetadata().isEmpty()) {
+ return Map.of();
+ }
+
+ return lazyMetaClient.get().getIndexMetadata().get().getIndexDefinitions();
+ }
+}
diff --git a/presto-hudi/src/main/java/com/facebook/presto/hudi/query/index/HudiColumnStatsIndexSupport.java b/presto-hudi/src/main/java/com/facebook/presto/hudi/query/index/HudiColumnStatsIndexSupport.java
new file mode 100644
index 0000000000000..b702772e20142
--- /dev/null
+++ b/presto-hudi/src/main/java/com/facebook/presto/hudi/query/index/HudiColumnStatsIndexSupport.java
@@ -0,0 +1,324 @@
+package com.facebook.presto.hudi.query.index;
+
+import com.facebook.airlift.log.Logger;
+import com.facebook.airlift.units.Duration;
+import com.facebook.presto.common.predicate.Domain;
+import com.facebook.presto.common.predicate.Range;
+import com.facebook.presto.common.predicate.TupleDomain;
+import com.facebook.presto.common.predicate.ValueSet;
+import com.facebook.presto.common.type.Type;
+import com.facebook.presto.common.type.VarcharType;
+import com.facebook.presto.hudi.util.TupleDomainUtils;
+import com.facebook.presto.parquet.predicate.TupleDomainParquetPredicate;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.SchemaTableName;
+import com.google.common.collect.ImmutableList;
+import io.airlift.slice.Slice;
+import io.airlift.slice.Slices;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
+import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.hash.ColumnIndexID;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.util.Lazy;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static com.facebook.presto.common.type.BigintType.BIGINT;
+import static com.facebook.presto.common.type.BooleanType.BOOLEAN;
+import static com.facebook.presto.common.type.DateType.DATE;
+import static com.facebook.presto.common.type.DoubleType.DOUBLE;
+import static com.facebook.presto.common.type.IntegerType.INTEGER;
+import static com.facebook.presto.common.type.RealType.REAL;
+import static com.facebook.presto.common.type.SmallintType.SMALLINT;
+import static com.facebook.presto.common.type.TinyintType.TINYINT;
+import static com.facebook.presto.hudi.HudiSessionProperties.getColumnStatsWaitTimeout;
+import static com.facebook.presto.parquet.predicate.PredicateUtils.isStatisticsOverflow;
+import static java.lang.Float.floatToRawIntBits;
+
+public class HudiColumnStatsIndexSupport
+ extends HudiBaseIndexSupport
+{
+ private static final Logger log = Logger.get(HudiColumnStatsIndexSupport.class);
+ // file name -> column name -> domain with column stats
+ private final CompletableFuture>>> domainsWithStatsFuture;
+ protected final TupleDomain regularColumnPredicates;
+ private final List regularColumns;
+ private final Duration columnStatsWaitTimeout;
+ private final long futureStartTimeMs;
+
+ public HudiColumnStatsIndexSupport(ConnectorSession session, SchemaTableName schemaTableName, Lazy lazyMetaClient, Lazy lazyTableMetadata, TupleDomain regularColumnPredicates)
+ {
+ this(log, session, schemaTableName, lazyMetaClient, lazyTableMetadata, regularColumnPredicates);
+ }
+
+ public HudiColumnStatsIndexSupport(Logger log, ConnectorSession session, SchemaTableName schemaTableName, Lazy lazyMetaClient, Lazy lazyTableMetadata, TupleDomain regularColumnPredicates)
+ {
+ super(log, schemaTableName, lazyMetaClient);
+ this.columnStatsWaitTimeout = getColumnStatsWaitTimeout(session);
+ this.regularColumnPredicates = regularColumnPredicates;
+ this.regularColumns = regularColumnPredicates.getDomains()
+ .map(domains -> new ArrayList<>(domains.keySet()))
+ .orElse(new ArrayList<>());
+ if (regularColumnPredicates.isAll() || regularColumnPredicates.getDomains().isEmpty()) {
+ this.domainsWithStatsFuture = CompletableFuture.completedFuture(Optional.empty());
+ }
+ else {
+ // Get filter columns
+ List encodedTargetColumnNames = regularColumns
+ .stream()
+ .map(col -> new ColumnIndexID(col).asBase64EncodedString()).collect(Collectors.toList());
+
+ Map columnTypes = regularColumnPredicates.getDomains().get().entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getType()));
+
+ domainsWithStatsFuture = CompletableFuture.supplyAsync(() -> {
+ HoodieTimer timer = HoodieTimer.start();
+ if (!lazyMetaClient.get().getTableConfig().getMetadataPartitions()
+ .contains(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)) {
+ return Optional.empty();
+ }
+
+ Map> domainsWithStats =
+ lazyTableMetadata.get().getRecordsByKeyPrefixes(encodedTargetColumnNames,
+ HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, true)
+ .collectAsList()
+ .stream()
+ .filter(f -> f.getData().getColumnStatMetadata().isPresent())
+ .map(f -> f.getData().getColumnStatMetadata().get())
+ .collect(Collectors.groupingBy(
+ HoodieMetadataColumnStats::getFileName,
+ Collectors.toMap(
+ HoodieMetadataColumnStats::getColumnName,
+ // Pre-compute the Domain object for each HoodieMetadataColumnStats
+ stats -> getDomainFromColumnStats(stats.getColumnName(), columnTypes.get(stats.getColumnName()), stats))));
+
+ log.debug("Column stats lookup took %s ms and identified %d relevant file IDs.", timer.endTimer(), domainsWithStats.size());
+
+ return Optional.of(domainsWithStats);
+ });
+ }
+ this.futureStartTimeMs = System.currentTimeMillis();
+ }
+
+ @Override
+ public boolean shouldSkipFileSlice(FileSlice slice)
+ {
+ try {
+ if (domainsWithStatsFuture.isDone()) {
+ Optional