diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceReader.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceReader.java index 3bf6625a4fd57..905bd74026ad7 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceReader.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceReader.java @@ -202,7 +202,11 @@ private ClosableIterator getUnsafeRowIterator(HoodieSchema requestedS @Override public HoodieSchema getSchema() { try { - StructType structType = LanceArrowUtils.fromArrowSchema(arrowSchema); + Map customMetadata = arrowSchema.getCustomMetadata(); + Set vectorColumnNames = HoodieSchema.parseVectorColumnNames( + customMetadata == null ? null : customMetadata.get(HoodieSchema.VECTOR_COLUMNS_METADATA_KEY)); + StructType structType = VectorConversionUtils.restoreVectorMetadata( + LanceArrowUtils.fromArrowSchema(arrowSchema), vectorColumnNames); return HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(structType, "record", "", false); } catch (Exception e) { throw new HoodieException("Failed to read schema from Lance file: " + path, e); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java index 2415f26653fa4..64cfb2322bcab 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java @@ -23,7 +23,9 @@ import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.util.Option; +import 
org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.io.lance.HoodieBaseLanceWriter; import org.apache.hudi.io.storage.row.HoodieBloomFilterRowWriteSupport; import org.apache.hudi.io.storage.row.HoodieInternalRowFileWriter; @@ -36,11 +38,16 @@ import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.types.pojo.Schema; import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.MetadataBuilder; +import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.util.LanceArrowUtils; import org.apache.spark.unsafe.types.UTF8String; import java.io.IOException; +import java.util.Collections; +import java.util.Map; import java.util.function.Function; import static org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.COMMIT_SEQNO_METADATA_FIELD; @@ -120,8 +127,8 @@ private HoodieSparkLanceWriter(StoragePath file, Option bloomFilterOpt, long maxFileSize) { super(file, DEFAULT_BATCH_SIZE, bloomFilterOpt.map(HoodieBloomFilterRowWriteSupport::new)); - this.sparkSchema = sparkSchema; - this.arrowSchema = LanceArrowUtils.toArrowSchema(sparkSchema, DEFAULT_TIMEZONE, true); + this.sparkSchema = enrichSparkSchemaForLanceVectors(sparkSchema); + this.arrowSchema = LanceArrowUtils.toArrowSchema(this.sparkSchema, DEFAULT_TIMEZONE, true); this.fileName = UTF8String.fromString(file.getName()); this.instantTime = UTF8String.fromString(instantTime); this.populateMetaFields = populateMetaFields; @@ -132,6 +139,55 @@ private HoodieSparkLanceWriter(StoragePath file, }; } + /** + * For every field carrying a Hudi VECTOR logical type annotation + * (Spark metadata key {@link HoodieSchema#TYPE_METADATA_FIELD} starting with {@code "VECTOR"}), + * auto-attach the lance-spark metadata key {@link LanceArrowUtils#ARROW_FIXED_SIZE_LIST_SIZE_KEY()} + * with the vector's dimension so that {@link 
LanceArrowUtils#toArrowSchema} emits a native + * Arrow {@code FixedSizeList} (Lance's vector column encoding) and + * {@link LanceArrowWriter} selects its fixed-size-list field writer when serializing values. + * + *

Lance-spark recognizes a vector column by the per-field + * {@link LanceArrowUtils#ARROW_FIXED_SIZE_LIST_SIZE_KEY()} (literal: + * {@code arrow.fixed-size-list.size}) metadata entry (see Lance Spark CREATE TABLE docs); + * we derive that entry from the VECTOR dimension so users don't have to set it alongside the + * Hudi descriptor. + * + *

Currently only FLOAT and DOUBLE element vectors are supported on Lance, matching + * lance-spark's {@code VectorUtils.shouldBeFixedSizeList}. Other element types would + * silently fall through to a plain list write, so we fail fast instead. + */ + private static StructType enrichSparkSchemaForLanceVectors(StructType sparkSchema) { + Map vectorColumns = + VectorConversionUtils.detectVectorColumnsFromMetadata(sparkSchema); + if (vectorColumns.isEmpty()) { + return sparkSchema; + } + StructField[] fields = sparkSchema.fields(); + StructField[] newFields = new StructField[fields.length]; + for (int i = 0; i < fields.length; i++) { + StructField field = fields[i]; + HoodieSchema.Vector vec = vectorColumns.get(i); + if (vec == null) { + newFields[i] = field; + continue; + } + HoodieSchema.Vector.VectorElementType elemType = vec.getVectorElementType(); + if (elemType != HoodieSchema.Vector.VectorElementType.FLOAT + && elemType != HoodieSchema.Vector.VectorElementType.DOUBLE) { + throw new HoodieNotSupportedException( + "Lance base-file format currently supports FLOAT/DOUBLE VECTOR columns only; " + + "got element type " + elemType + " for field '" + field.name() + "'"); + } + Metadata enriched = new MetadataBuilder() + .withMetadata(field.metadata()) + .putLong(LanceArrowUtils.ARROW_FIXED_SIZE_LIST_SIZE_KEY(), vec.getDimension()) + .build(); + newFields[i] = new StructField(field.name(), field.dataType(), field.nullable(), enriched); + } + return new StructType(newFields); + } + @Override public void writeRowWithMetadata(HoodieKey key, InternalRow row) throws IOException { UTF8String recordKey = UTF8String.fromString(key.getRecordKey()); @@ -198,6 +254,27 @@ protected Schema getArrowSchema() { return arrowSchema; } + /** + * Emit Hudi's {@code hoodie.vector.columns} footer entry alongside any + * bloom-filter metadata. 
Mirrors the Parquet writer (see + * {@code HoodieRowParquetWriteSupport#init}) so Lance files carry the same + * self-describing VECTOR descriptor list that Parquet files do. + * + *

The read side today derives VECTOR identity from the Arrow + * {@code FixedSizeList} type — this footer entry is a + * forward-compat guard: it lets future readers recover the exact descriptor + * (including fields the Arrow type cannot express, e.g. quantization tags) + * without a writer bump. + */ + @Override + protected Map additionalSchemaMetadata() { + String value = VectorConversionUtils.buildVectorColumnsFooterValue(sparkSchema); + if (value.isEmpty()) { + return Collections.emptyMap(); + } + return Collections.singletonMap(HoodieSchema.VECTOR_COLUMNS_METADATA_KEY, value); + } + /** * Update Hudi metadata fields in the InternalRow. * diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/VectorConversionUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/VectorConversionUtils.java index 2bf8e86b5d0fd..a8cc02f58dad3 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/VectorConversionUtils.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/VectorConversionUtils.java @@ -25,17 +25,23 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData; import org.apache.spark.sql.catalyst.util.ArrayData; import org.apache.spark.sql.catalyst.util.GenericArrayData; +import org.apache.spark.sql.types.ArrayType; import org.apache.spark.sql.types.BinaryType$; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.MetadataBuilder; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.LanceArrowUtils; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; import java.nio.ByteBuffer; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import 
java.util.function.Function; import static org.apache.hudi.common.util.ValidationUtils.checkArgument; @@ -61,7 +67,7 @@ private VectorConversionUtils() { * @return map from field index to Vector schema; empty map if schema is null or has no vectors */ public static Map detectVectorColumns(HoodieSchema schema) { - Map vectorColumnInfo = new HashMap<>(); + Map vectorColumnInfo = new LinkedHashMap<>(); if (schema == null) { return vectorColumnInfo; } @@ -75,6 +81,28 @@ public static Map detectVectorColumns(HoodieSchema return vectorColumnInfo; } + /** + * Builds the {@link HoodieSchema#VECTOR_COLUMNS_METADATA_KEY} footer value + * from a Spark {@link StructType} by detecting VECTOR metadata annotations and + * delegating to {@link HoodieSchema#serializeVectorColumnsMetadata}. + * + * @param schema Spark StructType (may be null) + * @return comma-separated descriptor list, or empty string if no VECTOR columns + * @see HoodieSchema#serializeVectorColumnsMetadata(java.util.Map) + */ + public static String buildVectorColumnsFooterValue(StructType schema) { + if (schema == null) { + return ""; + } + Map detected = detectVectorColumnsFromMetadata(schema); + StructField[] fields = schema.fields(); + LinkedHashMap named = new LinkedHashMap<>(); + for (Map.Entry entry : detected.entrySet()) { + named.put(fields[entry.getKey()].name(), entry.getValue()); + } + return HoodieSchema.serializeVectorColumnsMetadata(named); + } + /** * Detects VECTOR columns from Spark StructType metadata annotations. * Fields with metadata key {@link HoodieSchema#TYPE_METADATA_FIELD} starting with "VECTOR" @@ -84,7 +112,8 @@ public static Map detectVectorColumns(HoodieSchema * @return map from field index to Vector schema; empty map if no vectors found */ public static Map detectVectorColumnsFromMetadata(StructType schema) { - Map vectorColumnInfo = new HashMap<>(); + // Use LinkedHashMap so callers iterate in field-ordinal order (stable across JDKs). 
+ Map vectorColumnInfo = new LinkedHashMap<>(); if (schema == null) { return vectorColumnInfo; } @@ -235,4 +264,80 @@ public static void convertRowVectorColumns(InternalRow row, GenericInternalRow r } } } + + /** + * Re-attaches {@link HoodieSchema#TYPE_METADATA_FIELD} to Spark fields that are + * Arrow {@code FixedSizeList} in the Lance file. + * {@code LanceArrowUtils.fromArrowSchema} strips Hudi's VECTOR descriptor during + * Arrow→Spark conversion but preserves the fixed-size-list dimension under the + * lance-spark metadata key {@link LanceArrowUtils#ARROW_FIXED_SIZE_LIST_SIZE_KEY()}. + * + *

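A FixedSizeList alone does not prove the column is a Hudi VECTOR — a + * non-Hudi Lance file could contain one. Callers must pass {@code vectorColumnNames} + * (derived from the Hudi schema's VECTOR-tagged fields, e.g. via + * {@link #detectVectorColumnsFromMetadata(StructType)}, or parsed from the file's + * {@link HoodieSchema#VECTOR_COLUMNS_METADATA_KEY} footer entry via + * {@link HoodieSchema#parseVectorColumnNames(String)}) so that only fields known to + * be Hudi VECTORs are restored. A named field is restored only if it also carries the + * fixed-size-list size key and has FLOAT or DOUBLE elements. Pass {@code null} or an + * empty set to skip the restore entirely. + * + *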

Nested structs are not recursed. + */ + public static StructType restoreVectorMetadata(StructType convertedSpark, Set vectorColumnNames) { + if (convertedSpark == null) { + return null; + } + if (vectorColumnNames == null || vectorColumnNames.isEmpty()) { + return convertedSpark; + } + StructField[] sparkFields = convertedSpark.fields(); + StructField[] newFields = new StructField[sparkFields.length]; + boolean changed = false; + for (int i = 0; i < sparkFields.length; i++) { + StructField sf = sparkFields[i]; + String descriptor = vectorColumnNames.contains(sf.name()) ? deriveVectorDescriptor(sf) : null; + if (descriptor == null) { + newFields[i] = sf; + } else { + // VECTOR contract: elements are non-nullable. lance-spark's Arrow→Spark + // conversion produces ArrayType(containsNull=true); force containsNull=false + // so the field round-trips through HoodieSchema conversion. + DataType arrayType = DataTypes.createArrayType( + ((ArrayType) sf.dataType()).elementType(), false); + newFields[i] = new StructField( + sf.name(), + arrayType, + sf.nullable(), + new MetadataBuilder() + .withMetadata(sf.metadata()) + .putString(HoodieSchema.TYPE_METADATA_FIELD, descriptor) + .build()); + changed = true; + } + } + return changed ? new StructType(newFields) : convertedSpark; + } + + /** + * Derives Hudi's VECTOR type descriptor for a Spark field if lance-spark tagged it + * with {@link LanceArrowUtils#ARROW_FIXED_SIZE_LIST_SIZE_KEY()} and its data type is + * {@code ArrayType(Float|Double, containsNull=false)}; otherwise returns null. 
+ */ + private static String deriveVectorDescriptor(StructField sf) { + String sizeKey = LanceArrowUtils.ARROW_FIXED_SIZE_LIST_SIZE_KEY(); + if (!sf.metadata().contains(sizeKey)) { + return null; + } + if (!(sf.dataType() instanceof ArrayType)) { + return null; + } + DataType elemType = ((ArrayType) sf.dataType()).elementType(); + HoodieSchema.Vector.VectorElementType elementType; + if (DataTypes.FloatType.equals(elemType)) { + elementType = HoodieSchema.Vector.VectorElementType.FLOAT; + } else if (DataTypes.DoubleType.equals(elemType)) { + elementType = HoodieSchema.Vector.VectorElementType.DOUBLE; + } else { + return null; + } + int dim = (int) sf.metadata().getLong(sizeKey); + return HoodieSchema.createVector(dim, elementType).toTypeDescriptor(); + } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java index 299c56213067a..646fb2330833f 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java @@ -322,7 +322,7 @@ public WriteContext init(Configuration configuration) { } String vectorMeta = HoodieSchema.buildVectorColumnsMetadataValue(schema); if (!vectorMeta.isEmpty()) { - metadata.put(HoodieSchema.PARQUET_VECTOR_COLUMNS_METADATA_KEY, vectorMeta); + metadata.put(HoodieSchema.VECTOR_COLUMNS_METADATA_KEY, vectorMeta); } Configuration configurationCopy = new Configuration(configuration); configurationCopy.set(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE, Boolean.toString(writeLegacyListFormat)); diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala index 8c0980e009f05..92b963f683906 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala @@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hudi.SparkFileFormatInternalRowReaderContext.{filterIsSafeForBootstrap, filterIsSafeForPrimaryKey, getAppliedRequiredSchema} import org.apache.hudi.common.engine.HoodieReaderContext import org.apache.hudi.common.fs.FSUtils -import org.apache.hudi.common.model.HoodieRecord +import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord} import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaUtils} import org.apache.hudi.common.table.HoodieTableConfig import org.apache.hudi.common.table.read.buffer.PositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME @@ -81,9 +81,16 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR } val structType = HoodieInternalRowUtils.getCachedSchema(requiredSchema) - // Detect VECTOR columns and replace with BinaryType for the Parquet reader - // (Parquet stores VECTOR as FIXED_LEN_BYTE_ARRAY which Spark maps to BinaryType) - val vectorColumnInfo = SparkFileFormatInternalRowReaderContext.detectVectorColumns(requiredSchema) + // Parquet stores VECTOR as FIXED_LEN_BYTE_ARRAY, so the reader needs BinaryType + // and we decode back to ArrayType below. Lance returns ArrayType natively, so skip + // the rewrite only for Lance base files; log files always go through the rewrite path. 
+ val isLanceBaseFile = FSUtils.isBaseFile(filePath) && + tableConfig.getBaseFileFormat == HoodieFileFormat.LANCE + val vectorColumnInfo: Map[Int, HoodieSchema.Vector] = if (isLanceBaseFile) { + Map.empty + } else { + SparkFileFormatInternalRowReaderContext.detectVectorColumns(requiredSchema) + } val parquetReadStructType = if (vectorColumnInfo.nonEmpty) { SparkFileFormatInternalRowReaderContext.replaceVectorColumnsWithBinary(structType, vectorColumnInfo) } else { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java index 09b49743b36ab..dff06a4c0e0f0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java @@ -42,6 +42,8 @@ import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Locale; import java.util.Map; @@ -215,41 +217,99 @@ private static Pair> tokenizeTypeDescriptor(Strin public static final String PARQUET_ARRAY_AVRO = "." + ARRAY_LIST_ELEMENT; /** - * Parquet file-footer metadata key under which VECTOR column names and type descriptors + * Base-file footer metadata key under which VECTOR column names and type descriptors * are recorded. The value is a comma-separated list of {@code colName:VECTOR(dim[,elemType])} * entries, e.g. {@code "embedding:VECTOR(128),tags:VECTOR(64,INT8)"}. * - *

Stored as file-level key-value metadata (Parquet footer) so that any reader can - * identify vector columns without needing the Hudi schema store. + *

Stored as file-level key-value metadata (Parquet footer, Lance schema metadata) + * so that any reader can identify vector columns without needing the Hudi schema store. */ - public static final String PARQUET_VECTOR_COLUMNS_METADATA_KEY = "hoodie.vector.columns"; + public static final String VECTOR_COLUMNS_METADATA_KEY = "hoodie.vector.columns"; /** - * Builds the value string for {@link #PARQUET_VECTOR_COLUMNS_METADATA_KEY}. + * Serializes a name-to-Vector map into the comma-separated + * {@code colName:VECTOR(dim[,elemType])} format used for {@link #VECTOR_COLUMNS_METADATA_KEY}. + * + *

This is the single canonical serializer — all format-specific code (Parquet, Lance) + * should build the map from their respective schema representation and delegate here. + * + * @param vectorColumns ordered map of field name to Vector descriptor (iteration order is preserved) + * @return comma-separated descriptor list, or empty string if the map is null or empty + */ + public static String serializeVectorColumnsMetadata(java.util.Map vectorColumns) { + if (vectorColumns == null || vectorColumns.isEmpty()) { + return ""; + } + StringBuilder sb = new StringBuilder(); + for (java.util.Map.Entry entry : vectorColumns.entrySet()) { + if (sb.length() > 0) { + sb.append(','); + } + sb.append(entry.getKey()).append(':').append(entry.getValue().toTypeDescriptor()); + } + return sb.toString(); + } + + /** + * Builds the value string for {@link #VECTOR_COLUMNS_METADATA_KEY} from a {@link HoodieSchema}. * * @param schema a HoodieSchema of type RECORD (or null) * @return comma-separated {@code colName:VECTOR(dim[,elemType])} entries, or empty string * if the schema is null or has no VECTOR columns + * @see #serializeVectorColumnsMetadata(java.util.Map) */ public static String buildVectorColumnsMetadataValue(HoodieSchema schema) { if (schema == null || schema.isSchemaNull()) { return ""; } - List fields = schema.getFields(); - StringBuilder sb = new StringBuilder(); - for (HoodieSchemaField field : fields) { + LinkedHashMap vectorColumns = new LinkedHashMap<>(); + for (HoodieSchemaField field : schema.getFields()) { HoodieSchema fieldSchema = field.schema().getNonNullType(); if (fieldSchema.getType() == HoodieSchemaType.VECTOR) { - Vector vectorSchema = (Vector) fieldSchema; - if (sb.length() > 0) { - sb.append(','); - } - sb.append(field.name()).append(':').append(vectorSchema.toTypeDescriptor()); + vectorColumns.put(field.name(), (Vector) fieldSchema); } } - return sb.toString(); + return serializeVectorColumnsMetadata(vectorColumns); + } + + /** + * Parses the 
comma-separated {@link #VECTOR_COLUMNS_METADATA_KEY} footer value and + * returns the set of vector column field names. Commas inside parentheses (e.g. inside + * the VECTOR descriptor {@code VECTOR(128, DOUBLE)}) are not treated as separators. + * + * @param footerValue raw value from the file footer, or null / empty + * @return set of field names (preserves insertion order), or empty set if input is null / empty + */ + public static Set parseVectorColumnNames(String footerValue) { + if (footerValue == null || footerValue.isEmpty()) { + return Collections.emptySet(); + } + LinkedHashSet names = new LinkedHashSet<>(); + int depth = 0; + int start = 0; + for (int i = 0; i < footerValue.length(); i++) { + char c = footerValue.charAt(i); + if (c == '(') { + depth++; + } else if (c == ')') { + depth--; + } else if (c == ',' && depth == 0) { + addVectorColumnName(footerValue, start, i, names); + start = i + 1; + } + } + addVectorColumnName(footerValue, start, footerValue.length(), names); + return names; } + private static void addVectorColumnName(String s, int start, int end, Set names) { + int colon = s.indexOf(':', start); + if (colon > start && colon < end) { + names.add(s.substring(start, colon).trim()); + } + } + + private Schema avroSchema; private HoodieSchemaType type; private transient List fields; diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java index 547b02787369f..df7930060fe93 100644 --- a/hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java @@ -50,7 +50,7 @@ public HoodieAvroWriteSupport(MessageType schema, HoodieSchema hoodieSchema, Opt this.properties = properties; String vectorMeta = HoodieSchema.buildVectorColumnsMetadataValue(hoodieSchema); if (!vectorMeta.isEmpty()) { - 
footerMetadata.put(HoodieSchema.PARQUET_VECTOR_COLUMNS_METADATA_KEY, vectorMeta); + footerMetadata.put(HoodieSchema.VECTOR_COLUMNS_METADATA_KEY, vectorMeta); } } diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/lance/HoodieBaseLanceWriter.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/lance/HoodieBaseLanceWriter.java index 8f76c61fa7249..f43c56625f592 100644 --- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/lance/HoodieBaseLanceWriter.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/lance/HoodieBaseLanceWriter.java @@ -37,6 +37,7 @@ import java.io.Closeable; import java.io.IOException; +import java.util.Collections; import java.util.Map; /** @@ -105,6 +106,20 @@ protected HoodieBaseLanceWriter(StoragePath path, int batchSize, */ protected abstract Schema getArrowSchema(); + /** + * Subclass hook for emitting additional Lance file-footer key-value metadata + * alongside any bloom-filter entries. Called once during {@link #close()}. + * + *

Default implementation returns an empty map. Overriders should return a + * fresh map; the caller does not retain a reference. Colliding keys are + * overwritten per {@code LanceFileWriter.addSchemaMetadata} semantics. + * + * @return map of footer metadata key-value pairs, or empty map for none + */ + protected Map additionalSchemaMetadata() { + return Collections.emptyMap(); + } + /** * Write a single record. Records are buffered and flushed in batches. * @@ -163,11 +178,21 @@ public void close() throws IOException { writer.write(root); } - // Finalize and write bloom filter metadata - if (writer != null && bloomFilterWriteSupportOpt.isPresent()) { - Map metadata = bloomFilterWriteSupportOpt.get().finalizeMetadata(); - if (!metadata.isEmpty()) { - writer.addSchemaMetadata(metadata); + if (writer != null) { + // Finalize and write bloom filter metadata + if (bloomFilterWriteSupportOpt.isPresent()) { + Map metadata = bloomFilterWriteSupportOpt.get().finalizeMetadata(); + if (!metadata.isEmpty()) { + writer.addSchemaMetadata(metadata); + } + } + + // Allow subclasses to contribute additional footer key-value metadata + // (e.g. Spark writer emits `hoodie.vector.columns` for forward-compat read). + // Called unconditionally; returns an empty map when no VECTOR columns are present. 
+ Map extra = additionalSchemaMetadata(); + if (extra != null && !extra.isEmpty()) { + writer.addSchemaMetadata(extra); } } } catch (Exception e) { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala index bc24051a633f3..183eea6fc5baf 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala @@ -23,7 +23,7 @@ import org.apache.hudi.SparkAdapterSupport.sparkAdapter import org.apache.hudi.common.util import org.apache.hudi.internal.schema.InternalSchema import org.apache.hudi.io.memory.HoodieArrowAllocator -import org.apache.hudi.io.storage.{HoodieSparkLanceReader, LanceRecordIterator} +import org.apache.hudi.io.storage.{HoodieSparkLanceReader, LanceRecordIterator, VectorConversionUtils} import org.apache.hudi.storage.StorageConfiguration import org.apache.hadoop.conf.Configuration @@ -90,9 +90,20 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna // Open Lance file reader val lanceReader = LanceFileReader.open(filePath, allocator) - // Get schema from Lance file + // Get schema from Lance file. lance-spark strips Hudi's VECTOR descriptor during + // Arrow→Spark conversion but keeps the fixed-size-list dimension on the Spark + // field metadata; rebuild the descriptor from that, using requiredSchema + // as the source of truth for which columns are Hudi VECTORs — so non-Hudi fixed-size-lists aren't mis-tagged. 
val arrowSchema = lanceReader.schema() - val fileSchema = LanceArrowUtils.fromArrowSchema(arrowSchema) + val vectorColumnNames: java.util.Set[String] = VectorConversionUtils + .detectVectorColumnsFromMetadata(requiredSchema) + .keySet() + .asScala + .map(i => requiredSchema.fields(i).name) + .toSet + .asJava + val fileSchema = VectorConversionUtils.restoreVectorMetadata( + LanceArrowUtils.fromArrowSchema(arrowSchema), vectorColumnNames) // Build type change info for schema evolution val (implicitTypeChangeInfo, sparkRequestSchema) = diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala index 9ba25424a9ca8..ef14369107de2 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala @@ -431,11 +431,28 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String, /** * Detects vector columns and replaces them with BinaryType in one step. + * + *

The BinaryType rewrite is Parquet-specific: Hudi stores VECTOR columns as + * FIXED_LEN_BYTE_ARRAY in Parquet, so the reader must see BinaryType and the raw + * bytes are post-converted back to ArrayType. Other formats (e.g. Lance) encode + * vectors natively as Arrow FixedSizeList and return ArrayType directly, so the + * rewrite would introduce a spurious ArrayType→BinaryType cast during schema + * evolution and break the read. Skip the rewrite for those formats. + * * @return (modified schema with BinaryType for vectors, vector column ordinal map) */ private def withVectorRewrite(schema: StructType): (StructType, Map[Int, HoodieSchema.Vector]) = { - val vecs = detectVectorColumns(schema) - if (vecs.nonEmpty) (replaceVectorFieldsWithBinary(schema, vecs), vecs) else (schema, vecs) + // Only Parquet needs the BinaryType rewrite; other formats (Lance) return ArrayType natively. + if (hoodieFileFormat != HoodieFileFormat.PARQUET) { + (schema, Map.empty[Int, HoodieSchema.Vector]) + } else { + val vecs = detectVectorColumns(schema) + if (vecs.isEmpty) { + (schema, vecs) + } else { + (replaceVectorFieldsWithBinary(schema, vecs), vecs) + } + } } /** diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceReader.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceReader.java index b2593fc908162..4aef3d8a16b0e 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceReader.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceReader.java @@ -27,6 +27,8 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaField; +import org.apache.hudi.common.schema.HoodieSchemaType; import org.apache.hudi.common.testutils.HoodieTestUtils; import 
org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.ClosableIterator; @@ -41,6 +43,9 @@ import org.apache.spark.sql.catalyst.util.GenericArrayData; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.Decimal; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.MetadataBuilder; +import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -648,6 +653,38 @@ public void testReadWithRequestedSchema() throws Exception { } } + @Test + public void testGetSchemaRestoresVectorMetadata() throws Exception { + int dim = 4; + Metadata vectorFieldMetadata = new MetadataBuilder() + .putString(HoodieSchema.TYPE_METADATA_FIELD, "VECTOR(" + dim + ")") + .build(); + StructType schema = new StructType() + .add(new StructField("id", DataTypes.IntegerType, false, Metadata.empty())) + .add(new StructField( + "embedding", + DataTypes.createArrayType(DataTypes.FloatType, false), + false, + vectorFieldMetadata)); + + List rows = new ArrayList<>(); + rows.add(createRow(1, new Object[] {1.0f, 2.0f, 3.0f, 4.0f})); + rows.add(createRow(2, new Object[] {5.0f, 6.0f, 7.0f, 8.0f})); + + StoragePath path = new StoragePath(tempDir.getAbsolutePath() + "/test_vector_schema.lance"); + try (HoodieSparkLanceReader reader = writeAndCreateReader(path, schema, rows)) { + HoodieSchema readSchema = reader.getSchema(); + HoodieSchemaField embedding = readSchema.getField("embedding") + .orElseThrow(() -> new AssertionError("embedding field missing on read schema")); + HoodieSchema fieldSchema = embedding.schema().getNonNullType(); + assertEquals(HoodieSchemaType.VECTOR, fieldSchema.getType(), + "embedding must be restored as VECTOR from the FixedSizeList encoding alone"); + HoodieSchema.Vector vec = (HoodieSchema.Vector) fieldSchema; + assertEquals(dim, vec.getDimension()); + 
assertEquals(HoodieSchema.Vector.VectorElementType.FLOAT, vec.getVectorElementType()); + } + } + private void assertBloomFilter(HoodieSparkLanceReader reader, Class clazz, String minKey, String maxKey, int keyCount) { BloomFilter bloomFilter = reader.readBloomFilter(); assertInstanceOf(clazz, bloomFilter); diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala index 65f0f6a7a6937..01172d58706e9 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala @@ -22,6 +22,7 @@ import org.apache.hudi.DefaultSparkRecordMerger import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig} import org.apache.hudi.common.engine.HoodieLocalEngineContext import org.apache.hudi.common.model.{HoodieFileFormat, HoodieTableType} +import org.apache.hudi.common.schema.HoodieSchema import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} import org.apache.hudi.common.table.view.{FileSystemViewManager, FileSystemViewStorageConfig} import org.apache.hudi.common.testutils.HoodieTestUtils @@ -30,12 +31,16 @@ import org.apache.hudi.io.storage.HoodieSparkLanceReader import org.apache.hudi.storage.StoragePath import org.apache.hudi.testutils.HoodieSparkClientTestBase +import org.apache.arrow.memory.RootAllocator +import org.apache.arrow.vector.types.pojo.ArrowType import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession} +import org.apache.spark.sql.types._ import org.junit.jupiter.api.{AfterEach, BeforeEach} import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotNull, assertTrue} import org.junit.jupiter.api.condition.DisabledIfSystemProperty import org.junit.jupiter.params.ParameterizedTest import 
org.junit.jupiter.params.provider.EnumSource +import org.lance.file.LanceFileReader import java.util.concurrent.atomic.AtomicInteger import java.util.stream.Collectors @@ -935,6 +940,235 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { "Table should use Lance base file format") } + /** + * Vector round-trip test parameterized over COW + MOR. Covers two non-null VECTOR + * columns of different element types (FLOAT, DOUBLE) and dimensions, and exercises + * the upsert path (MOR log-merge on MOR, file rewrite on COW). + * + *

Nullable-vector coverage lives in {@code testNullableVectorRoundTrip} because + * merging a null-valued vector through the upsert path currently errors out in + * the Lance reader; tracked as a separate follow-up. + */ + @ParameterizedTest + @EnumSource(value = classOf[HoodieTableType]) + def testMultipleVectorColumns(tableType: HoodieTableType): Unit = { + val tableName = s"test_lance_vec_multi_${tableType.name().toLowerCase}" + val tablePath = s"$basePath/$tableName" + + val embeddingDim = 3 + val featuresDim = 2 + val schema = StructType(Seq( + StructField("id", IntegerType, nullable = false), + StructField("name", StringType, nullable = false), + StructField("age", IntegerType, nullable = false), + StructField("embedding", + ArrayType(FloatType, containsNull = false), + nullable = false, + vectorMetadata(s"VECTOR($embeddingDim)")), + StructField("features", + ArrayType(DoubleType, containsNull = false), + nullable = false, + vectorMetadata(s"VECTOR($featuresDim, DOUBLE)")) + )) + + // Initial insert. + val data1 = Seq( + Row(1, "Alice", 30, Array(1.0f, 2.0f, 3.0f), Array(10.0d, 20.0d)), + Row(2, "Bob", 25, Array(4.0f, 5.0f, 6.0f), Array(30.0d, 40.0d)), + Row(3, "Charlie", 35, Array(7.0f, 8.0f, 9.0f), Array(50.0d, 60.0d)) + ) + val df1 = spark.createDataFrame(spark.sparkContext.parallelize(data1), schema).coalesce(1) + writeDataframe(tableType, tableName, tablePath, df1, + saveMode = SaveMode.Overwrite, operation = Some("insert")) + + // Upsert — update Bob's embedding, age, and features. 
+ val data2 = Seq( + Row(2, "Bob", 40, Array(10.0f, 20.0f, 30.0f), Array(70.0d, 80.0d)) + ) + val df2 = spark.createDataFrame(spark.sparkContext.parallelize(data2), schema).coalesce(1) + writeDataframe(tableType, tableName, tablePath, df2, operation = Some("upsert")) + + val readDf = spark.read.format("hudi").load(tablePath) + .select("id", "name", "age", "embedding", "features") + assertEquals( + ArrayType(FloatType, containsNull = false), + readDf.schema("embedding").dataType) + assertEquals( + ArrayType(DoubleType, containsNull = false), + readDf.schema("features").dataType) + assertHudiTypeMetadata(readDf.schema("embedding"), s"VECTOR($embeddingDim)") + assertHudiTypeMetadata(readDf.schema("features"), s"VECTOR($featuresDim, DOUBLE)") + + val rows = readDf.collect().sortBy(_.getInt(0)) + assertEquals(3, rows.length) + // Alice unchanged. + assertEquals(Seq(1.0f, 2.0f, 3.0f), rows(0).getSeq[Float](3).toSeq) + assertEquals(Seq(10.0d, 20.0d), rows(0).getSeq[Double](4).toSeq) + // Bob upserted. + assertEquals(40, rows(1).getInt(2), "Bob's age should be updated to 40") + assertEquals(Seq(10.0f, 20.0f, 30.0f), rows(1).getSeq[Float](3).toSeq) + assertEquals(Seq(70.0d, 80.0d), rows(1).getSeq[Double](4).toSeq) + // Charlie unchanged. + assertEquals(Seq(7.0f, 8.0f, 9.0f), rows(2).getSeq[Float](3).toSeq) + assertEquals(Seq(50.0d, 60.0d), rows(2).getSeq[Double](4).toSeq) + + // Validate Lance file physical schema + footer on the base files produced. + assertLanceFieldIsFixedSizeList(tablePath, "embedding", embeddingDim) + assertLanceFieldIsFixedSizeList(tablePath, "features", featuresDim) + assertLanceFooterHasVectorColumns(tablePath, + s"embedding:VECTOR($embeddingDim),features:VECTOR($featuresDim, DOUBLE)") + } + + /** + * Nullable-vector coverage — kept separate from {@code testMultipleVectorColumns} + * because folding this case into the upsert/merge path errors out in the current + * Lance reader (null vector element read). 
+ */ + @ParameterizedTest + @EnumSource(value = classOf[HoodieTableType]) + def testNullableVectorRoundTrip(tableType: HoodieTableType): Unit = { + val tableName = s"test_lance_vec_nullable_${tableType.name().toLowerCase}" + val tablePath = s"$basePath/$tableName" + + val dim = 3 + val schema = StructType(Seq( + StructField("id", IntegerType, nullable = false), + StructField("embedding", + ArrayType(FloatType, containsNull = false), + nullable = true, + vectorMetadata(s"VECTOR($dim)")) + )) + val data = Seq( + Row(1, Array(1.0f, 2.0f, 3.0f)), + Row(2, null), + Row(3, Array(7.0f, 8.0f, 9.0f)) + ) + val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema).coalesce(1) + + writeDataframe(tableType, tableName, tablePath, df, saveMode = SaveMode.Overwrite) + + val readDf = spark.read.format("hudi").load(tablePath).select("id", "embedding") + assertHudiTypeMetadata(readDf.schema("embedding"), s"VECTOR($dim)") + + val rows = readDf.collect().sortBy(_.getInt(0)) + assertEquals(3, rows.length) + assertEquals(Seq(1.0f, 2.0f, 3.0f), rows(0).getSeq[Float](1).toSeq) + assertTrue(rows(1).isNullAt(1), "Row with id=2 should have null embedding") + assertEquals(Seq(7.0f, 8.0f, 9.0f), rows(2).getSeq[Float](1).toSeq) + } + + @ParameterizedTest + @EnumSource(value = classOf[HoodieTableType]) + def testVectorProjection(tableType: HoodieTableType): Unit = { + val tableName = s"test_lance_vec_proj_${tableType.name().toLowerCase}" + val tablePath = s"$basePath/$tableName" + + val dim = 4 + val schema = StructType(Seq( + StructField("id", IntegerType, nullable = false), + StructField("name", StringType, nullable = false), + StructField("embedding", + ArrayType(FloatType, containsNull = false), + nullable = false, + vectorMetadata(s"VECTOR($dim)")) + )) + val data = Seq( + Row(1, "Alice", Array(1.0f, 2.0f, 3.0f, 4.0f)), + Row(2, "Bob", Array(5.0f, 6.0f, 7.0f, 8.0f)) + ) + val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema).coalesce(1) + 
writeDataframe(tableType, tableName, tablePath, df, saveMode = SaveMode.Overwrite) + + // Project only the vector column + val vecOnly = spark.read.format("hudi").load(tablePath).select("embedding") + assertEquals(1, vecOnly.schema.fields.length) + assertHudiTypeMetadata(vecOnly.schema("embedding"), s"VECTOR($dim)") + val vecRows = vecOnly.collect().map(_.getSeq[Float](0).toSeq).toSet + assertEquals(Set(Seq(1.0f, 2.0f, 3.0f, 4.0f), Seq(5.0f, 6.0f, 7.0f, 8.0f)), vecRows) + + // Project vector alongside Hudi metadata columns + val withMeta = spark.read.format("hudi").load(tablePath) + .select("_hoodie_record_key", "embedding") + assertEquals(2, withMeta.schema.fields.length) + assertHudiTypeMetadata(withMeta.schema("embedding"), s"VECTOR($dim)") + val metaRows = withMeta.collect().map(r => + r.getString(0) -> r.getSeq[Float](1).toSeq).toMap + assertEquals(Seq(1.0f, 2.0f, 3.0f, 4.0f), metaRows("1")) + assertEquals(Seq(5.0f, 6.0f, 7.0f, 8.0f), metaRows("2")) + } + + private def assertHudiTypeMetadata(field: StructField, expectedDescriptor: String): Unit = { + assertTrue(field.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD), + s"Expected field ${field.name} to carry ${HoodieSchema.TYPE_METADATA_FIELD} metadata after read") + assertEquals(expectedDescriptor, field.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD), + s"Expected ${HoodieSchema.TYPE_METADATA_FIELD}=$expectedDescriptor on field ${field.name}") + } + + private def vectorMetadata(descriptor: String): Metadata = + new MetadataBuilder().putString(HoodieSchema.TYPE_METADATA_FIELD, descriptor).build() + + /** Runs `check` against each Lance base file's Arrow schema. 
*/ + private def validateLanceFileSchema(tablePath: String)(check: (org.apache.arrow.vector.types.pojo.Schema, String) => Unit): Unit = { + val metaClient = HoodieTableMetaClient.builder() + .setConf(HoodieTestUtils.getDefaultStorageConf) + .setBasePath(tablePath) + .build() + val engineContext = new HoodieLocalEngineContext(metaClient.getStorageConf) + val metadataConfig = HoodieMetadataConfig.newBuilder.build + val viewManager = FileSystemViewManager.createViewManager( + engineContext, metadataConfig, FileSystemViewStorageConfig.newBuilder.build, + HoodieCommonConfig.newBuilder.build, + (mc: HoodieTableMetaClient) => metaClient.getTableFormat + .getMetadataFactory.create(engineContext, mc.getStorage, metadataConfig, tablePath)) + val fsView = viewManager.getFileSystemView(metaClient) + try { + val baseFiles = fsView.getLatestBaseFiles("") + .collect(Collectors.toList[org.apache.hudi.common.model.HoodieBaseFile]) + assertTrue(baseFiles.size() > 0, "Expected at least one Lance base file") + val allocator = new RootAllocator() + try { + baseFiles.asScala.foreach { bf => + val reader = LanceFileReader.open(bf.getPath, allocator) + try { + check(reader.schema(), bf.getPath) + } finally { + reader.close() + } + } + } finally { + allocator.close() + } + } finally { + fsView.close() + } + } + + private def assertLanceFooterHasVectorColumns(tablePath: String, expected: String): Unit = { + validateLanceFileSchema(tablePath) { (schema, path) => + val meta = schema.getCustomMetadata + assertNotNull(meta, s"Lance footer metadata null for $path") + val key = HoodieSchema.VECTOR_COLUMNS_METADATA_KEY + assertTrue(meta.containsKey(key), + s"Lance file $path should have footer key $key, got keys ${meta.keySet()}") + assertEquals(expected, meta.get(key), s"Lance file $path footer $key mismatch") + } + } + + private def assertLanceFieldIsFixedSizeList(tablePath: String, fieldName: String, expectedDim: Int): Unit = { + validateLanceFileSchema(tablePath) { (schema, path) => + val 
field = schema.findField(fieldName) + assertNotNull(field, s"Field $fieldName not found in Lance schema for $path") + field.getType match { + case fsl: ArrowType.FixedSizeList => + assertEquals(expectedDim, fsl.getListSize, + s"Lance field $fieldName in $path should be FixedSizeList of size $expectedDim") + case other => + throw new AssertionError( + s"Lance field $fieldName in $path should be FixedSizeList but was $other") + } + } + } + private def createDataFrame(records: Seq[(Int, String, Int, Double)]) = { spark.createDataFrame(records).toDF("id", "name", "age", "score").coalesce(1) } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestVectorDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestVectorDataSource.scala index 8ef97c167b9f6..81eb776b54823 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestVectorDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestVectorDataSource.scala @@ -770,10 +770,10 @@ class TestVectorDataSource extends HoodieSparkClientTestBase { val reader = ParquetFileReader.open(HadoopInputFile.fromPath(parquetFiles.head.getPath, conf)) try { val footerMeta = reader.getFileMetaData.getKeyValueMetaData.asScala - assertTrue(footerMeta.contains(HoodieSchema.PARQUET_VECTOR_COLUMNS_METADATA_KEY), - s"Footer should contain ${HoodieSchema.PARQUET_VECTOR_COLUMNS_METADATA_KEY}, got keys: ${footerMeta.keys.mkString(", ")}") + assertTrue(footerMeta.contains(HoodieSchema.VECTOR_COLUMNS_METADATA_KEY), + s"Footer should contain ${HoodieSchema.VECTOR_COLUMNS_METADATA_KEY}, got keys: ${footerMeta.keys.mkString(", ")}") - val value = footerMeta(HoodieSchema.PARQUET_VECTOR_COLUMNS_METADATA_KEY) + val value = footerMeta(HoodieSchema.VECTOR_COLUMNS_METADATA_KEY) assertTrue(value.contains("embedding"), s"Footer value should reference 'embedding' column, got: $value") 
assertTrue(value.contains("VECTOR"), s"Footer value should contain 'VECTOR' descriptor, got: $value") } finally {