feat(blob): Accept partial {type,data} or {type,reference} structs on write #18665

voonhous wants to merge 2 commits into
Conversation
Force-pushed from dcb21ee to 9947842
hudi-agent left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This PR adds a quality-of-life improvement that lets users write BLOB columns with partial {type,data} (INLINE) or {type,reference} (OUT_OF_LINE) structs, padding the missing sibling at the writer entry point. The recursion through struct/array/map and the SQL-side canonicalizer for INSERT INTO are nicely scoped, and the canonical-input short-circuit keeps it cheap. A couple of observations worth double-checking are called out in the inline comments, around metadata preservation in nested rebuilds and a few smaller edge cases. Please take a look at the inline comments, and this should be ready for a Hudi committer or PMC member to take it from here. A couple of naming nits: rebuiltType breaks the surrounding pad* convention, and dfPreBlobPad is hard to parse at a glance — suggestions below.
```scala
private def padBlobStructValue(blobCol: Column, st: StructType, caseSensitive: Boolean): Column = {
  val key: String => String =
    if (caseSensitive) identity else (_: String).toLowerCase(Locale.ROOT)
  val present = st.fields.map(f => key(f.name)).toSet
```
🤖 When rebuilding a non-blob struct that contains a nested partial blob, the inner padField(...).as(f.name) doesn't propagate f.metadata, so the rebuilt struct's nested BLOB-tagged field loses its HoodieSchema.TYPE_METADATA_FIELD on the resulting DataFrame schema. The null branch via rebuiltType does preserve metadata, which makes the two branches asymmetric. Was this intentional (relying on the catalog/table schema downstream) or should it be .as(f.name, f.metadata) to keep validators / consumers that introspect the DF schema happy?
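A minimal sketch of the suggested fix, assuming the rebuild loop and the `padField` helper named in this diff (its exact signature is assumed here); the two-argument `Column.as(alias, metadata)` overload carries the field metadata through the rebuilt struct:

```scala
// Sketch only: rebuild a non-blob struct while keeping each field's metadata,
// so a nested BLOB-tagged field retains HoodieSchema.TYPE_METADATA_FIELD on the DF schema.
val rebuilt: Column = struct(st.fields.map { f =>
  // `padField` is assumed to be the recursive helper from this PR; `.as(name, metadata)`
  // is the Spark Column overload that attaches metadata to the resulting alias.
  padField(blobCol(f.name), f.dataType, caseSensitive).as(f.name, f.metadata)
}: _*)
```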
```scala
    Binding(attr, attr, None)
  case other =>
    // Deterministic alias name from the canonicalized form so identical nested
    // sub-trees (e.g. across SELECT and WHERE) get the same alias name.
```
🤖 Integer.toHexString(other.canonicalized.hashCode()) can collide between two genuinely different source expressions, in which case we'd inject two Aliases with the same name into the synthetic Project. Different ExprIds make plan addressing safe, but Spark name resolution within that Project would be ambiguous. Probably benign in practice — flagging in case a longer or UUID-suffixed alias name is preferable.
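One possible variant of the naming scheme, sketched under the assumption that the alias only needs to be deterministic and unique within the synthetic Project; the helper name and digest width are illustrative, not the PR's actual code:

```scala
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

// Derive a deterministic, collision-resistant alias suffix from the canonicalized
// expression's string form instead of the 32-bit hashCode.
def aliasSuffix(canonicalForm: String): String = {
  val digest = MessageDigest.getInstance("SHA-256")
    .digest(canonicalForm.getBytes(StandardCharsets.UTF_8))
  digest.take(8).map("%02x".format(_)).mkString // 64 bits of the digest, hex-encoded
}
```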
```scala
// RFC-100 BLOB QoL: accept partial INLINE-only `{type,data}` or OUT_OF_LINE-only
// `{type,reference}` user inputs by padding the missing sibling field with null at the
// ingest boundary. No-op for already-canonical 3-field structs.
val df = HoodieSparkSchemaConverters.padPartialBlobColumns(dfPreBlobPad)
```
🤖 Padding now runs on the prepped/streaming branches too (where the early-return previously kept sourceDf untouched). The canonical-input short-circuit should make this a no-op, but is there any prepped path where the input dataframe has already been encoded such that running this projection through it could change semantics (e.g. lazy plan invalidation, breaking pre-bound encoders)? If not, all good — just calling it out.
```scala
/**
 * Returns the post-padding DataType corresponding to `dataType`: every accepted partial
 * blob struct is replaced by `expectedBlobStructType`; nested struct/array/map containers
 * are rebuilt with their inner types similarly transformed. Used to provide the
```
🤖 nit: rebuiltType sits in the middle of a pad*-named family (padPartialBlobColumns, padDataType, padBlobStructValue, padField) but doesn't follow that convention. Could you rename it to paddedType (or canonicalType) so readers immediately understand its role without having to read the Javadoc?
| @@ -356,11 +357,15 @@ class HoodieSparkSqlWriterInternal { | |||
|
|
|||
| val shouldReconcileSchema = parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean | |||
| val latestTableSchemaOpt = getLatestTableSchema(tableMetaClient, schemaFromCatalog) | |||
🤖 nit: dfPreBlobPad is a bit hard to parse at a glance. Could you call it unpaddedDf (or rawDf) to make the before/after relationship with val df = …padPartialBlobColumns(…) on line 367 immediately clear?
Mirror the parquet MOR log-only compaction tests for VECTOR, VARIANT, and BLOB onto the Lance base file format, and extend all variants with a 6th deltacommit so the cleaner has a chance to retire the post-compaction log-only slice and write a .clean instant.
- VECTOR Lance: passes; verifies HoodieFileFormat.LANCE on the table config and that a .lance base file exists under the table path after compaction.
- VARIANT Lance / BLOB INLINE Lance / BLOB OUT_OF_LINE Lance: gated by -Dlance.skip.tests; expected to fail at HoodieSparkLanceWriter -> LanceArrowUtils.toArrowType (RFC-100 Phase 2 gap). Each asserts the LANCE format config sticks to hoodie.properties immediately after CREATE TABLE so the table-level invariant is checked even when the writer fails downstream.
- All 8 tests (4 parquet + 4 Lance) now drive a 6th merge-update after the compaction-triggering 5th commit. The 5th commit's auto-clean runs before inline compaction, so the prior log slice is not yet superseded; the 6th commit's postCommit clean retires it and writes the .clean instant. The cleaner-timeline assertion uses reloadActiveTimeline() to avoid a stale cached view.
… write
INLINE writes now accept the natural `{type, data}` shape and OUT_OF_LINE
writes accept `{type, reference}`; the missing sibling field is auto-padded
with null at the writer ingest boundary so the canonical 3-field BLOB layout
is preserved on disk. Padding recurses through StructType, ArrayType, and
MapType (via Spark's transform / transform_values) so nested partial blobs
are handled too. Already-canonical inputs are a no-op.
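A rough sketch of how such a padding recursion can look against Spark's public Column API. The canonical field names, the `canonicalBlob` layout, and the INLINE-only match below are assumptions for illustration, not the PR's actual helpers:

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// Assumed canonical 3-field blob layout, for illustration only.
val canonicalBlob: StructType = StructType(Seq(
  StructField("type", StringType),
  StructField("data", BinaryType),
  StructField("reference", StringType)))

// Recursively rebuild a column so a partial INLINE-only {type, data} struct gains a
// null `reference` sibling (the OUT_OF_LINE-only case is symmetric and omitted here).
def pad(colValue: Column, dt: DataType): Column = dt match {
  case st: StructType if st.fieldNames.toSet == Set("type", "data") =>
    // Preserve null structs, otherwise append the missing sibling as null.
    when(colValue.isNull, lit(null).cast(canonicalBlob)).otherwise(
      struct(
        colValue("type").as("type"),
        colValue("data").as("data"),
        lit(null).cast(StringType).as("reference")))
  case st: StructType =>
    // Rebuild non-blob structs field by field so nested partial blobs are reached.
    struct(st.fields.map(f => pad(colValue(f.name), f.dataType).as(f.name)): _*)
  case ArrayType(elementType, _) =>
    transform(colValue, e => pad(e, elementType))
  case MapType(_, valueType, _) =>
    transform_values(colValue, (_, v) => pad(v, valueType))
  case _ => colValue
}
```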
Force-pushed from 9947842 to b4f6f5e
hudi-agent left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This PR adds writer-side padding for partial BLOB struct inputs (INLINE {type,data} and OUT_OF_LINE {type,reference}) on both the DataFrame and SQL ingest paths, plus extends ReadBlobRule to handle nested source expressions. After tracing through the padding recursion (struct/array/map), the canonical-shape rebuild in padBlobStructValue, the InsertBlobCanonicalizer SQL path, and the HoodieInMemoryHashIndex.clear() test cleanup, the substantive concerns I'd raise are already captured by the prior round of review comments (metadata propagation through padField(...).as(f.name), hashCode-based alias naming collision risk, padding now applied to prepped/streaming branches, and a couple of naming nits). No new issues flagged from this automated pass — a Hudi committer or PMC member can take it from here for a final review. A couple of small naming and defensive-coding suggestions below.
cc @yihua
```diff
   val shouldReconcileSchema = parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
   val latestTableSchemaOpt = getLatestTableSchema(tableMetaClient, schemaFromCatalog)
-  val df = if (preppedWriteOperation || preppedSparkSqlWrites || preppedSparkSqlMergeInto || sourceDf.isStreaming) {
+  val dfPreBlobPad = if (preppedWriteOperation || preppedSparkSqlWrites || preppedSparkSqlMergeInto || sourceDf.isStreaming) {
```
🤖 nit: dfPreBlobPad names the variable by its position in the pipeline rather than by what it holds. Something like dfWithoutMetaCols or sourceDfStripped would communicate the semantic purpose more directly to a reader who hasn't just read the adjacent lines.
```scala
// field when a user supplies an INLINE-only `{type, data}` blob struct.
private lazy val expectedBlobReferenceStructType: DataType =
  expectedBlobStructType.fields
    .find(_.name == HoodieSchema.Blob.EXTERNAL_REFERENCE).get.dataType
```
🤖 nit: .find(...).get in a lazy val will surface as a bare NoSuchElementException with no context if EXTERNAL_REFERENCE is ever absent from expectedBlobStructType. Using .getOrElse(throw new IllegalStateException("Missing EXTERNAL_REFERENCE in canonical blob struct")) would give an actionable error if the schema ever changes.
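A sketch of that defensive variant; the surrounding declaration is copied from the diff above, only the lookup's failure path changes:

```scala
// Fail with an actionable message if the canonical blob struct ever loses its
// EXTERNAL_REFERENCE field, instead of surfacing a bare NoSuchElementException.
private lazy val expectedBlobReferenceStructType: DataType =
  expectedBlobStructType.fields
    .find(_.name == HoodieSchema.Blob.EXTERNAL_REFERENCE)
    .map(_.dataType)
    .getOrElse(throw new IllegalStateException(
      s"Missing ${HoodieSchema.Blob.EXTERNAL_REFERENCE} in canonical blob struct: $expectedBlobStructType"))
```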
Codecov Report

❌ Patch coverage is

Additional details and impacted files

```
@@             Coverage Diff              @@
##             master   #18665       +/-   ##
=============================================
+ Coverage     46.80%   68.07%    +21.26%
- Complexity    15316    29043     +13727
=============================================
  Files          1963     2518       +555
  Lines        107192   141251     +34059
  Branches      13007    17561      +4554
=============================================
+ Hits          50173    96156     +45983
+ Misses        51757    37183     -14574
- Partials       5262     7912      +2650
```

Flags with carried forward coverage won't be shown.
Describe the issue this Pull Request addresses
BLOB writes require the full 3-field `{type, data, reference}` struct on every row, even when only one sibling is used (`reference` is unused for INLINE, `data` is unused for OUT_OF_LINE). The boilerplate is the first thing people hit when writing a blob.

Note: Merge this after:
Summary and Changelog
- INLINE writes accept `{type, data}`.
- OUT_OF_LINE writes accept `{type, reference}`.
- Padding recurses through `StructType`, `ArrayType`, `MapType` (via Spark `transform` / `transform_values`), so partial blobs nested inside complex types work too.

Changes:
- `HoodieSparkSchemaConverters`: new public `padPartialBlobColumns(df)` plus recursive helpers (`padField`, `padDataType`, `padBlobStructValue`, `rebuiltType`).
- `HoodieSparkSqlWriter.writeInternal`: pads the source DataFrame just before the schema-conversion / validation call.
- `BlobTestHelpers`: added `inlineBlobStructColMinimal` and `outOfLineBlobStructColMinimal`.
- `TestReadBlobSQL`: minimal-struct tests for INLINE and OUT_OF_LINE plus a nested struct/array/map case.
- `TestBlobDataType`: SQL `named_struct` minimal-literal tests for both INLINE and OUT_OF_LINE.

Impact
User-facing: BLOB writes accept fewer fields. On-disk layout: unchanged (still canonical 3-field).
Read path: untouched.
Performance: padding short-circuits on canonical inputs (single schema walk, no projection emitted).
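For reference, a minimal sketch of what such a single-pass short-circuit scan can look like; `isPartialBlob`, the field names, and the traversal shape are assumptions here, not the PR's actual code:

```scala
import org.apache.spark.sql.types._

// Assumed predicate for a partial {type,data} / {type,reference} blob struct.
def isPartialBlob(st: StructType): Boolean = {
  val names = st.fieldNames.toSet
  names == Set("type", "data") || names == Set("type", "reference")
}

// Single schema walk: if nothing needs padding, the caller can return the input
// DataFrame untouched and never emit a projection.
def needsPadding(dt: DataType): Boolean = dt match {
  case st: StructType if isPartialBlob(st) => true
  case st: StructType                      => st.fields.exists(f => needsPadding(f.dataType))
  case ArrayType(elementType, _)           => needsPadding(elementType)
  case MapType(keyType, valueType, _)      => needsPadding(keyType) || needsPadding(valueType)
  case _                                   => false
}
```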
Risk Level
low
Padding only fires when a partial blob field is detected by a quick schema scan. Canonical inputs hit an early return. Null-struct semantics are preserved with `when(col.isNull, lit(null))`.

Documentation Update
none
Contributor's checklist