@@ -24,6 +24,8 @@ import org.apache.hudi.common.schema.HoodieSchema.TimePrecision
import org.apache.hudi.internal.schema.HoodieSchemaException

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, lit, struct, transform, transform_values, when}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.Decimal.minBytesForPrecision
@@ -82,6 +84,178 @@ object HoodieSparkSchemaConverters extends SparkAdapterSupport {
def validateCustomTypeStructures(structType: StructType): Unit =
validateCustomTypeStructuresRecursive(structType)

/**
* Pads partial BLOB columns - anywhere they appear in the schema tree - to the canonical
* 3-field layout `{type, data, reference}` so the writer's row encoder always sees the
* full shape. Recurses through nested `StructType`, `ArrayType`, and `MapType` to mirror
* the validator's coverage.
*
* RFC-100 BLOB columns are physically a 3-field struct, but for INLINE writes only
* `{type, data}` is meaningful and for OUT_OF_LINE writes only `{type, reference}` is
* meaningful. This helper accepts either partial form on input and rewrites each row to
* the canonical 3-field shape with `lit(null)` filling in the missing field. Null blob
* structs (and null array elements / map values containing blobs) round-trip as null.
* Already-canonical blob columns pass through unchanged (idempotent).
*
* @param df the DataFrame whose BLOB columns may be partial at any nesting depth
* @return the input DataFrame if no partial blob columns were found, or a projected
* DataFrame with each partial blob column rewritten to canonical shape
*/
def padPartialBlobColumns(df: DataFrame): DataFrame = {
val caseSensitive = SQLConf.get.caseSensitiveAnalysis
if (!df.schema.fields.exists(f => fieldNeedsPad(f, caseSensitive))) {
df
} else {
val projected: Seq[Column] = df.schema.fields.map { f =>
if (fieldNeedsPad(f, caseSensitive)) {
padField(f, col(s"`${f.name}`"), caseSensitive).as(f.name, f.metadata)
} else {
col(s"`${f.name}`")
}
}
df.select(projected: _*)
}
}
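
A minimal usage sketch of this contract, assuming the `hudi_type=BLOB` tagging described later in this file, with the literal strings `"hudi_type"` and `"BLOB"` standing in for the real `HoodieSchema` constants, and `spark`/`rows` supplied by the caller:

```scala
import org.apache.spark.sql.types._

// Sketch only: "hudi_type" / "BLOB" are placeholders for the metadata key and
// type descriptor defined by HoodieSchema; `spark` and `rows` are assumed.
val blobMeta = new MetadataBuilder().putString("hudi_type", "BLOB").build()
val partialBlob = StructType(Seq(
  StructField("type", StringType),   // HoodieSchema.Blob.TYPE
  StructField("data", BinaryType)))  // INLINE-only partial shape {type, data}
val schema = StructType(Seq(
  StructField("id", StringType),
  StructField("payload", partialBlob, true, blobMeta)))

val padded = HoodieSparkSchemaConverters.padPartialBlobColumns(
  spark.createDataFrame(rows, schema))
// padded now exposes payload: struct<type, data, reference> with `reference`
// null-filled; an already-canonical 3-field input passes through unchanged.
```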

/**
* Returns true if the field itself is a partial blob field that needs padding,
* or if any partial blob field exists somewhere inside its data type.
*/
private def fieldNeedsPad(field: StructField, caseSensitive: Boolean): Boolean =
isPartialBlobField(field, caseSensitive) || typeNeedsPad(field.dataType, caseSensitive)

/**
* Returns true if the data type contains, anywhere within it, a BLOB-tagged StructField
* whose struct shape is a partial 2-field accepted layout.
*/
private def typeNeedsPad(dataType: DataType, caseSensitive: Boolean): Boolean = dataType match {
case s: StructType => s.fields.exists(f => fieldNeedsPad(f, caseSensitive))
case ArrayType(elementType, _) => typeNeedsPad(elementType, caseSensitive)
case MapType(_, valueType, _) => typeNeedsPad(valueType, caseSensitive)
case _ => false
}

/**
* Returns true if `field` is tagged `hudi_type=BLOB` and its struct shape is one of the
* accepted partial layouts: `{type, data}` or `{type, reference}`. Canonical 3-field
* structs return false (no padding needed). Anything else also returns false - the strict
* validator will reject those downstream.
*/
private def isPartialBlobField(field: StructField, caseSensitive: Boolean): Boolean = {
if (!field.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD)) return false
val descriptorType = HoodieSchema
.parseTypeDescriptor(field.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD))
.getType
if (descriptorType != HoodieSchemaType.BLOB) return false
field.dataType match {
case st: StructType if !isCanonicalBlobStruct(st) => isAcceptedPartialBlobStruct(st, caseSensitive)
case _ => false
}
}

private def isAcceptedPartialBlobStruct(st: StructType, caseSensitive: Boolean): Boolean = {
if (st.length != 2) return false
val key: String => String =
if (caseSensitive) identity else (_: String).toLowerCase(Locale.ROOT)
val names = st.fields.map(f => key(f.name)).toSet
val typeKey = key(HoodieSchema.Blob.TYPE)
val dataKey = key(HoodieSchema.Blob.INLINE_DATA_FIELD)
val refKey = key(HoodieSchema.Blob.EXTERNAL_REFERENCE)
names == Set(typeKey, dataKey) || names == Set(typeKey, refKey)
}
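
For concreteness, the two accepted shapes sketched as Spark types, assuming the field-name constants resolve to `"type"`, `"data"`, and `"reference"` per the canonical layout described above (the reference sub-struct is stubbed):

```scala
// refType stands in for the canonical {external_path, offset, length, managed}
// reference struct; see expectedBlobReferenceStructType below.
val refType = StructType(Seq(StructField("external_path", StringType)))
val inlineOnly = StructType(Seq(
  StructField("type", StringType),
  StructField("data", BinaryType)))      // accepted: {type, data}
val outOfLineOnly = StructType(Seq(
  StructField("type", StringType),
  StructField("reference", refType)))    // accepted: {type, reference}
// The canonical 3-field struct, or any other shape, returns false here.
```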

/**
* Builds a Column expression that rewrites the value at `sourceCol` (which has the same
* shape as `field.dataType`) to its post-padding canonical shape. Used by
* [[padPartialBlobColumns]] and recursively by itself for nested struct fields.
*
* The caller is responsible for `.as(field.name, field.metadata)` on the returned column;
* this method produces an unaliased value expression so it can also be used inside
* `transform`/`transform_values` lambdas.
*/
private def padField(field: StructField, sourceCol: Column, caseSensitive: Boolean): Column = {
if (isPartialBlobField(field, caseSensitive)) {
padBlobStructValue(sourceCol, field.dataType.asInstanceOf[StructType], caseSensitive)
} else {
padDataType(field.dataType, sourceCol, caseSensitive)
}
}

/**
* Builds a Column expression that rewrites a value at `sourceCol` (whose shape is
* `dataType`) so any partial blob structs nested anywhere inside are padded to canonical.
* If no padding is needed inside `dataType`, returns `sourceCol` directly.
*/
private def padDataType(dataType: DataType, sourceCol: Column, caseSensitive: Boolean): Column = {
if (!typeNeedsPad(dataType, caseSensitive)) return sourceCol
dataType match {
case s: StructType =>
val rebuiltFields: Seq[Column] = s.fields.map { f =>
val childExpr = sourceCol.getField(f.name)
padField(f, childExpr, caseSensitive).as(f.name)
}
// Preserve null-struct semantics: a null source struct must round-trip as null,
// not as a non-null struct with all-null fields produced by `struct(...)`.
when(sourceCol.isNull, lit(null).cast(rebuiltType(s, caseSensitive)))
.otherwise(struct(rebuiltFields: _*))

case ArrayType(elementType, _) =>
// transform() preserves the array's null-ness; the lambda handles null elements.
transform(sourceCol, (x: Column) => padDataType(elementType, x, caseSensitive))

case MapType(_, valueType, _) =>
// transform_values() preserves the map's null-ness; lambda handles null values.
transform_values(sourceCol, (_: Column, v: Column) => padDataType(valueType, v, caseSensitive))

case _ => sourceCol
}
}

/**
* Rewrites a (possibly null) blob-struct value at `blobCol` to the canonical 3-field
* shape, padding the missing sibling field with `lit(null)`. Preserves null-struct
* semantics: a null source struct round-trips as null.
*/
private def padBlobStructValue(blobCol: Column, st: StructType, caseSensitive: Boolean): Column = {
val key: String => String =
if (caseSensitive) identity else (_: String).toLowerCase(Locale.ROOT)
val present = st.fields.map(f => key(f.name)).toSet

🤖 When rebuilding a non-blob struct that contains a nested partial blob, the inner padField(...).as(f.name) doesn't propagate f.metadata, so the rebuilt struct's nested BLOB-tagged field loses its HoodieSchema.TYPE_METADATA_FIELD on the resulting DataFrame schema. The null branch via rebuiltType does preserve metadata, which makes the two branches asymmetric. Was this intentional (relying on the catalog/table schema downstream) or should it be .as(f.name, f.metadata) to keep validators / consumers that introspect the DF schema happy?
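
A sketch of the metadata-preserving variant the comment asks about; only the `.as` call in `padDataType`'s struct branch changes, using Spark's `Column.as(alias, metadata)` overload:

```scala
// Hypothetical tweak to the struct branch of padDataType, not what the diff does:
case s: StructType =>
  val rebuiltFields: Seq[Column] = s.fields.map { f =>
    val childExpr = sourceCol.getField(f.name)
    padField(f, childExpr, caseSensitive).as(f.name, f.metadata) // was .as(f.name)
  }
  when(sourceCol.isNull, lit(null).cast(rebuiltType(s, caseSensitive)))
    .otherwise(struct(rebuiltFields: _*))
```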

val typeCol = blobCol.getField(HoodieSchema.Blob.TYPE).as(HoodieSchema.Blob.TYPE)
val dataCol = if (present.contains(key(HoodieSchema.Blob.INLINE_DATA_FIELD))) {
blobCol.getField(HoodieSchema.Blob.INLINE_DATA_FIELD).as(HoodieSchema.Blob.INLINE_DATA_FIELD)
} else {
lit(null).cast(BinaryType).as(HoodieSchema.Blob.INLINE_DATA_FIELD)
}
val refCol = if (present.contains(key(HoodieSchema.Blob.EXTERNAL_REFERENCE))) {
blobCol.getField(HoodieSchema.Blob.EXTERNAL_REFERENCE).as(HoodieSchema.Blob.EXTERNAL_REFERENCE)
} else {
lit(null).cast(expectedBlobReferenceStructType).as(HoodieSchema.Blob.EXTERNAL_REFERENCE)
}
when(blobCol.isNull, lit(null).cast(expectedBlobStructType))
.otherwise(struct(typeCol, dataCol, refCol))
}

/**
* Returns the post-padding DataType corresponding to `dataType`: every accepted partial
* blob struct is replaced by `expectedBlobStructType`; nested struct/array/map containers
* are rebuilt with their inner types similarly transformed. Used to provide the
* `lit(null).cast(...)` target type when guarding null-struct semantics.
*/

🤖 nit: rebuiltType sits in the middle of a pad*-named family (padPartialBlobColumns, padDataType, padBlobStructValue, padField) but doesn't follow that convention. Could you rename it to paddedType (or canonicalType) so readers immediately understand its role without having to read the Javadoc?

private def rebuiltType(dataType: DataType, caseSensitive: Boolean): DataType = dataType match {
case s: StructType =>
StructType(s.fields.map { f =>
val newType =
if (isPartialBlobField(f, caseSensitive)) expectedBlobStructType
else rebuiltType(f.dataType, caseSensitive)
f.copy(dataType = newType)
})
case ArrayType(elementType, containsNull) =>
ArrayType(rebuiltType(elementType, caseSensitive), containsNull)
case MapType(keyType, valueType, valueContainsNull) =>
MapType(keyType, rebuiltType(valueType, caseSensitive), valueContainsNull)
case other => other
}

private def validateCustomTypeStructuresRecursive(dataType: DataType): Unit = dataType match {
case s: StructType =>
s.fields.foreach { f =>
@@ -409,6 +583,13 @@ object HoodieSparkSchemaConverters extends SparkAdapterSupport {

private lazy val expectedBlobStructType: StructType = toSqlType(HoodieSchema.createBlob())._1.asInstanceOf[StructType]

// Spark type of the canonical reference sub-struct ({external_path, offset, length, managed}).
// Used by padPartialBlobColumns to construct lit(null).cast(...) for the missing reference
// field when a user supplies an INLINE-only `{type, data}` blob struct.
private lazy val expectedBlobReferenceStructType: DataType =
expectedBlobStructType.fields
.find(_.name == HoodieSchema.Blob.EXTERNAL_REFERENCE).get.dataType

🤖 nit: .find(...).get in a lazy val will surface as a bare NoSuchElementException with no context if EXTERNAL_REFERENCE is ever absent from expectedBlobStructType. Using .getOrElse(throw new IllegalStateException("Missing EXTERNAL_REFERENCE in canonical blob struct")) would give an actionable error if the schema ever changes.
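
The suggested hardening, sketched (identical behavior when the field is present):

```scala
private lazy val expectedBlobReferenceStructType: DataType =
  expectedBlobStructType.fields
    .find(_.name == HoodieSchema.Blob.EXTERNAL_REFERENCE)
    .map(_.dataType)
    .getOrElse(throw new IllegalStateException(
      "Missing EXTERNAL_REFERENCE in canonical blob struct"))
```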


/**
* Validates that a StructType matches the expected blob schema structure defined in {@link HoodieSchema.Blob}.
*
@@ -55,6 +55,7 @@
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
@@ -237,6 +238,12 @@ protected void initQueryIndexConf() {
* Cleans up Spark contexts ({@link JavaSparkContext} and {@link SQLContext}).
*/
protected void cleanupSparkContexts() {
// HoodieInMemoryHashIndex holds a JVM-static record-location map that survives
// sparkSession.stop(), leaking record keys and locations across sequential tests
// in the same JVM. A stale entry causes tagLocation to demote a not-matched
// INSERT into a no-op UPDATE on a non-existent file group.
HoodieInMemoryHashIndex.clear();

if (sparkSession != null) {
sparkSession.stop();
sparkSession = null;
@@ -70,6 +70,7 @@ import org.apache.hadoop.hive.shims.ShimLoader
import org.apache.spark.{SPARK_VERSION, SparkContext}
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.sql._
import org.apache.spark.sql.avro.HoodieSparkSchemaConverters
import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.types.StructType
@@ -356,11 +357,15 @@ class HoodieSparkSqlWriterInternal {

val shouldReconcileSchema = parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
val latestTableSchemaOpt = getLatestTableSchema(tableMetaClient, schemaFromCatalog)

🤖 nit: dfPreBlobPad is a bit hard to parse at a glance. Could you call it unpaddedDf (or rawDf) to make the before/after relationship with val df = …padPartialBlobColumns(…) on line 367 immediately clear?

val df = if (preppedWriteOperation || preppedSparkSqlWrites || preppedSparkSqlMergeInto || sourceDf.isStreaming) {
val dfPreBlobPad = if (preppedWriteOperation || preppedSparkSqlWrites || preppedSparkSqlMergeInto || sourceDf.isStreaming) {

🤖 nit: dfPreBlobPad names the variable by its position in the pipeline rather than by what it holds. Something like dfWithoutMetaCols or sourceDfStripped would communicate the semantic purpose more directly to a reader who hasn't just read the adjacent lines.

sourceDf
} else {
sourceDf.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala.toSeq: _*)
}
// RFC-100 BLOB QoL: accept partial INLINE-only `{type,data}` or OUT_OF_LINE-only
// `{type,reference}` user inputs by padding the missing sibling field with null at the
// ingest boundary. No-op for already-canonical 3-field structs.
val df = HoodieSparkSchemaConverters.padPartialBlobColumns(dfPreBlobPad)

🤖 Padding now runs on the prepped/streaming branches too (where the early-return previously kept sourceDf untouched). The canonical-input short-circuit should make this a no-op, but is there any prepped path where the input dataframe has already been encoded such that running this projection through it could change semantics (e.g. lazy plan invalidation, breaking pre-bound encoders)? If not, all good — just calling it out.

// NOTE: We need to make sure that upon conversion of the schemas b/w Catalyst's [[StructType]] and
// Avro's [[Schema]] we're preserving corresponding "record-name" and "record-namespace" that
// play a crucial role in establishing compatibility b/w schemas