Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,11 @@ private ClosableIterator<UnsafeRow> getUnsafeRowIterator(HoodieSchema requestedS
@Override
public HoodieSchema getSchema() {
try {
StructType structType = LanceArrowUtils.fromArrowSchema(arrowSchema);
Map<String, String> customMetadata = arrowSchema.getCustomMetadata();
Set<String> vectorColumnNames = HoodieSchema.parseVectorColumnNames(
customMetadata == null ? null : customMetadata.get(HoodieSchema.VECTOR_COLUMNS_METADATA_KEY));
Comment on lines +206 to +207
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: VECTOR_COLUMNS_METADATA_KEY in the footer is now used. We can remove this usage later.

StructType structType = VectorConversionUtils.restoreVectorMetadata(
LanceArrowUtils.fromArrowSchema(arrowSchema), vectorColumnNames);
return HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(structType, "record", "", false);
} catch (Exception e) {
throw new HoodieException("Failed to read schema from Lance file: " + path, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.io.lance.HoodieBaseLanceWriter;
import org.apache.hudi.io.storage.row.HoodieBloomFilterRowWriteSupport;
import org.apache.hudi.io.storage.row.HoodieInternalRowFileWriter;
Expand All @@ -36,11 +38,16 @@
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.MetadataBuilder;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.LanceArrowUtils;
import org.apache.spark.unsafe.types.UTF8String;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;

import static org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.COMMIT_SEQNO_METADATA_FIELD;
Expand Down Expand Up @@ -120,8 +127,8 @@ private HoodieSparkLanceWriter(StoragePath file,
Option<BloomFilter> bloomFilterOpt,
long maxFileSize) {
super(file, DEFAULT_BATCH_SIZE, bloomFilterOpt.map(HoodieBloomFilterRowWriteSupport::new));
this.sparkSchema = sparkSchema;
this.arrowSchema = LanceArrowUtils.toArrowSchema(sparkSchema, DEFAULT_TIMEZONE, true);
this.sparkSchema = enrichSparkSchemaForLanceVectors(sparkSchema);
this.arrowSchema = LanceArrowUtils.toArrowSchema(this.sparkSchema, DEFAULT_TIMEZONE, true);
this.fileName = UTF8String.fromString(file.getName());
this.instantTime = UTF8String.fromString(instantTime);
this.populateMetaFields = populateMetaFields;
Expand All @@ -132,6 +139,55 @@ private HoodieSparkLanceWriter(StoragePath file,
};
}

/**
* For every field carrying a Hudi VECTOR logical type annotation
* (Spark metadata key {@link HoodieSchema#TYPE_METADATA_FIELD} starting with {@code "VECTOR"}),
* auto-attach the lance-spark metadata key {@link LanceArrowUtils#ARROW_FIXED_SIZE_LIST_SIZE_KEY()}
* with the vector's dimension so that {@link LanceArrowUtils#toArrowSchema} emits a native
* Arrow {@code FixedSizeList<elem, dim>} (Lance's vector column encoding) and
* {@link LanceArrowWriter} selects its fixed-size-list field writer when serializing values.
*
* <p>Lance-spark keys vector columns off the per-field
* {@link LanceArrowUtils#ARROW_FIXED_SIZE_LIST_SIZE_KEY()} (literal:
* {@code arrow.fixed-size-list.size}) metadata entry (see Lance Spark CREATE TABLE docs);
* we derive it from the VECTOR dimension so users don't have to set it alongside the
* Hudi descriptor.
*
* <p>Currently only FLOAT and DOUBLE element vectors are supported on Lance, matching
* lance-spark's {@code VectorUtils.shouldBeFixedSizeList}. Other element types would
* silently fall through to a plain list write, so we fail fast instead.
*/
private static StructType enrichSparkSchemaForLanceVectors(StructType sparkSchema) {
Map<Integer, HoodieSchema.Vector> vectorColumns =
VectorConversionUtils.detectVectorColumnsFromMetadata(sparkSchema);
if (vectorColumns.isEmpty()) {
return sparkSchema;
}
StructField[] fields = sparkSchema.fields();
StructField[] newFields = new StructField[fields.length];
for (int i = 0; i < fields.length; i++) {
StructField field = fields[i];
HoodieSchema.Vector vec = vectorColumns.get(i);
if (vec == null) {
newFields[i] = field;
continue;
}
HoodieSchema.Vector.VectorElementType elemType = vec.getVectorElementType();
if (elemType != HoodieSchema.Vector.VectorElementType.FLOAT
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 enrichSparkSchemaForLanceVectors throws HoodieNotSupportedException at writer construction for any non-FLOAT/DOUBLE VECTOR. Is there a schema-validation layer upstream that catches this before the writer is created, or will a user with e.g. a VECTOR(N, INT8) column first discover the limitation as a failed write task? If it's the latter, consider surfacing this earlier (at DDL/commit planning) so partial writes don't leave orphan files.

- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is ok to add this defensive check here to throw HoodieNotSupportedException. We should have a follow-up (non-blocker) to add such check in schema validation as well. @rahil-c to create an GH issue.

&& elemType != HoodieSchema.Vector.VectorElementType.DOUBLE) {
throw new HoodieNotSupportedException(
"Lance base-file format currently supports FLOAT/DOUBLE VECTOR columns only; "
+ "got element type " + elemType + " for field '" + field.name() + "'");
}
Metadata enriched = new MetadataBuilder()
.withMetadata(field.metadata())
.putLong(LanceArrowUtils.ARROW_FIXED_SIZE_LIST_SIZE_KEY(), vec.getDimension())
Comment thread
yihua marked this conversation as resolved.
.build();
newFields[i] = new StructField(field.name(), field.dataType(), field.nullable(), enriched);
}
return new StructType(newFields);
}
Comment thread
rahil-c marked this conversation as resolved.

@Override
public void writeRowWithMetadata(HoodieKey key, InternalRow row) throws IOException {
UTF8String recordKey = UTF8String.fromString(key.getRecordKey());
Expand Down Expand Up @@ -198,6 +254,27 @@ protected Schema getArrowSchema() {
return arrowSchema;
}

/**
* Emit Hudi's {@code hoodie.vector.columns} footer entry alongside any
* bloom-filter metadata. Mirrors the Parquet writer (see
* {@code HoodieRowParquetWriteSupport#init}) so Lance files carry the same
* self-describing VECTOR descriptor list that Parquet files do.
*
* <p>The read side today derives VECTOR identity from the Arrow
* {@code FixedSizeList<Float/Double, N>} type — this footer entry is a
* forward-compat guard: it lets future readers recover the exact descriptor
* (including fields the Arrow type cannot express, e.g. quantization tags)
* without a writer bump.
*/
@Override
protected Map<String, String> additionalSchemaMetadata() {
String value = VectorConversionUtils.buildVectorColumnsFooterValue(sparkSchema);
Comment thread
yihua marked this conversation as resolved.
if (value.isEmpty()) {
return Collections.emptyMap();
}
return Collections.singletonMap(HoodieSchema.VECTOR_COLUMNS_METADATA_KEY, value);
}

/**
* Update Hudi metadata fields in the InternalRow.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,23 @@
import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData;
import org.apache.spark.sql.catalyst.util.ArrayData;
import org.apache.spark.sql.catalyst.util.GenericArrayData;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.BinaryType$;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.MetadataBuilder;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.LanceArrowUtils;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
Expand All @@ -61,7 +67,7 @@ private VectorConversionUtils() {
* @return map from field index to Vector schema; empty map if schema is null or has no vectors
*/
public static Map<Integer, HoodieSchema.Vector> detectVectorColumns(HoodieSchema schema) {
Map<Integer, HoodieSchema.Vector> vectorColumnInfo = new HashMap<>();
Map<Integer, HoodieSchema.Vector> vectorColumnInfo = new LinkedHashMap<>();
if (schema == null) {
return vectorColumnInfo;
}
Expand All @@ -75,6 +81,28 @@ public static Map<Integer, HoodieSchema.Vector> detectVectorColumns(HoodieSchema
return vectorColumnInfo;
}

/**
* Builds the {@link HoodieSchema#VECTOR_COLUMNS_METADATA_KEY} footer value
* from a Spark {@link StructType} by detecting VECTOR metadata annotations and
* delegating to {@link HoodieSchema#serializeVectorColumnsMetadata}.
*
* @param schema Spark StructType (may be null)
* @return comma-separated descriptor list, or empty string if no VECTOR columns
* @see HoodieSchema#serializeVectorColumnsMetadata(java.util.Map)
*/
public static String buildVectorColumnsFooterValue(StructType schema) {
if (schema == null) {
return "";
}
Map<Integer, HoodieSchema.Vector> detected = detectVectorColumnsFromMetadata(schema);
StructField[] fields = schema.fields();
LinkedHashMap<String, HoodieSchema.Vector> named = new LinkedHashMap<>();
for (Map.Entry<Integer, HoodieSchema.Vector> entry : detected.entrySet()) {
named.put(fields[entry.getKey()].name(), entry.getValue());
}
Comment thread
rahil-c marked this conversation as resolved.
return HoodieSchema.serializeVectorColumnsMetadata(named);
Comment thread
yihua marked this conversation as resolved.
}

/**
* Detects VECTOR columns from Spark StructType metadata annotations.
* Fields with metadata key {@link HoodieSchema#TYPE_METADATA_FIELD} starting with "VECTOR"
Expand All @@ -84,7 +112,8 @@ public static Map<Integer, HoodieSchema.Vector> detectVectorColumns(HoodieSchema
* @return map from field index to Vector schema; empty map if no vectors found
*/
public static Map<Integer, HoodieSchema.Vector> detectVectorColumnsFromMetadata(StructType schema) {
Map<Integer, HoodieSchema.Vector> vectorColumnInfo = new HashMap<>();
// Use LinkedHashMap so callers iterate in field-ordinal order (stable across JDKs).
Map<Integer, HoodieSchema.Vector> vectorColumnInfo = new LinkedHashMap<>();
if (schema == null) {
return vectorColumnInfo;
}
Expand Down Expand Up @@ -235,4 +264,80 @@ public static void convertRowVectorColumns(InternalRow row, GenericInternalRow r
}
}
}

/**
* Re-attaches {@link HoodieSchema#TYPE_METADATA_FIELD} to Spark fields that are
* Arrow {@code FixedSizeList<Float32|Float64, dim>} in the Lance file.
* {@code LanceArrowUtils.fromArrowSchema} strips Hudi's VECTOR descriptor during
* Arrow→Spark conversion but preserves the fixed-size-list dimension under the
* lance-spark metadata key {@link LanceArrowUtils#ARROW_FIXED_SIZE_LIST_SIZE_KEY()}.
*
* <p>A FixedSizeList alone does not prove the column is a Hudi VECTOR — a
* non-Hudi Lance file could contain one. Callers must pass {@code vectorColumnNames}
* (derived from the Hudi schema's VECTOR-tagged fields, e.g. via
* {@link #detectVectorColumnsFromMetadata(StructType)}) so that only fields known to
* be Hudi VECTORs are restored. Pass an empty set to skip the restore entirely.
*
* <p>Nested structs are not recursed.
*/
public static StructType restoreVectorMetadata(StructType convertedSpark, Set<String> vectorColumnNames) {
if (convertedSpark == null) {
return null;
}
if (vectorColumnNames == null || vectorColumnNames.isEmpty()) {
return convertedSpark;
}
StructField[] sparkFields = convertedSpark.fields();
Comment thread
yihua marked this conversation as resolved.
StructField[] newFields = new StructField[sparkFields.length];
boolean changed = false;
for (int i = 0; i < sparkFields.length; i++) {
StructField sf = sparkFields[i];
String descriptor = vectorColumnNames.contains(sf.name()) ? deriveVectorDescriptor(sf) : null;
if (descriptor == null) {
newFields[i] = sf;
} else {
// VECTOR contract: elements are non-nullable. lance-spark's Arrow→Spark
// conversion produces ArrayType(containsNull=true); force containsNull=false
// so the field round-trips through HoodieSchema conversion.
DataType arrayType = DataTypes.createArrayType(
((ArrayType) sf.dataType()).elementType(), false);
newFields[i] = new StructField(
sf.name(),
arrayType,
sf.nullable(),
new MetadataBuilder()
.withMetadata(sf.metadata())
.putString(HoodieSchema.TYPE_METADATA_FIELD, descriptor)
.build());
changed = true;
}
}
return changed ? new StructType(newFields) : convertedSpark;
}

/**
* Derives Hudi's VECTOR type descriptor for a Spark field if lance-spark tagged it
* with {@link LanceArrowUtils#ARROW_FIXED_SIZE_LIST_SIZE_KEY()} and its data type is
* {@code ArrayType(Float|Double, containsNull=false)}; otherwise returns null.
*/
private static String deriveVectorDescriptor(StructField sf) {
String sizeKey = LanceArrowUtils.ARROW_FIXED_SIZE_LIST_SIZE_KEY();
if (!sf.metadata().contains(sizeKey)) {
return null;
}
if (!(sf.dataType() instanceof ArrayType)) {
return null;
}
DataType elemType = ((ArrayType) sf.dataType()).elementType();
HoodieSchema.Vector.VectorElementType elementType;
if (DataTypes.FloatType.equals(elemType)) {
elementType = HoodieSchema.Vector.VectorElementType.FLOAT;
} else if (DataTypes.DoubleType.equals(elemType)) {
elementType = HoodieSchema.Vector.VectorElementType.DOUBLE;
} else {
return null;
}
int dim = (int) sf.metadata().getLong(sizeKey);
return HoodieSchema.createVector(dim, elementType).toTypeDescriptor();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ public WriteContext init(Configuration configuration) {
}
String vectorMeta = HoodieSchema.buildVectorColumnsMetadataValue(schema);
if (!vectorMeta.isEmpty()) {
metadata.put(HoodieSchema.PARQUET_VECTOR_COLUMNS_METADATA_KEY, vectorMeta);
metadata.put(HoodieSchema.VECTOR_COLUMNS_METADATA_KEY, vectorMeta);
}
Configuration configurationCopy = new Configuration(configuration);
configurationCopy.set(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE, Boolean.toString(writeLegacyListFormat));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hudi.SparkFileFormatInternalRowReaderContext.{filterIsSafeForBootstrap, filterIsSafeForPrimaryKey, getAppliedRequiredSchema}
import org.apache.hudi.common.engine.HoodieReaderContext
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaUtils}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.table.read.buffer.PositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME
Expand Down Expand Up @@ -81,9 +81,16 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR
}
val structType = HoodieInternalRowUtils.getCachedSchema(requiredSchema)

// Detect VECTOR columns and replace with BinaryType for the Parquet reader
// (Parquet stores VECTOR as FIXED_LEN_BYTE_ARRAY which Spark maps to BinaryType)
val vectorColumnInfo = SparkFileFormatInternalRowReaderContext.detectVectorColumns(requiredSchema)
// Parquet stores VECTOR as FIXED_LEN_BYTE_ARRAY, so the reader needs BinaryType
// and we decode back to ArrayType below. Lance returns ArrayType natively, so skip
// the rewrite only for Lance base files; log files always go through the rewrite path.
val isLanceBaseFile = FSUtils.isBaseFile(filePath) &&
tableConfig.getBaseFileFormat == HoodieFileFormat.LANCE
val vectorColumnInfo: Map[Int, HoodieSchema.Vector] = if (isLanceBaseFile) {
Map.empty
} else {
SparkFileFormatInternalRowReaderContext.detectVectorColumns(requiredSchema)
}
val parquetReadStructType = if (vectorColumnInfo.nonEmpty) {
SparkFileFormatInternalRowReaderContext.replaceVectorColumnsWithBinary(structType, vectorColumnInfo)
} else {
Expand Down
Loading
Loading