feat(lance): Add VariantType support to Lance base files #18599
Conversation
hudi-agent
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This PR adds VariantType support to Lance base files by introducing a single-pass schema rewrite and a stateful projection row on the write side, plus a child-swap ColumnVector wrapper on the read side. The end-to-end slot ordering (write: metadata=slot 0, value=slot 1 on disk; read: getChild(0)=value, getChild(1)=metadata to match Spark's positional VariantVal(value, metadata) constructor) is consistent and round-trips correctly. No correctness issues found. A few style/readability suggestions in the inline comments. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here. A couple of minor simplification opportunities in HoodieSparkLanceWriter; the new classes and adapter wiring are well-structured and clearly documented.
cc @yihua
    if (newFields == null) {
      return Pair.of(sparkSchema, EMPTY_INT_ARRAY);
    }
    int[] ordinalArr = new int[ordinals.size()];
🤖 nit: the three-line int[] conversion could be collapsed to ordinals.stream().mapToInt(Integer::intValue).toArray(), removing the intermediate array declaration and manual loop.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
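For reference, the manual loop and the suggested one-liner are behaviorally identical. This standalone sketch shows both side by side; the class and method names here are illustrative only, not from the PR:

```java
import java.util.Arrays;
import java.util.List;

public class OrdinalConversion {
    // The three-line manual conversion currently in the PR.
    static int[] manual(List<Integer> ordinals) {
        int[] arr = new int[ordinals.size()];
        for (int i = 0; i < arr.length; i++) {
            arr[i] = ordinals.get(i); // auto-unboxing
        }
        return arr;
    }

    // The one-line stream equivalent suggested by the review.
    static int[] streamed(List<Integer> ordinals) {
        return ordinals.stream().mapToInt(Integer::intValue).toArray();
    }

    public static void main(String[] args) {
        List<Integer> ordinals = Arrays.asList(0, 2, 5);
        System.out.println(Arrays.equals(manual(ordinals), streamed(ordinals))); // true
    }
}
```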
    }
    int numFields = inputSchema.fields().length;
    GenericInternalRow[] structByOrdinal = new GenericInternalRow[numFields];
    List<BiConsumer<SpecializedGetters, Integer>> extractorByOrdinal = new ArrayList<>(numFields);
🤖 nit: the null-fill loop on the next few lines could be replaced by new ArrayList<>(Collections.nCopies(numFields, null)), making the pre-sized, null-initialized intent clearer in one line.
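The two-step init and the nCopies form produce the same mutable, null-filled list. A minimal comparison (class and method names are illustrative, not from the PR):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class PresizedList {
    // Two-step init as in the PR: pre-size, then null-fill so set(i, ...) works.
    static List<Object> manual(int n) {
        List<Object> list = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            list.add(null);
        }
        return list;
    }

    // One-line equivalent: nCopies yields an immutable null-filled view,
    // and the ArrayList copy-constructor materializes a mutable copy of it.
    static List<Object> viaNCopies(int n) {
        return new ArrayList<>(Collections.<Object>nCopies(n, null));
    }

    public static void main(String[] args) {
        System.out.println(manual(4).equals(viaNCopies(4))); // true
    }
}
```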
Force-pushed from c72d44a to 343371d.
hudi-agent
left a comment
Thanks for the contribution! This PR adds Spark 4 VariantType support to the Lance reader/writer, with the metadata/value slot ordering carefully aligned end-to-end (on-disk parquet-variant spec order on write, child swap on the ColumnVector boundary on read so Spark's final positional getVariant reconstructs correctly). A couple of edge cases worth double-checking in the inline comments — primarily around nested variants and accessor coverage on Spark 4.1. Please take a look at any inline comments, and this should be ready for a Hudi committer or PMC member to take it from here. A couple of naming suggestions below — one for a local variable that could be mistaken for a Spark query plan, and one for a method name that diverges from its sibling's convention.
    private static Pair<StructType, int[]> enrichForLanceVariant(StructType sparkSchema) {
      StructField[] fields = sparkSchema.fields();
      StructField[] newFields = null;
      List<Integer> ordinals = null;
🤖 enrichForLanceVariant only matches top-level VariantType. If a user has a nested variant (e.g. Struct[..variant..], Array[Variant], Map[_, Variant]), isVariantType(field.dataType()) returns false and the schema falls through unmodified — LanceArrowUtils.toArrowSchema then throws an opaque UNSUPPORTED_DATATYPE. Could we add a defensive walk that fails fast with a clear "nested variants not supported in Lance" message so users aren't left to decode the lance-spark error?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
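The defensive walk suggested here could look like the sketch below. It uses minimal stand-in types rather than Spark's real StructType/ArrayType/MapType hierarchy, so every name and class in it is illustrative only:

```java
import java.util.Arrays;
import java.util.List;

public class NestedVariantCheck {
    // Minimal stand-ins for Spark's DataType hierarchy; illustrative only.
    interface DataType {}
    static class VariantType implements DataType {}
    static class BinaryType implements DataType {}
    static class StructType implements DataType {
        final List<DataType> fields;
        StructType(DataType... fields) { this.fields = Arrays.asList(fields); }
    }
    static class ArrayType implements DataType {
        final DataType element;
        ArrayType(DataType element) { this.element = element; }
    }
    static class MapType implements DataType {
        final DataType key, value;
        MapType(DataType key, DataType value) { this.key = key; this.value = value; }
    }

    // Recursive walk: true if a variant appears anywhere in the type tree.
    static boolean containsVariant(DataType t) {
        if (t instanceof VariantType) {
            return true;
        }
        if (t instanceof StructType) {
            return ((StructType) t).fields.stream().anyMatch(NestedVariantCheck::containsVariant);
        }
        if (t instanceof ArrayType) {
            return containsVariant(((ArrayType) t).element);
        }
        if (t instanceof MapType) {
            MapType m = (MapType) t;
            return containsVariant(m.key) || containsVariant(m.value);
        }
        return false;
    }

    public static void main(String[] args) {
        DataType nested = new StructType(new ArrayType(new VariantType()));
        if (containsVariant(nested)) {
            // Fail fast with a clear message instead of the opaque UNSUPPORTED_DATATYPE.
            System.out.println("nested variants not supported in Lance");
        }
    }
}
```

In the real schema pass this check would run on each non-top-level field type before handing the schema to LanceArrowUtils.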
    public final class Spark41VariantProjectedRow extends Spark4VariantProjectedRow {

      public Spark41VariantProjectedRow(int numFields,
                                        GenericInternalRow[] variantStructByOrdinal,
🤖 Spark 4.1 added getGeometry/getGeography over 4.0 — are these the only new InternalRow accessors in 4.1? If 4.1 added any other typed getters (e.g. time/timestamp variants), they'd silently fall through to the abstract InternalRow default and break for non-variant fields of those types. Could we double-check the InternalRow API delta between 4.0 and 4.1?
Yes. getGeometry/getGeography are not available in Spark 4.0. The API delta is correct.
    @Override
    public int numFields() {
      return numFields;
    }
🤖 getStruct always invokes the extractor at variant ordinals — it doesn't check input.isNullAt(ordinal) first. Could lance-spark ever call getStruct on a column without first checking nullity? If yes, this NPEs because the extractor runs input.getVariant(ord) (null) then .getValue(). If lance-spark's contract guarantees an isNullAt check, a comment here noting that contract would help future maintainers.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
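If the contract cannot be relied on, a null-guarded extractor might look like the sketch below. The Row interface here is a minimal stand-in for the real InternalRow/SpecializedGetters contract, so all names are illustrative:

```java
public class NullGuardedExtract {
    // Minimal stand-in for the row contract; not Hudi's or Spark's API.
    interface Row {
        boolean isNullAt(int ordinal);
        Object getVariant(int ordinal);
    }

    // Guarded extraction: check nullity before dereferencing, so a caller
    // that skips its own isNullAt check cannot trigger an NPE here.
    static Object extract(Row input, int ordinal) {
        if (input.isNullAt(ordinal)) {
            return null;
        }
        return input.getVariant(ordinal);
    }

    public static void main(String[] args) {
        Row allNull = new Row() {
            public boolean isNullAt(int ordinal) { return true; }
            public Object getVariant(int ordinal) { return null; }
        };
        System.out.println(extract(allNull, 0)); // null
    }
}
```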
    super(file, DEFAULT_BATCH_SIZE, bloomFilterOpt.map(HoodieBloomFilterRowWriteSupport::new));
    this.sparkSchema = enrichSparkSchemaForLance(sparkSchema);
    this.inputSparkSchema = sparkSchema;
    Pair<StructType, int[]> variantPlan = enrichForLanceVariant(sparkSchema);
🤖 nit: variantPlan reads like a LogicalPlan or QueryPlan in the Spark context — could you rename it to something like schemaAndOrdinals or variantEnrichment to make it clear this is just a (StructType, int[]) pair?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
     * case, so we hand it a plain struct. Top-level only - nested variants are not
     * yet supported.
     */
    private static Pair<StructType, int[]> enrichForLanceVariant(StructType sparkSchema) {
🤖 nit: the sibling method is enrichSparkSchemaForLance — it might be worth renaming this to enrichSparkSchemaForLanceVariant so the two read as a parallel pair and it's clear this one also operates on the schema.
Force-pushed from 4d76994 to 93b69a3.
hudi-agent
left a comment
Thanks for the contribution! This PR adds Spark 4 VariantType support to the Lance reader/writer by introducing a single-pass schema rewrite, a per-row variant projection, and a child-swapping LanceVariantColumnVector that bridges the on-disk parquet-variant spec ordering with Spark's positional getVariant. No new issues flagged from this automated pass — a Hudi committer or PMC member can take it from here for a final review. A couple of minor naming and message clarity suggestions below.
cc @yihua
    super(file, DEFAULT_BATCH_SIZE, bloomFilterOpt.map(HoodieBloomFilterRowWriteSupport::new));
    this.sparkSchema = enrichSparkSchemaForLance(sparkSchema);
    this.inputSparkSchema = sparkSchema;
    Pair<StructType, int[]> variantPlan = enrichForLanceVariant(sparkSchema);
🤖 nit: variantPlan reads like a query/execution plan, which isn't quite what this is — it's just the result of schema enrichment (a transformed schema + ordinal list). Could you rename it to something like variantEnriched or schemaAndOrdinals to make the content clearer at a glance?
    @Override
    public InternalRow copy() {
      throw new UnsupportedOperationException("Spark4VariantProjectedRow is single-row scope");
🤖 nit: the message "Spark4VariantProjectedRow is single-row scope" is a bit opaque in a stack trace — "single-row scope" doesn't tell the reader why copy is unsupported. Would something like "copy() not supported; rows are consumed synchronously by LanceArrowWriter" be more useful?
Force-pushed from 93b69a3 to e36ac9d.
hudi-agent
left a comment
Thanks for working on this! The PR adds Spark 4 VariantType support to the Lance reader/writer, with a careful schema rewrite plus child-swap at the ColumnVector boundary to bridge between the parquet-variant spec on-disk order and Spark's positional getVariant reconstruction. The variant ordering invariants line up across writer extractors, Arrow schema, and reader child lookup. No additional critical issues flagged from this automated round-4 pass beyond what previous reviewers have already noted — a Hudi committer or PMC member can take it from here for a final review. A couple of minor simplification opportunities in HoodieSparkLanceWriter below.
cc @yihua
          .build();
      newFields[i] = new StructField(field.name(), variantStruct, field.nullable(), enriched);
    }
    if (newFields == null) {
🤖 nit: could you replace this manual unboxing loop with ordinals.stream().mapToInt(Integer::intValue).toArray()? It's a bit more idiomatic Java and saves a few lines.
        int[] variantOrdinals) {
      if (variantOrdinals.length == 0) {
        return new SparkArrowWriter(lanceArrowWriter, null);
      }
🤖 nit: the add(null) loop could be replaced with new ArrayList<>(Collections.nCopies(numFields, null)) — same effect, more expressive intent.
Force-pushed from e36ac9d to 11eb3c7.
hudi-agent
left a comment
Thanks for working on this! The PR adds Spark 4 VariantType read/write support to Lance via a schema-enriched struct (metadata, value) plus a positional-swap ColumnVector to bridge the parquet-variant-spec on-disk order against Spark's getVariant positional reconstruction. After tracing the write/read paths end-to-end (lambda → slot mapping → getChild swap → VariantVal constructor), the byte ordering is consistent across the round trip. No new critical correctness issues flagged from this automated pass — a Hudi committer or PMC member can take it from here for a final review. A couple of small consistency and simplification suggestions below.
cc @yihua
      return Pair.of(sparkSchema, EMPTY_INT_ARRAY);
    }
    int[] ordinalArr = new int[ordinals.size()];
    for (int i = 0; i < ordinalArr.length; i++) {
🤖 nit: could you simplify this to ordinals.stream().mapToInt(Integer::intValue).toArray()? The three-line loop is a bit ceremonial for what's really just a List→int[] conversion.
    def createVariantProjectedRow(
        numFields: Int,
        variantStructByOrdinal: Array[GenericInternalRow],
        extractorByOrdinal: java.util.List[BiConsumer[SpecializedGetters, java.lang.Integer]]
🤖 nit: variantStructByOrdinal is an Array (random-access by ordinal) but extractorByOrdinal is a java.util.List — both are indexed 0..numFields-1 and non-null only at variant positions. Could you make them the same type? Using Array for both would let callers use consistent [] access, and would also eliminate the null-fill loop in SparkArrowWriter.of (an array of reference type is already null-initialized).
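The point about reference-type arrays being null-initialized on allocation (which would make the null-fill loop unnecessary) can be checked with a trivial snippet; names here are illustrative:

```java
public class ArrayInit {
    // A reference-type array is null-initialized by the JVM on allocation,
    // so no explicit null-fill loop is needed, unlike a pre-sized ArrayList.
    static Object[] allocate(int n) {
        return new Object[n];
    }

    public static void main(String[] args) {
        Object[] extractorByOrdinal = allocate(4);
        System.out.println(extractorByOrdinal[2] == null); // true
    }
}
```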
lance-spark's LanceArrowUtils rejects VariantType as UNSUPPORTED_DATATYPE, and Spark's final ColumnVector.getVariant reads child(0)=value, child(1)=metadata positionally, so a parquet-variant-spec struct (metadata first) deserializes swapped and fails with MALFORMED_VARIANT.

Write side (HoodieSparkLanceWriter): rewrite top-level VariantType to Struct[metadata: binary, value: binary] tagged hudi_type=VARIANT, and project each InternalRow via SparkAdapter.createVariantValueWriter. Spark 3.x stays a no-op via sparkAdapter.isVariantType.

Read side (LanceVariantColumnVector + LanceRecordIterator): wrap the on-disk struct so getChild(0) returns the value column and getChild(1) the metadata column. The .lance file keeps spec-compliant ordering; positional getVariant reconstructs a correct VariantVal.

Spark 4.0/4.1 split: Spark4VariantProjectedRow is now abstract in hudi-spark4-common, with concrete Spark40VariantProjectedRow and Spark41VariantProjectedRow subclasses in the version-specific modules. This is required because Spark 4.1's SpecializedGetters adds abstract getGeometry/getGeography methods (returning GeometryVal/GeographyVal, types that don't exist in Spark 4.0), so the shared module can't implement them. createVariantProjectedRow moves from BaseSpark4Adapter to Spark4_0Adapter / Spark4_1Adapter, mirroring the existing createInternalRow split for HoodieInternalRow.

Scope: top-level variant only; nested variants in Struct/Array/Map are a follow-up.
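The read-side child swap can be illustrated with self-contained stand-ins. Note two simplifications: all types below are hypothetical (not Spark's or Lance's API), and the swap is purely positional for brevity, whereas the PR resolves the value/metadata children by name from the underlying StructVector:

```java
public class ChildSwapSketch {
    // Minimal stand-in for a columnar vector with ordered children; illustrative only.
    interface Vector {
        Vector getChild(int ordinal);
        String name();
    }

    static class NamedVector implements Vector {
        final String name;
        final Vector[] children;
        NamedVector(String name, Vector... children) {
            this.name = name;
            this.children = children;
        }
        public Vector getChild(int ordinal) { return children[ordinal]; }
        public String name() { return name; }
    }

    // The on-disk struct stores (metadata, value) per the parquet-variant spec,
    // but a positional reader expects child(0)=value, child(1)=metadata,
    // so this wrapper swaps the two slots.
    static class VariantSwap implements Vector {
        final Vector delegate;
        VariantSwap(Vector delegate) { this.delegate = delegate; }
        public Vector getChild(int ordinal) {
            return delegate.getChild(ordinal == 0 ? 1 : 0); // 0 -> value, 1 -> metadata
        }
        public String name() { return delegate.name(); }
    }

    public static void main(String[] args) {
        Vector onDisk = new NamedVector("v",
            new NamedVector("metadata"), new NamedVector("value"));
        Vector swapped = new VariantSwap(onDisk);
        System.out.println(swapped.getChild(0).name()); // value
        System.out.println(swapped.getChild(1).name()); // metadata
    }
}
```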
Force-pushed from 11eb3c7 to da560cd.
hudi-agent
left a comment
Thanks for the contribution! This PR adds VariantType support to the Lance reader/writer for Spark 4 by rewriting top-level VariantType columns as Struct[metadata, value] on write and swapping child positions at the ColumnVector boundary on read so Spark's positional getVariant reconstructs a correct VariantVal. After tracing both the write and read paths (schema enrichment, ordinal preservation, extractor wiring, struct slot ordering, and the LanceVariantColumnVector getChild swap), the round-trip looks correct — the on-disk parquet-variant spec ordering is preserved while Spark's positional reads see what they expect. No new issues flagged from this automated pass beyond what prior review rounds already raised — a Hudi committer or PMC member can take it from here for a final review. A couple of minor simplification opportunities in HoodieSparkLanceWriter; everything else looks clean and well-commented.
cc @yihua
          .build();
      newFields[i] = new StructField(field.name(), variantStruct, field.nullable(), enriched);
    }
    if (newFields == null) {
🤖 nit: could you replace the manual unboxing loop with ordinals.stream().mapToInt(Integer::intValue).toArray()? It expresses the intent in one line.
        int[] variantOrdinals) {
      if (variantOrdinals.length == 0) {
        return new SparkArrowWriter(lanceArrowWriter, null);
      }
🤖 nit: the two-step init (new ArrayList<>(numFields) + null-fill loop) could be collapsed to new ArrayList<>(Collections.nCopies(numFields, null)), which is a bit more declarative about what's happening.
     * case, so we hand it a plain struct. Top-level only - nested variants are not
     * yet supported.
     */
    private static Pair<StructType, int[]> enrichForLanceVariant(StructType sparkSchema) {
Could enrichForLanceVariant be incorporated into enrichSparkSchemaForLance?
     * Forwards rows to the lance-spark {@link LanceArrowWriter}. When the schema
     * has no {@code VariantType} columns, rows are passed through directly. When it
     * does, a single {@link VariantProjectedRow} instance is reused per row to
     * delegate every accessor to the underlying input row except at variant
     * ordinals, where it returns a pre-allocated {@code (metadata, value)} struct
     * populated by {@link org.apache.spark.sql.hudi.SparkAdapter#createVariantValueWriter}.
Does this projection introduce overhead? Does Lance library or writer provide its own projection or adaptation for Variant Type?
Describe the issue this Pull Request addresses
Closes: #18596
VariantType columns cannot land in Lance base files.
- lance-spark's `LanceArrowUtils.toArrowType` rejects `VariantType` with `UNSUPPORTED_DATATYPE`. Compaction fails before writing.
- `ColumnVector.getVariant` is `final` and constructs `VariantVal(child(0), child(1))` positionally. Lance has no variant logical-type annotation, so a parquet-variant-spec struct on disk (metadata first, value second) deserializes swapped and throws `MALFORMED_VARIANT`.

Tests for the changes here are provided in: #18597
Summary and Changelog
Add Spark 4 `VariantType` support to the Lance reader/writer.

Write side (`HoodieSparkLanceWriter`):

- `VariantType` -> `Struct[metadata: binary, value: binary]`, tagged `hudi_type=VARIANT` so `HoodieSparkSchemaConverters` promotes it back on read.
- `VariantProjectedRow` (extends `InternalRow`): delegates every accessor to the underlying input row except `getStruct(o, n)` at variant ordinals, where it returns a pre-allocated `(metadata, value)` `GenericInternalRow` populated by `SparkAdapter.createVariantValueWriter`. No per-row `Object[]` alloc, no boxing of primitive cells.
- Gated by `sparkAdapter.isVariantType`, so Spark 3.x stays a no-op.

Read side:

- `LanceVariantColumnVector` wraps the on-disk struct vector and reports its type as `VariantType`. `getChild(0)` returns the value column, `getChild(1)` returns the metadata column (resolved by name from the underlying `StructVector`). Spark's positional `getVariant` then reconstructs a correct `VariantVal` while the `.lance` file keeps spec-compliant `(metadata, value)` ordering.
- `LanceRecordIterator` wires the wrapper for any column whose requested Spark type is `VariantType`.

Adapter wiring: `SparkAdapter` gains `isVariantType` and `createVariantValueWriter`. Spark 3.x adapters return `false` / a no-op writer; the Spark 4 implementation lives in `Spark4VariantProjectedRow`.

Spark 4.0 / 4.1 split: `Spark4VariantProjectedRow` is `abstract` in `hudi-spark4-common`. Concrete `Spark40VariantProjectedRow` and `Spark41VariantProjectedRow` subclasses live in the version-specific modules. This is required because Spark 4.1's `SpecializedGetters` adds abstract `getGeometry`/`getGeography` methods returning `GeometryVal`/`GeographyVal` - types that don't exist in Spark 4.0 - so the shared module can't implement them. The 4.1 subclass overrides both by delegating to the wrapped input row; the 4.0 subclass adds nothing. `createVariantProjectedRow` moves from `BaseSpark4Adapter` to `Spark4_0Adapter`/`Spark4_1Adapter`, mirroring the existing `createInternalRow` split for `HoodieInternalRow`.

Out of scope: nested `VariantType` inside `Struct`/`Array`/`Map` on Lance, to be addressed in a follow-up, documented in issue: #18600.

Impact
- A `variant` column can now compact to `.lance` base files and round-trip correctly. No new configs.
- The `.lance` file keeps parquet-variant spec ordering on disk (`metadata` first). No format change.
- No per-row `Object[]` allocation and no per-column primitive boxing. No-variant schemas skip the wrapper entirely.
low
- Gated by `sparkAdapter.isVariantType`; Spark 3.x and any non-variant write/read path is untouched.
- The 4.0/4.1 split follows the existing `HoodieInternalRow` pattern; both profiles compile cleanly.
- Covered by the `TestVariantDataType` MOR log-only compaction case.
none
Contributor's checklist