[OSS PR #18497] [WIP] feat(lance): round-trip Hudi VECTOR columns as native Lance fixed-size lists #49
Translate the Hudi VECTOR logical-type metadata (`hudi_type = "VECTOR(dim[,elem])"`)
into the lance-spark metadata key `arrow.fixed-size-list.size` before calling
`LanceArrowUtils.toArrowSchema`, so the Lance writer emits a native Arrow
FixedSizeList<Float32|Float64, dim> (Lance's vector column encoding) instead
of a plain variable-length list. No change needed at `LanceFileWriter.open(...)`;
the encoding is driven by the Arrow schema itself.
- New private helper `enrichSparkSchemaForLanceVectors` in `HoodieSparkLanceWriter`
reuses `VectorConversionUtils.detectVectorColumnsFromMetadata` to find VECTOR
fields and attaches the Lance metadata key; non-vector fields pass through
unchanged.
- Fails fast with `HoodieNotSupportedException` for non-ArrayType or
non-Float/Double element types (matches lance-spark's `shouldBeFixedSizeList`).
- Tests in `TestLanceDataSource` (COW + MOR):
  - `testFloatVectorRoundTrip`
  - `testDoubleVectorRoundTrip`
  - `testMultipleVectorColumns`
Each opens the written `.lance` file via `LanceFileReader` and asserts the
field is `ArrowType.FixedSizeList` with the expected `listSize` — the direct
regression guard that fails pre-fix and passes post-fix.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
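The descriptor-to-size translation described above can be sketched in plain Java. This is a minimal illustration, not the code in this PR: the class and method names are hypothetical, and it assumes the element type defaults to FLOAT when the descriptor omits it (as the `VECTOR(dim[,DOUBLE])` notation used elsewhere in this PR suggests).

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative only: hypothetical helper, not Hudi's actual descriptor parser. */
final class VectorDescriptorSketch {
  /** Metadata key named in the PR description. */
  static final String FIXED_SIZE_LIST_KEY = "arrow.fixed-size-list.size";

  // Matches VECTOR(128) or VECTOR(128,DOUBLE); tolerating whitespace is an assumption.
  private static final Pattern VECTOR =
      Pattern.compile("^VECTOR\\(\\s*(\\d+)\\s*(?:,\\s*([A-Za-z0-9_]+)\\s*)?\\)$");

  /** Returns the fixed-size-list size for a FLOAT/DOUBLE vector descriptor, else throws. */
  static int fixedSizeListSize(String hudiType) {
    Matcher m = VECTOR.matcher(hudiType.trim());
    if (!m.matches()) {
      throw new IllegalArgumentException("Not a VECTOR descriptor: " + hudiType);
    }
    // Assumption: a missing element type means FLOAT.
    String elem = m.group(2) == null ? "FLOAT" : m.group(2).toUpperCase(Locale.ROOT);
    if (!elem.equals("FLOAT") && !elem.equals("DOUBLE")) {
      // Mirrors the fail-fast behaviour described above, with a generic exception type.
      throw new UnsupportedOperationException("Unsupported VECTOR element type: " + elem);
    }
    return Integer.parseInt(m.group(1));
  }
}
```

The returned value is what would be stored under `arrow.fixed-size-list.size` on the Spark field before the Arrow schema conversion.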
Companion to the Lance writer's native FixedSizeList encoding: on read, rehydrate the Hudi `hudi_type = VECTOR(...)` Spark metadata that `LanceArrowUtils.fromArrowSchema` drops, so the read schema matches the Parquet path. Gate the Parquet-only ArrayType→BinaryType vector rewrite in HoodieFileGroupReaderBasedFileFormat on format == PARQUET; Lance returns vectors natively as ArrayType so the rewrite would trigger a spurious cast and break the read.
- VectorConversionUtils.restoreVectorMetadataFromArrowSchema walks the Arrow schema and re-attaches VECTOR(dim[,DOUBLE]) for FixedSizeList<Float32|Float64, dim> fields.
- HoodieSparkLanceReader.getSchema and SparkLanceReaderBase.read now call it so downstream VECTOR-aware code sees the same schema as on Parquet.
- TestLanceDataSource: assert hudi_type metadata is restored on read for float, double, and multi-vector round-trips.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
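As a rough sketch of the read-side mapping, the snippet below turns a FixedSizeList child type and list size back into a `hudi_type` string. The enum and method are hypothetical stand-ins for the Arrow types, and the exact string layout (no space after the comma, FLOAT left implicit) is assumed from the `VECTOR(dim[,DOUBLE])` notation in this commit message.

```java
/** Illustrative only: hypothetical stand-in for the Arrow-to-Hudi vector mapping. */
final class RestoreVectorTypeSketch {
  /** Simplified view of the FixedSizeList child type; OTHER covers everything else. */
  enum ArrowElem { FLOAT32, FLOAT64, OTHER }

  /** Returns the hudi_type descriptor, or null when the field is not a supported vector. */
  static String hudiTypeFor(ArrowElem elem, int listSize) {
    switch (elem) {
      case FLOAT32:
        return "VECTOR(" + listSize + ")";          // FLOAT is left implicit (assumption)
      case FLOAT64:
        return "VECTOR(" + listSize + ",DOUBLE)";
      default:
        return null;                                 // non-vector fields pass through untouched
    }
  }
}
```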
Mirrors the Parquet writer: emit the comma-separated `colName:VECTOR(dim[,elemType])` descriptor list under the existing `hoodie.vector.columns` key in the Lance file-footer key-value metadata. Reader still derives VECTOR identity from the Arrow FixedSizeList type today; this footer entry is insurance for future descriptor fields the Arrow type cannot express (quantization tags, distance metrics, etc.) and keeps Lance files symmetric with Parquet files.
- HoodieBaseLanceWriter: new protected `additionalSchemaMetadata()` hook invoked during close(), so subclasses can contribute footer KV entries alongside bloom-filter metadata.
- HoodieSparkLanceWriter: override `additionalSchemaMetadata()` to emit `hoodie.vector.columns` when the Spark schema has any VECTOR column.
- VectorConversionUtils: add `buildVectorColumnsMetadataValue(StructType)` matching the Parquet-path helper's output format.
- TestLanceDataSource: assert footer carries the expected descriptor list for float, double, and multi-vector round-trips.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
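A minimal sketch of how such a footer value could be assembled, assuming the caller passes the vector columns in schema order. The class and method names are hypothetical; only the `hoodie.vector.columns` key and the `colName:VECTOR(...)` entry shape come from the description above.

```java
import java.util.Map;
import java.util.StringJoiner;

/** Illustrative only: hypothetical builder for the footer descriptor list. */
final class VectorFooterSketch {
  static final String KEY = "hoodie.vector.columns";

  /**
   * Joins "name:descriptor" entries with commas, in the map's iteration order.
   * Pass a LinkedHashMap populated in schema order to get a stable value.
   */
  static String buildValue(Map<String, String> vectorColumnsInSchemaOrder) {
    StringJoiner joined = new StringJoiner(",");
    for (Map.Entry<String, String> e : vectorColumnsInSchemaOrder.entrySet()) {
      joined.add(e.getKey() + ":" + e.getValue());
    }
    return joined.toString();
  }
}
```

Note that a descriptor such as `VECTOR(64,DOUBLE)` contains a comma itself, so a reader of this value cannot simply split on commas; this sketch covers only the write side.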
SparkFileFormatInternalRowReaderContext.getFileRecordIterator had a second, unconditional rewrite of VECTOR columns from ArrayType to BinaryType (the earlier withVectorRewrite gate in HoodieFileGroupReaderBasedFileFormat only covered the non-FileGroupReader branch). On the MOR / FileGroupReader path this caused Lance reads to fail with scala.MatchError: ArrayType(FloatType,true) in Cast.castToBinaryCode, because Lance returns vectors natively as ArrayType while the caller-supplied schema had been rewritten to BinaryType; the generated UnsafeProjection then injected an unsupported Cast(ArrayType -> BinaryType).
Gate the detection + rewrite on the file format: skip it for .lance base files. Hudi log files are always parquet-encoded so they still take the Parquet path.
Fixes 14 TestLanceDataSource vector errors (COW + MOR) observed in spark3.5 / spark3.4 CI, including the spark3.4 part2 6h timeout that was the same failure retrying.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
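The gate described in this commit reduces to a small predicate. The sketch below uses a hypothetical enum and method name rather than Hudi's real types; it only encodes the rule stated above, that log files always take the Parquet path and Lance base files skip the rewrite.

```java
/** Illustrative only: hypothetical predicate for the ArrayType-to-BinaryType rewrite gate. */
final class VectorRewriteGateSketch {
  /** Simplified stand-in for the table's base file format. */
  enum BaseFileFormat { PARQUET, LANCE }

  /**
   * Log files are parquet-encoded regardless of the base format, so they keep the rewrite.
   * Base files get the rewrite only when the base format is Parquet.
   */
  static boolean shouldRewriteVectorsToBinary(BaseFileFormat baseFormat, boolean isLogFile) {
    return isLogFile || baseFormat == BaseFileFormat.PARQUET;
  }
}
```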
On Scala 2.13, Row.getAs[Seq[Float]] fails at runtime with ClassCastException: scala.collection.mutable.ArraySeq$ofRef cannot be cast to scala.collection.immutable.Seq, because Seq in Scala 2.13 defaults to immutable.Seq while Spark holds array columns as mutable.ArraySeq internally.
Row.getSeq[T] is declared as scala.collection.Seq[T] (general), so it works on both 2.12 (where Seq = scala.collection.Seq) and 2.13 (where Seq = scala.collection.immutable.Seq). Same runtime object, no cast.
Fixes the 14 TestLanceDataSource errors on java17 CI (scala-2.13, spark3.5 / spark4.0). The earlier VECTOR->BinaryType rewrite fix resolved the scala.MatchError in the read path; this change resolves the subsequent 2.13-only test-side ClassCastException.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
… size check
- SparkFileFormatInternalRowReaderContext: use tableConfig.getBaseFileFormat instead of filename extension sniff to detect Lance base files
- VectorConversionUtils.restoreVectorMetadataFromArrowSchema: remove confusing arrowFields.size != sparkFields.length defensive guard
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
📝 Walkthrough
This PR enhances vector metadata handling across Hudi to support Lance file format persistence and recovery. It introduces utilities to serialize/restore vector descriptors, adds vector schema enrichment during Lance writes, restores vector type information during reads, renames a format-agnostic metadata key constant, and implements conditional vector schema rewriting based on file format. Test coverage for Lance vectors is substantially expanded.
Sequence Diagram(s)
sequenceDiagram
actor User
participant SparkSchema as Spark Schema<br/>(with VECTOR metadata)
participant Writer as HoodieSparkLanceWriter
participant Enrich as VectorConversionUtils
participant Arrow as Arrow Schema
participant Lance as Lance File<br/>(with footer metadata)
User->>Writer: Write DataFrame with VECTOR columns
Writer->>Enrich: enrichVectorMetadataInSchema()
Enrich->>Enrich: Detect VECTOR fields<br/>Add dimension to metadata
Enrich-->>Writer: Enriched StructType
Writer->>Arrow: Convert to Arrow schema<br/>(with sizing info)
Arrow->>Lance: Write with metadata
Writer->>Lance: addSchemaMetadata()<br/>(vector columns footer)
Lance-->>User: Persisted file
sequenceDiagram
actor User
participant Lance as Lance File<br/>(with footer metadata)
participant Reader as LanceReader
participant Arrow as Arrow Schema<br/>(FixedSizeList)
participant Restore as VectorConversionUtils
participant SparkSchema as Spark Schema<br/>(restored VECTOR metadata)
User->>Reader: Read Lance file
Reader->>Lance: Load Arrow schema
Lance-->>Arrow: Return schema<br/>(FixedSizeList fields)
Reader->>Arrow: Convert to Spark StructType
Arrow-->>Reader: Basic StructType<br/>(no VECTOR info)
Reader->>Restore: restoreVectorMetadataFromArrowSchema()
Restore->>Restore: Detect FixedSizeList of<br/>Float32/Float64
Restore->>SparkSchema: Attach VECTOR metadata
SparkSchema-->>User: Return enriched schema
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~65 minutes
Actionable comments posted: 3
🧹 Nitpick comments (1)
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala (1)
1109-1121: Consider adding a memory limit to RootAllocator.
`new RootAllocator()` creates an allocator without a memory limit. While acceptable for tests, adding a limit (e.g., `new RootAllocator(64 * 1024 * 1024)`) would prevent unbounded memory allocation if test data unexpectedly grows.
♻️ Suggested improvement
- val allocator = new RootAllocator()
+ val allocator = new RootAllocator(64 * 1024 * 1024) // 64MB limit for test
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java`:
- Around line 154-183: In enrichSparkSchemaForLanceVectors, validate that any
StructField with a detected HoodieSchema.Vector (from
VectorConversionUtils.detectVectorColumnsFromMetadata) has a matching Spark type
shape before enriching: assert the field.dataType() is an ArrayType whose
elementType is FloatType when vec.getVectorElementType()==FLOAT or DoubleType
when DOUBLE, and optionally verify the fixed length equals vec.getDimension();
if any check fails throw a HoodieNotSupportedException describing the field
name, expected Spark element type/shape and actual type so we fail fast (do this
check for each vec in HoodieSparkLanceWriter.enrichSparkSchemaForLanceVectors
before building the new Metadata and before later calls such as
LanceArrowUtils.toArrowSchema).
In
`@hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/VectorConversionUtils.java`:
- Around line 96-102: The current serialization in VectorConversionUtils builds
"named" by iterating the unordered Map returned from
detectVectorColumnsFromMetadata, causing unstable hoodie.vector.columns order;
instead, iterate over the schema's fields[] in ordinal order (for i =
0..fields.length-1), check detected.get(i) for a non-null HoodieSchema.Vector
and if present put(fields[i].name(), value) into the LinkedHashMap, then call
HoodieSchema.serializeVectorColumnsMetadata(named) so the metadata preserves
schema order.
In `@hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java`:
- Line 219: Add a deprecated alias constant in HoodieSchema for backward
compatibility: retain PARQUET_VECTOR_COLUMNS_METADATA_KEY as a public static
final String that delegates to VECTOR_COLUMNS_METADATA_KEY and mark it
`@Deprecated` with a javadoc pointing to VECTOR_COLUMNS_METADATA_KEY; update any
usages in the class to reference VECTOR_COLUMNS_METADATA_KEY only and keep the
alias for at least one release so downstream code compiling against
PARQUET_VECTOR_COLUMNS_METADATA_KEY continues to work.
---
Nitpick comments:
In
`@hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala`:
- Around line 1109-1121: The RootAllocator is created without a memory limit
which can lead to unbounded allocation in tests; change the instantiation of
RootAllocator in TestLanceDataSource (the allocator variable created with new
RootAllocator()) to include a reasonable limit (for example new RootAllocator(64
* 1024 * 1024)), ensuring the allocator is still closed in the existing finally
block; alternatively make the size a named constant or test-config value if you
prefer configurability.
📒 Files selected for processing (12)
- hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceReader.java
- hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java
- hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/VectorConversionUtils.java
- hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
- hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
- hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
- hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
- hudi-hadoop-common/src/main/java/org/apache/hudi/io/lance/HoodieBaseLanceWriter.java
- hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala
- hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
- hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala
- hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestVectorDataSource.scala
  private static StructType enrichSparkSchemaForLanceVectors(StructType sparkSchema) {
    Map<Integer, HoodieSchema.Vector> vectorColumns =
        VectorConversionUtils.detectVectorColumnsFromMetadata(sparkSchema);
    if (vectorColumns.isEmpty()) {
      return sparkSchema;
    }
    StructField[] fields = sparkSchema.fields();
    StructField[] newFields = new StructField[fields.length];
    for (int i = 0; i < fields.length; i++) {
      StructField field = fields[i];
      HoodieSchema.Vector vec = vectorColumns.get(i);
      if (vec == null) {
        newFields[i] = field;
        continue;
      }
      HoodieSchema.Vector.VectorElementType elemType = vec.getVectorElementType();
      if (elemType != HoodieSchema.Vector.VectorElementType.FLOAT
          && elemType != HoodieSchema.Vector.VectorElementType.DOUBLE) {
        throw new HoodieNotSupportedException(
            "Lance base-file format currently supports FLOAT/DOUBLE VECTOR columns only; "
                + "got element type " + elemType + " for field '" + field.name() + "'");
      }
      Metadata enriched = new MetadataBuilder()
          .withMetadata(field.metadata())
          .putLong(LanceArrowUtils.ARROW_FIXED_SIZE_LIST_SIZE_KEY(), vec.getDimension())
          .build();
      newFields[i] = new StructField(field.name(), field.dataType(), field.nullable(), enriched);
    }
    return new StructType(newFields);
  }
Validate the Spark field shape against the VECTOR descriptor before enriching it.
Right now any field carrying hudi_type=VECTOR(...) is accepted here as long as the descriptor says FLOAT/DOUBLE. If the actual Spark type is not ArrayType of the matching primitive element type, Lance will serialize one physical type while Hudi metadata advertises another. Please fail fast on mismatches before calling LanceArrowUtils.toArrowSchema(...).
Suggested guard
   for (int i = 0; i < fields.length; i++) {
     StructField field = fields[i];
     HoodieSchema.Vector vec = vectorColumns.get(i);
     if (vec == null) {
       newFields[i] = field;
@@
     if (elemType != HoodieSchema.Vector.VectorElementType.FLOAT
         && elemType != HoodieSchema.Vector.VectorElementType.DOUBLE) {
       throw new HoodieNotSupportedException(
           "Lance base-file format currently supports FLOAT/DOUBLE VECTOR columns only; "
               + "got element type " + elemType + " for field '" + field.name() + "'");
     }
+    if (!(field.dataType() instanceof org.apache.spark.sql.types.ArrayType)) {
+      throw new HoodieNotSupportedException(
+          "VECTOR metadata requires ArrayType for field '" + field.name() + "', got " + field.dataType());
+    }
+    org.apache.spark.sql.types.ArrayType arrayType =
+        (org.apache.spark.sql.types.ArrayType) field.dataType();
+    boolean matches =
+        (elemType == HoodieSchema.Vector.VectorElementType.FLOAT
+            && arrayType.elementType().sameType(org.apache.spark.sql.types.DataTypes.FloatType))
+        || (elemType == HoodieSchema.Vector.VectorElementType.DOUBLE
+            && arrayType.elementType().sameType(org.apache.spark.sql.types.DataTypes.DoubleType));
+    if (!matches) {
+      throw new HoodieNotSupportedException(
+          "VECTOR metadata for field '" + field.name() + "' does not match Spark type " + field.dataType());
+    }
     Metadata enriched = new MetadataBuilder()
         .withMetadata(field.metadata())
         .putLong(LanceArrowUtils.ARROW_FIXED_SIZE_LIST_SIZE_KEY(), vec.getDimension())
         .build();
    StructField[] fields = schema.fields();
    Map<Integer, HoodieSchema.Vector> detected = detectVectorColumnsFromMetadata(schema);
    java.util.LinkedHashMap<String, HoodieSchema.Vector> named = new java.util.LinkedHashMap<>();
    for (Map.Entry<Integer, HoodieSchema.Vector> entry : detected.entrySet()) {
      named.put(fields[entry.getKey()].name(), entry.getValue());
    }
    return HoodieSchema.serializeVectorColumnsMetadata(named);
Preserve schema order when serializing vector footer metadata.
detected is a HashMap, so iterating entrySet() makes hoodie.vector.columns order unstable across runs. Since this value is meant to be canonical, build named by walking fields in ordinal order instead.
Suggested fix
   StructField[] fields = schema.fields();
   Map<Integer, HoodieSchema.Vector> detected = detectVectorColumnsFromMetadata(schema);
   java.util.LinkedHashMap<String, HoodieSchema.Vector> named = new java.util.LinkedHashMap<>();
-  for (Map.Entry<Integer, HoodieSchema.Vector> entry : detected.entrySet()) {
-    named.put(fields[entry.getKey()].name(), entry.getValue());
+  for (int i = 0; i < fields.length; i++) {
+    HoodieSchema.Vector vector = detected.get(i);
+    if (vector != null) {
+      named.put(fields[i].name(), vector);
+    }
   }
   return HoodieSchema.serializeVectorColumnsMetadata(named);
     * so that any reader can identify vector columns without needing the Hudi schema store.
     */
-   public static final String PARQUET_VECTOR_COLUMNS_METADATA_KEY = "hoodie.vector.columns";
+   public static final String VECTOR_COLUMNS_METADATA_KEY = "hoodie.vector.columns";
Keep a deprecated alias for the old metadata-key constant.
Replacing a public constant here will break downstream code that still compiles against HoodieSchema.PARQUET_VECTOR_COLUMNS_METADATA_KEY. Please retain the old field as a deprecated alias to VECTOR_COLUMNS_METADATA_KEY for at least one release.
Suggested compatibility shim
   public static final String VECTOR_COLUMNS_METADATA_KEY = "hoodie.vector.columns";
+
+  /**
+   * @deprecated use {@link #VECTOR_COLUMNS_METADATA_KEY}
+   */
+  @Deprecated
+  public static final String PARQUET_VECTOR_COLUMNS_METADATA_KEY = VECTOR_COLUMNS_METADATA_KEY;
Mirror of apache#18497 for automated bot review.
Original author: @rahil-c
Base branch: master