@@ -37,7 +37,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, GetStructField, Literal}
import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, NoopCache, PartitionDirectory}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import org.apache.spark.sql.internal.SQLConf
@@ -105,6 +105,10 @@ case class HoodieFileIndex(spark: SparkSession,

@transient protected var hasPushedDownPartitionPredicates: Boolean = false

/** True when any partition column is a nested field path (e.g. "nested_record.level"). */
private val hasNestedPartitionColumns: Boolean =
getPartitionColumns.exists(_.contains("."))

/**
* NOTE: [[indicesSupport]] is a transient state, since it's only relevant while logical plan
* is handled by the Spark's driver
@@ -167,19 +171,44 @@
/**
* Invoked by Spark to fetch list of latest base files per partition.
*
* @param partitionFilters partition column filters
* @param dataFilters data columns filters
* @return list of PartitionDirectory containing partition to base files mapping
* For regular (top-level) partition columns, Spark passes the matching predicates in
* `partitionFilters` directly.
*
* For nested partition columns (e.g. `nested_record.level`), Spark cannot match
* [[GetStructField]] expressions against the flat dot-path partition schema and passes
* `partitionFilters = []`. The nested predicates land in `dataFilters` instead.
* We re-extract them via [[extractNestedPartitionFilters]].
*
* Example: `SELECT * FROM t WHERE nested_record.level = 'INFO' AND int_field > 0`
* - Spark passes: `partitionFilters = []`, `dataFilters = [nested_record.level = 'INFO', int_field > 0]`
* - We extract: `effectivePartitionFilters = [nested_record.level = 'INFO']`
*
* The extraction is stateless, so it is safe under AQE re-planning, subqueries, and
* FileIndex reuse.
*
* Known limitation: for mixed flat+nested partitions (e.g. `["country", "nested_record.level"]`),
* if Spark passes `partitionFilters = [country = 'US']`, we skip extraction and the nested
* filter is not used for partition pruning. A future fix could merge extracted nested filters
* with the provided `partitionFilters`.
*/
override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
val slices = filterFileSlices(dataFilters, partitionFilters).flatMap(
val effectivePartitionFilters = if (partitionFilters.isEmpty && hasNestedPartitionColumns) {
extractNestedPartitionFilters(dataFilters)
} else {
partitionFilters
}

val slices = filterFileSlices(dataFilters, effectivePartitionFilters).flatMap(
{ case (partitionOpt, fileSlices) =>
fileSlices.filter(!_.isEmpty).map(fs => ( InternalRow.fromSeq(partitionOpt.get.getValues), fs))
fileSlices.filter(!_.isEmpty).map(fs => (InternalRow.fromSeq(partitionOpt.get.getValues), fs))
}
)
prepareFileSlices(slices)
}
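
// Illustrative sketch (reviewer's note, not part of this change): what the fallback above
// yields for the scaladoc example, with hand-built Catalyst expressions. Attribute
// construction is simplified here; resolved plans carry ExprIds and qualifiers.
//
//   import org.apache.spark.sql.catalyst.expressions._
//   import org.apache.spark.sql.types._
//
//   val nestedRecord = AttributeReference("nested_record",
//     StructType(Seq(StructField("level", StringType))))()
//   val intField = AttributeReference("int_field", IntegerType)()
//   val dataFilters = Seq(
//     EqualTo(GetStructField(nestedRecord, 0, Some("level")), Literal("INFO")),
//     GreaterThan(intField, Literal(0)))
//
//   // partitionFilters is empty and a nested partition column exists, so listFiles
//   // falls back to extraction:
//   HoodieFileIndex.extractNestedPartitionFilters(dataFilters, Set("nested_record.level"))
//   // => Seq(nested_record.level = 'INFO'); int_field > 0 stays a data filter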

/** Delegates to companion object with this table's partition columns. */
private def extractNestedPartitionFilters(dataFilters: Seq[Expression]): Seq[Expression] = {
HoodieFileIndex.extractNestedPartitionFilters(dataFilters, getPartitionColumns.toSet)
}

protected def prepareFileSlices(slices: Seq[(InternalRow, FileSlice)]): Seq[PartitionDirectory] = {
hasPushedDownPartitionPredicates = true

@@ -212,25 +241,25 @@
}

/**
* The functions prunes the partition paths based on the input partition filters. For every partition path, the file
* slices are further filtered after querying metadata table based on the data filters.
* Prunes partitions by `partitionFilters`, then optionally applies data skipping via metadata
* table indices (column stats, record-level index, etc.) to filter file slices.
*
* @param dataFilters data columns filters
* @param partitionFilters partition column filters
* @param partitionPrune for HoodiePruneFileSourcePartitions rule only prune partitions
* @return A sequence of pruned partitions and corresponding filtered file slices
* @param dataFilters data column filters (used for data skipping)
* @param partitionFilters partition column filters (used for partition pruning)
* @param isPartitionPruneOnly when true, skip data skipping. Used by [[HoodiePruneFileSourcePartitions]]
* during planning (data skipping runs later in [[listFiles]]).
*/
def filterFileSlices(dataFilters: Seq[Expression], partitionFilters: Seq[Expression], isPartitionPruned: Boolean = false)
def filterFileSlices(dataFilters: Seq[Expression], partitionFilters: Seq[Expression],
isPartitionPruneOnly: Boolean = false)
: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])] = {

val (isPruned, prunedPartitionsAndFileSlices) =
prunePartitionsAndGetFileSlices(dataFilters, partitionFilters)
hasPushedDownPartitionPredicates = true

// If there are no data filters, return all the file slices.
// If isPartitionPurge is true, this fun is trigger by HoodiePruneFileSourcePartitions, don't look up candidate files
// If there are no file slices, return empty list.
if (prunedPartitionsAndFileSlices.isEmpty || dataFilters.isEmpty || isPartitionPruned ) {
// Skip data skipping when: no file slices, no data filters, or partition-prune-only mode
// (planning phase — data skipping runs later during execution).
if (prunedPartitionsAndFileSlices.isEmpty || dataFilters.isEmpty || isPartitionPruneOnly) {
prunedPartitionsAndFileSlices
} else {
// Look up candidate files names in the col-stats or record level index, if all of the following conditions are true
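
// Illustrative sketch (reviewer's note, not part of this change): the two call modes of
// filterFileSlices, assuming a resolved `fileIndex: HoodieFileIndex` and Catalyst
// predicates `partFilter` / `dataFilter`.
//
//   // Planning: HoodiePruneFileSourcePartitions prunes partitions only.
//   fileIndex.filterFileSlices(Seq(dataFilter), Seq(partFilter), isPartitionPruneOnly = true)
//   // Execution: listFiles re-invokes without the flag, enabling data skipping.
//   fileIndex.filterFileSlices(Seq(dataFilter), Seq(partFilter))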
@@ -502,6 +531,65 @@ object HoodieFileIndex extends Logging {
val Strict: Val = Val("strict")
}

/**
* Extracts filters from `dataFilters` that reference nested partition columns by walking
* [[GetStructField]] chains to reconstruct the full dot-path and matching against partition
* column names. We cannot match on the struct root alone because sibling fields share it
* (e.g. `nested_record.level` and `nested_record.nested_int` both reference `nested_record`).
*
* Given partition column `nested_record.level` and:
* {{{
* dataFilters = [nested_record.level = 'INFO', nested_record.nested_int > 0, int_field = 5]
* }}}
* Returns: `[nested_record.level = 'INFO']`
*
* Known limitations vs regular partition columns:
* - `(nested_record.level = 'INFO' AND d = 2) OR (nested_record.level = 'ERROR')` is excluded
* entirely (references both partition and data columns). A weaker predicate like
* `nested_record.level IN ('INFO', 'ERROR')` could be extracted but is not implemented.
* Spark has the same OR limitation for regular partition columns.
*
* @param dataFilters filters to scan for nested partition predicates
* @param partitionColumnNames partition column dot-paths, e.g. `Set("nested_record.level")`
* @return the filters from `dataFilters` that reference nested partition columns and
*         nothing else
*/
private[hudi] def extractNestedPartitionFilters(dataFilters: Seq[Expression],
partitionColumnNames: Set[String]): Seq[Expression] = {
val partitionColumnRoots = partitionColumnNames.map(_.split("\\.", 2)(0))
dataFilters.filter { expr =>
// Resolve all outermost GetStructField chains to their full dot-paths.
val structFieldPaths = collectOutermostStructFieldPaths(expr)
// The expression is a partition filter only when:
// 1. It contains at least one GetStructField that resolves to a partition column path, AND
// 2. ALL resolved paths are partition columns (no non-partition nested fields), AND
// 3. ALL attribute references are roots of partition columns
// (guards against mixed expressions like "nested_record.level = 'INFO' AND int_field > 0")
structFieldPaths.nonEmpty &&
structFieldPaths.forall(partitionColumnNames.contains) &&
expr.references.map(_.name).forall(partitionColumnRoots.contains)
}
}
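
// Illustrative sketch (reviewer's note, not part of this change): how the three conditions
// play out for partitionColumnNames = Set("nested_record.level"), assuming hypothetical
// predicates built as in the listFiles sketch above:
//   levelEqInfo     = nested_record.level = 'INFO'
//   nestedIntGtZero = nested_record.nested_int > 0
//   intFieldGtZero  = int_field > 0
//
//   // Kept: the single struct path and the single reference are both partition columns.
//   extractNestedPartitionFilters(Seq(levelEqInfo), Set("nested_record.level"))
//   // Dropped by condition 2: "nested_record.nested_int" is not a partition column.
//   extractNestedPartitionFilters(Seq(nestedIntGtZero), Set("nested_record.level"))
//   // Dropped by condition 3: the conjunction also references data column int_field.
//   extractNestedPartitionFilters(Seq(And(levelEqInfo, intFieldGtZero)), Set("nested_record.level"))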

/**
* Collects full dot-paths of outermost [[GetStructField]] chains in an expression.
* `EqualTo(a.b.c, 1)` → `Seq("a.b.c")` (not intermediate `"a.b"`).
*/
private[hudi] def collectOutermostStructFieldPaths(expr: Expression): Seq[String] = {
expr match {
case g: GetStructField => resolveGetStructFieldPath(g).toSeq
case _ => expr.children.flatMap(collectOutermostStructFieldPaths)
}
}

/** Resolves a [[GetStructField]] chain to its full dot-path: `attr("a").b.c` → `"a.b.c"`. */
private[hudi] def resolveGetStructFieldPath(expr: Expression): Option[String] = expr match {
case GetStructField(child: AttributeReference, _, Some(fieldName)) =>
Some(child.name + "." + fieldName)
case GetStructField(child: GetStructField, _, Some(fieldName)) =>
resolveGetStructFieldPath(child).map(_ + "." + fieldName)
case _ => None
}
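
// Illustrative sketch (reviewer's note, not part of this change): resolving a two-level
// chain, assuming `a` is a struct whose field `b` is itself a struct with field `c`.
//
//   val a = AttributeReference("a", StructType(Seq(StructField("b",
//     StructType(Seq(StructField("c", IntegerType)))))))()
//   val chain = GetStructField(GetStructField(a, 0, Some("b")), 0, Some("c"))
//   resolveGetStructFieldPath(chain)                  // => Some("a.b.c")
//   collectOutermostStructFieldPaths(EqualTo(chain, Literal(1)))
//   // => Seq("a.b.c"), not the intermediate "a.b"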

def collectReferencedColumns(spark: SparkSession, queryFilters: Seq[Expression], schema: StructType): Seq[String] = {
val resolver = spark.sessionState.analyzer.resolver
val refs = queryFilters.flatMap(_.references)