From d989ff3dcc29bbafe951080b75b6a400935b6fb0 Mon Sep 17 00:00:00 2001 From: Vova Kolmakov Date: Fri, 27 Mar 2026 15:55:31 +0700 Subject: [PATCH 01/13] feat(lance): Implement columnar batch reading for Lance (COW only) --- .../hudi/io/storage/LanceBatchIterator.java | 166 ++++++ .../lance/SparkLanceReaderBase.scala | 337 ++++++++++-- ...HoodieFileGroupReaderBasedFileFormat.scala | 45 +- .../functional/TestLanceColumnarBatch.scala | 514 ++++++++++++++++++ 4 files changed, 987 insertions(+), 75 deletions(-) create mode 100644 hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceBatchIterator.java create mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceColumnarBatch.scala diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceBatchIterator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceBatchIterator.java new file mode 100644 index 0000000000000..41d79c1e324ee --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceBatchIterator.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.io.storage; + +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.spark.sql.vectorized.ColumnVector; +import org.apache.spark.sql.vectorized.ColumnarBatch; +import org.apache.spark.sql.vectorized.LanceArrowColumnVector; +import org.lance.file.LanceFileReader; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Iterator; +import java.util.NoSuchElementException; + +/** + * Iterator that returns {@link ColumnarBatch} directly from Lance files without + * decomposing to individual rows. Used for vectorized/columnar batch reading + * in Spark's COW base-file-only read path. + * + *

<p>Unlike {@link LanceRecordIterator}, which extracts rows one by one, + * this iterator preserves the columnar format for zero-copy batch processing. + * + *

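<p>Manages the lifecycle of: + * <ul> + * <li>BufferAllocator - Arrow memory management</li> + * <li>LanceFileReader - Lance file handle</li> + * <li>ArrowReader - Arrow batch reader</li> + * </ul>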

+ */ +public class LanceBatchIterator implements Iterator, Closeable { + private final BufferAllocator allocator; + private final LanceFileReader lanceReader; + private final ArrowReader arrowReader; + private final String path; + + private ColumnVector[] columnVectors; + private ColumnarBatch currentBatch; + private boolean nextBatchLoaded = false; + private boolean finished = false; + private boolean closed = false; + + /** + * Creates a new Lance batch iterator. + * + * @param allocator Arrow buffer allocator for memory management + * @param lanceReader Lance file reader + * @param arrowReader Arrow reader for batch reading + * @param path File path (for error messages) + */ + public LanceBatchIterator(BufferAllocator allocator, + LanceFileReader lanceReader, + ArrowReader arrowReader, + String path) { + this.allocator = allocator; + this.lanceReader = lanceReader; + this.arrowReader = arrowReader; + this.path = path; + } + + @Override + public boolean hasNext() { + if (finished) { + return false; + } + if (nextBatchLoaded) { + return true; + } + + try { + if (arrowReader.loadNextBatch()) { + VectorSchemaRoot root = arrowReader.getVectorSchemaRoot(); + + // Create column vector wrappers once and reuse across batches + // (ArrowReader reuses the same VectorSchemaRoot) + if (columnVectors == null) { + columnVectors = root.getFieldVectors().stream() + .map(LanceArrowColumnVector::new) + .toArray(ColumnVector[]::new); + } + + currentBatch = new ColumnarBatch(columnVectors, root.getRowCount()); + nextBatchLoaded = true; + return true; + } + } catch (IOException e) { + throw new HoodieException("Failed to read next batch from Lance file: " + path, e); + } + + finished = true; + return false; + } + + @Override + public ColumnarBatch next() { + if (!hasNext()) { + throw new NoSuchElementException("No more batches available"); + } + nextBatchLoaded = false; + return currentBatch; + } + + @Override + public void close() { + if (closed) { + return; + } + closed = true; + + 
IOException arrowException = null; + Exception lanceException = null; + + if (currentBatch != null) { + currentBatch.close(); + currentBatch = null; + } + + if (arrowReader != null) { + try { + arrowReader.close(); + } catch (IOException e) { + arrowException = e; + } + } + + if (lanceReader != null) { + try { + lanceReader.close(); + } catch (Exception e) { + lanceException = e; + } + } + + if (allocator != null) { + allocator.close(); + } + + if (arrowException != null) { + throw new HoodieIOException("Failed to close Arrow reader", arrowException); + } + if (lanceException != null) { + throw new HoodieException("Failed to close Lance reader", lanceException); + } + } +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala index 65dec06cd3e00..45166a1ec9d41 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala @@ -23,10 +23,10 @@ import org.apache.hudi.SparkAdapterSupport.sparkAdapter import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig} import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType} import org.apache.hudi.common.util -import org.apache.hudi.common.util.collection.ClosableIterator +import org.apache.hudi.common.util.collection.{ClosableIterator, Pair => HoodiePair} import org.apache.hudi.internal.schema.InternalSchema import org.apache.hudi.io.memory.HoodieArrowAllocator -import org.apache.hudi.io.storage.{BlobDescriptorTransform, LanceRecordIterator, VectorConversionUtils} +import org.apache.hudi.io.storage.{BlobDescriptorTransform, HoodieSparkLanceReader, 
LanceBatchIterator, LanceRecordIterator, VectorConversionUtils} import org.apache.hudi.storage.StorageConfiguration import org.apache.hadoop.conf.Configuration @@ -36,21 +36,24 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{AttributeReference, JoinedRow, UnsafeProjection, UnsafeRow} import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.execution.datasources.{PartitionedFile, SparkColumnarFileReader, SparkSchemaTransformUtils} +import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.Filter -import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType} +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.LanceArrowUtils +import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector, LanceArrowColumnVector} import org.lance.file.{BlobReadMode, FileReadOptions, LanceFileReader} -import java.io.IOException +import java.io.{Closeable, IOException} import scala.collection.JavaConverters._ /** * Reader for Lance files in Spark datasource. - * Implements vectorized reading using LanceArrowColumnVector. + * Supports both row-based and columnar batch reading modes. 
* - * @param enableVectorizedReader whether to use vectorized reading (currently always true for Lance) + * @param enableVectorizedReader when true, returns ColumnarBatch for vectorized processing; + * when false, returns InternalRow one by one */ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumnarFileReader { @@ -139,59 +142,32 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna val readOpts = FileReadOptions.builder().blobReadMode(blobMode).build() val arrowReader = lanceReader.readAll(columnNames, null, DEFAULT_BATCH_SIZE, readOpts) - // Compose the DESCRIPTOR-aware blob transform only when the user opted into that mode - // AND the request actually has BLOB columns (otherwise the rewrite has nothing to do). - val blobFieldNames: java.util.Set[String] = - iteratorSchema.fields.collect { case f if isBlobField(f) => f.name }.toSet.asJava - val blobTransform = if (blobMode == BlobReadMode.DESCRIPTOR && !blobFieldNames.isEmpty) { - new BlobDescriptorTransform(blobFieldNames, filePath) + // Decide between batch mode and row mode. + // Fall back to row mode if type casting is needed (batch-level type casting deferred to follow-up). + val hasTypeChanges = !implicitTypeChangeInfo.isEmpty + if (enableVectorizedReader && !hasTypeChanges) { + readBatch(file, allocator, lanceReader, arrowReader, filePath, + requestSchema, requiredSchema, partitionSchema) } else { - null - } - lanceIterator = new LanceRecordIterator( - allocator, lanceReader, arrowReader, iteratorSchema, filePath, blobTransform) - - // Register cleanup listener - Option(TaskContext.get()).foreach { ctx => - ctx.addTaskCompletionListener[Unit](_ => lanceIterator.close()) - } + // Compose the DESCRIPTOR-aware blob transform only when the user opted into that mode + // AND the request actually has BLOB columns (otherwise the rewrite has nothing to do). 
+ val blobFieldNames: java.util.Set[String] = + iteratorSchema.fields.collect { case f if isBlobField(f) => f.name }.toSet.asJava + val blobTransform = if (blobMode == BlobReadMode.DESCRIPTOR && !blobFieldNames.isEmpty) { + new BlobDescriptorTransform(blobFieldNames, filePath) + } else { + null + } + val recordIterator = new LanceRecordIterator( + allocator, lanceReader, arrowReader, iteratorSchema, filePath, blobTransform) + lanceIterator = recordIterator - val baseIter: Iterator[InternalRow] = lanceIterator.asScala - - // Create the following projections for schema evolution: - // 1. Padding projection: add NULL for missing columns - // 2. Casting projection: handle type conversions - val schemaUtils = sparkAdapter.getSchemaUtils - val paddingProj = SparkSchemaTransformUtils.generateNullPaddingProjection(iteratorSchema, requiredSchema) - val castProj = SparkSchemaTransformUtils.generateUnsafeProjection( - schemaUtils.toAttributes(requiredSchema), - Some(SQLConf.get.sessionLocalTimeZone), - implicitTypeChangeInfo, - requiredSchema, - new StructType(), - schemaUtils - ) - - // Unify projections by applying padding and then casting for each row - val projection: UnsafeProjection = new UnsafeProjection { - def apply(row: InternalRow): UnsafeRow = - castProj(paddingProj(row)) - } - val projectedIter = baseIter.map(projection.apply) + // Register cleanup listener + Option(TaskContext.get()).foreach { ctx => + ctx.addTaskCompletionListener[Unit](_ => recordIterator.close()) + } - // Handle partition columns - if (partitionSchema.length == 0) { - // No partition columns - return rows directly - projectedIter - } else { - // Create UnsafeProjection to convert JoinedRow to UnsafeRow - val fullSchema = (requiredSchema.fields ++ partitionSchema.fields).map(f => - AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()) - val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema) - - // Append partition values to each row using JoinedRow, then 
convert to UnsafeRow - val joinedRow = new JoinedRow() - projectedIter.map(row => unsafeProjection(joinedRow(row, file.partitionValues))) + readRows(file, recordIterator, iteratorSchema, requiredSchema, partitionSchema, implicitTypeChangeInfo) } } catch { @@ -262,4 +238,253 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna valueContainsNull = true) case other => other }
+
+  /**
+   * Columnar batch reading path. Returns Iterator[ColumnarBatch] type-erased as Iterator[InternalRow].
+   * Used when enableVectorizedReader=true and no type casting is needed.
+   *
+   * Each returned batch has `requiredSchema.length` data columns followed by
+   * `partitionSchema.length` partition columns. Data columns are taken from the source batch by
+   * name; columns missing from `requestSchema` are backed by an Arrow vector that is allocated
+   * here and never written to. Partition columns are writable vectors filled with the constant
+   * values of `file.partitionValues`.
+   *
+   * The returned iterator is also a [[Closeable]]; closing it releases the null vectors, their
+   * allocator, the underlying [[LanceBatchIterator]] and the partition vectors. A task completion
+   * listener closes it automatically when a [[TaskContext]] is present.
+   *
+   * @param file            file being read; supplies the partition values
+   * @param allocator       Arrow allocator, handed over to the batch iterator
+   * @param lanceReader     Lance file reader, handed over to the batch iterator
+   * @param arrowReader     Arrow reader producing the source batches
+   * @param filePath        file path, used for allocator naming and error messages
+   * @param requestSchema   columns in the order they appear in the source batches
+   * @param requiredSchema  data columns expected in the output, in output order
+   * @param partitionSchema partition columns appended after the data columns
+   */
+  private def readBatch(file: PartitionedFile,
+                        allocator: org.apache.arrow.memory.BufferAllocator,
+                        lanceReader: LanceFileReader,
+                        arrowReader: org.apache.arrow.vector.ipc.ArrowReader,
+                        filePath: String,
+                        requestSchema: StructType,
+                        requiredSchema: StructType,
+                        partitionSchema: StructType): Iterator[InternalRow] = {
+
+    val batchIterator = new LanceBatchIterator(allocator, lanceReader, arrowReader, filePath)
+
+    // Build column mapping: for each column in requiredSchema, find its index in requestSchema (file columns)
+    // Returns -1 if the column is missing from the file (schema evolution: column addition)
+    val columnMapping: Array[Int] = requiredSchema.fields.map { field =>
+      requestSchema.fieldNames.indexOf(field.name)
+    }
+
+    // Create Arrow-backed null vectors for columns missing from the file.
+    // Uses LanceArrowColumnVector so that Spark's vectorTypes() contract is satisfied
+    // (FileSourceScanExec expects all data columns to be LanceArrowColumnVector).
+    val nullAllocator = if (columnMapping.contains(-1)) {
+      HoodieArrowAllocator.newChildAllocator(
+        getClass.getSimpleName + "-null-" + filePath, HoodieSparkLanceReader.LANCE_DATA_ALLOCATOR_SIZE)
+    } else null
+
+    // Both arrays are indexed by position in requiredSchema and hold null for columns that are
+    // present in the file, so next() can look up a null vector in O(1) instead of searching.
+    val nullArrowVectors = new Array[org.apache.arrow.vector.FieldVector](requiredSchema.length)
+    val nullColumnVectors = new Array[LanceArrowColumnVector](requiredSchema.length)
+    if (nullAllocator != null) {
+      var idx = 0
+      while (idx < columnMapping.length) {
+        if (columnMapping(idx) < 0) {
+          val field = LanceArrowUtils.toArrowField(
+            requiredSchema(idx).name, requiredSchema(idx).dataType, requiredSchema(idx).nullable, "UTC")
+          val arrowVector = field.createVector(nullAllocator)
+          arrowVector.allocateNew()
+          arrowVector.setValueCount(DEFAULT_BATCH_SIZE)
+          nullArrowVectors(idx) = arrowVector
+          nullColumnVectors(idx) = new LanceArrowColumnVector(arrowVector)
+        }
+        idx += 1
+      }
+    }
+
+    // Pre-create partition column vectors (reused across batches, reset per batch)
+    val hasPartitionColumns = partitionSchema.length > 0
+    val partitionVectors: Array[WritableColumnVector] = if (hasPartitionColumns) {
+      partitionSchema.fields.map(f => new OnHeapColumnVector(DEFAULT_BATCH_SIZE, f.dataType))
+    } else {
+      Array.empty
+    }
+
+    // Populate partition vectors with constant values
+    var lastPopulatedNumRows = DEFAULT_BATCH_SIZE
+    if (hasPartitionColumns) {
+      populatePartitionVectors(partitionVectors, partitionSchema, file.partitionValues, DEFAULT_BATCH_SIZE)
+    }
+
+    val totalColumns = requiredSchema.length + partitionSchema.length
+
+    // Map each source batch to a batch with the correct column layout.
+    val mappedIterator = new Iterator[ColumnarBatch] with Closeable {
+      private var closed = false
+
+      // Never touch the underlying reader once the resources have been released.
+      override def hasNext: Boolean = !closed && batchIterator.hasNext()
+
+      override def next(): ColumnarBatch = {
+        val sourceBatch = batchIterator.next()
+        val numRows = sourceBatch.numRows()
+
+        val vectors = new Array[ColumnVector](totalColumns)
+
+        // Data columns: reorder from source batch or substitute null Arrow vector
+        var i = 0
+        while (i < requiredSchema.length) {
+          if (columnMapping(i) >= 0) {
+            vectors(i) = sourceBatch.column(columnMapping(i))
+          } else {
+            val arrowVector = nullArrowVectors(i)
+            // Adjust valueCount if batch size differs from allocated size
+            if (numRows != arrowVector.getValueCount) {
+              arrowVector.setValueCount(numRows)
+            }
+            vectors(i) = nullColumnVectors(i)
+          }
+          i += 1
+        }
+
+        // Partition columns: constant vectors
+        if (hasPartitionColumns) {
+          if (numRows != lastPopulatedNumRows) {
+            populatePartitionVectors(partitionVectors, partitionSchema, file.partitionValues, numRows)
+            lastPopulatedNumRows = numRows
+          }
+          var j = 0
+          while (j < partitionSchema.length) {
+            vectors(requiredSchema.length + j) = partitionVectors(j)
+            j += 1
+          }
+        }
+
+        val result = new ColumnarBatch(vectors)
+        result.setNumRows(numRows)
+        result
+      }
+
+      override def close(): Unit = {
+        // Idempotent: the task completion listener and an explicit caller may both close us.
+        if (!closed) {
+          closed = true
+          try {
+            // Close null Arrow vectors and their allocator before batchIterator (which closes the data allocator)
+            var k = 0
+            while (k < nullColumnVectors.length) {
+              if (nullColumnVectors(k) != null) {
+                nullColumnVectors(k).close()
+                nullArrowVectors(k).close()
+              }
+              k += 1
+            }
+            if (nullAllocator != null) nullAllocator.close()
+          } finally {
+            // Release the remaining resources even if the null vectors failed to close.
+            try {
+              batchIterator.close()
+            } finally {
+              partitionVectors.foreach(_.close())
+            }
+          }
+        }
+      }
+    }
+
+    // Register cleanup listener
+    Option(TaskContext.get()).foreach { ctx =>
+      ctx.addTaskCompletionListener[Unit](_ => mappedIterator.close())
+    }
+
+    mappedIterator.asInstanceOf[Iterator[InternalRow]]
+  }
+
+ /** + * Row-based reading path. 
Consumes the pre-built [[LanceRecordIterator]] (already wired up + * with blob transform if applicable) and applies schema-evolution projections + partition handling. + */ + private def readRows(file: PartitionedFile, + recordIterator: LanceRecordIterator, + iteratorSchema: StructType, + requiredSchema: StructType, + partitionSchema: StructType, + implicitTypeChangeInfo: java.util.Map[Integer, HoodiePair[DataType, DataType]]): Iterator[InternalRow] = { + + val baseIter: Iterator[InternalRow] = recordIterator.asScala + + // Create the following projections for schema evolution: + // 1. Padding projection: add NULL for missing columns + // 2. Casting projection: handle type conversions + val schemaUtils = sparkAdapter.getSchemaUtils + val paddingProj = SparkSchemaTransformUtils.generateNullPaddingProjection(iteratorSchema, requiredSchema) + val castProj = SparkSchemaTransformUtils.generateUnsafeProjection( + schemaUtils.toAttributes(requiredSchema), + Some(SQLConf.get.sessionLocalTimeZone), + implicitTypeChangeInfo, + requiredSchema, + new StructType(), + schemaUtils + ) + + // Unify projections by applying padding and then casting for each row + val projection: UnsafeProjection = new UnsafeProjection { + def apply(row: InternalRow): UnsafeRow = + castProj(paddingProj(row)) + } + val projectedIter = baseIter.map(projection.apply) + + // Handle partition columns + if (partitionSchema.length == 0) { + // No partition columns - return rows directly + projectedIter + } else { + // Create UnsafeProjection to convert JoinedRow to UnsafeRow + val fullSchema = (requiredSchema.fields ++ partitionSchema.fields).map(f => + AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()) + val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema) + + // Append partition values to each row using JoinedRow, then convert to UnsafeRow + val joinedRow = new JoinedRow() + projectedIter.map(row => unsafeProjection(joinedRow(row, file.partitionValues))) + } + } + 
+  /**
+   * Populate writable column vectors with constant partition values.
+   * Each vector is reset and then filled with the same value for rows `0` until `numRows`.
+   *
+   * @param vectors         one writable vector per partition column, in `partitionSchema` order
+   * @param partitionSchema schema describing the partition columns
+   * @param partitionValues row holding one value per partition column
+   * @param numRows         number of rows to fill in every vector
+   * @throws UnsupportedOperationException if a non-null partition value has a data type that is
+   *                                       not handled below; writing NULL in its place would
+   *                                       silently return wrong data
+   */
+  private def populatePartitionVectors(vectors: Array[WritableColumnVector],
+                                       partitionSchema: StructType,
+                                       partitionValues: InternalRow,
+                                       numRows: Int): Unit = {
+    // Variable-length values have no bulk "fill" API, so they are written row by row.
+    def fillByteArray(vector: WritableColumnVector, bytes: Array[Byte]): Unit = {
+      var j = 0
+      while (j < numRows) { vector.putByteArray(j, bytes); j += 1 }
+    }
+
+    var i = 0
+    while (i < partitionSchema.length) {
+      val vector = vectors(i)
+      vector.reset()
+      // Grow the vector if numRows exceeds its current capacity
+      vector.reserve(numRows)
+      if (partitionValues.isNullAt(i)) {
+        vector.putNulls(0, numRows)
+      } else {
+        partitionSchema(i).dataType match {
+          case BooleanType =>
+            vector.putBooleans(0, numRows, partitionValues.getBoolean(i))
+          case ByteType =>
+            vector.putBytes(0, numRows, partitionValues.getByte(i))
+          case ShortType =>
+            vector.putShorts(0, numRows, partitionValues.getShort(i))
+          case IntegerType | DateType =>
+            vector.putInts(0, numRows, partitionValues.getInt(i))
+          case LongType | TimestampType =>
+            vector.putLongs(0, numRows, partitionValues.getLong(i))
+          case FloatType =>
+            vector.putFloats(0, numRows, partitionValues.getFloat(i))
+          case DoubleType =>
+            vector.putDoubles(0, numRows, partitionValues.getDouble(i))
+          case StringType =>
+            fillByteArray(vector, partitionValues.getUTF8String(i).getBytes)
+          case d: DecimalType =>
+            // The storage form depends on precision: int, long, or unscaled big-endian bytes
+            val v = partitionValues.getDecimal(i, d.precision, d.scale)
+            if (d.precision <= Decimal.MAX_INT_DIGITS) {
+              vector.putInts(0, numRows, v.toUnscaledLong.toInt)
+            } else if (d.precision <= Decimal.MAX_LONG_DIGITS) {
+              vector.putLongs(0, numRows, v.toUnscaledLong)
+            } else {
+              fillByteArray(vector, v.toJavaBigDecimal.unscaledValue().toByteArray)
+            }
+          case BinaryType =>
+            fillByteArray(vector, partitionValues.getBinary(i))
+          case other =>
+            // Fail loudly: replacing a non-null partition value with NULL would corrupt results.
+            throw new UnsupportedOperationException(
+              s"Unsupported partition column type $other for column '${partitionSchema(i).name}' " +
+                "in Lance columnar batch read")
+        }
+      }
+      i += 1
+    }
+  }
 } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala index 3da22ff8ebe7e..8523a8543dab9 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala @@ -157,8 +157,7 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String, val orcBatchSupported = conf.orcVectorizedReaderEnabled && schema.forall(s => OrcUtils.supportColumnarReads( s.dataType, sparkSession.sessionState.conf.orcVectorizedReaderNestedColumnEnabled)) - // TODO: Implement columnar batch reading https://github.com/apache/hudi/issues/17736 - val lanceBatchSupported = false + val lanceBatchSupported = true val supportBatch = if (isMultipleBaseFileFormatsEnabled) { parquetBatchSupported && orcBatchSupported @@ -184,23 +183,31 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String, override def vectorTypes(requiredSchema: StructType, partitionSchema: StructType, sqlConf: SQLConf): Option[Seq[String]] = { - val originalVectorTypes = super.vectorTypes(requiredSchema, partitionSchema, sqlConf) - if (mandatoryFields.isEmpty) { - originalVectorTypes + if (hoodieFileFormat == HoodieFileFormat.LANCE && 
!isMultipleBaseFileFormatsEnabled) { + // Lance uses LanceArrowColumnVector for data columns and OnHeapColumnVector for partition columns. + // Spark uses vectorTypes to determine if columnar batch reading is supported. + val lanceVectorType = "org.apache.spark.sql.vectorized.LanceArrowColumnVector" + val partitionVectorType = classOf[OnHeapColumnVector].getName + Some(Seq.fill(requiredSchema.length)(lanceVectorType) ++ Seq.fill(partitionSchema.length)(partitionVectorType)) } else { - val regularVectorType = if (!sqlConf.offHeapColumnVectorEnabled) { - classOf[OnHeapColumnVector].getName + val originalVectorTypes = super.vectorTypes(requiredSchema, partitionSchema, sqlConf) + if (mandatoryFields.isEmpty) { + originalVectorTypes } else { - classOf[OffHeapColumnVector].getName - } - originalVectorTypes.map { - o: Seq[String] => o.zipWithIndex.map(a => { - if (a._2 >= requiredSchema.length && mandatoryFields.contains(partitionSchema.fields(a._2 - requiredSchema.length).name)) { - regularVectorType - } else { - a._1 - } - }) + val regularVectorType = if (!sqlConf.offHeapColumnVectorEnabled) { + classOf[OnHeapColumnVector].getName + } else { + classOf[OffHeapColumnVector].getName + } + originalVectorTypes.map { + o: Seq[String] => o.zipWithIndex.map(a => { + if (a._2 >= requiredSchema.length && mandatoryFields.contains(partitionSchema.fields(a._2 - requiredSchema.length).name)) { + regularVectorType + } else { + a._1 + } + }) + } } } } @@ -319,7 +326,7 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String, fixedPartitionIndexes) case _ => - readBaseFile(file, baseFileReader.value, requestedStructType, remainingPartitionSchema, fixedPartitionIndexes, + readBaseFile(file, fileGroupBaseFileReader.value, requestedStructType, remainingPartitionSchema, fixedPartitionIndexes, requiredSchema, partitionSchema, outputSchema, filters ++ requiredFilters, storageConf) } // CDC queries. 
@@ -327,7 +334,7 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String, buildCDCRecordIterator(hoodiePartitionCDCFileGroupSliceMapping, fileGroupBaseFileReader.value, storageConf, fileIndexProps, requiredSchema, metaClient) case _ => - readBaseFile(file, baseFileReader.value, requestedStructType, remainingPartitionSchema, fixedPartitionIndexes, + readBaseFile(file, fileGroupBaseFileReader.value, requestedStructType, remainingPartitionSchema, fixedPartitionIndexes, requiredSchema, partitionSchema, outputSchema, filters ++ requiredFilters, storageConf) } CloseableIteratorListener.addListener(iter) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceColumnarBatch.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceColumnarBatch.scala new file mode 100644 index 0000000000000..66285e7abe1c8 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceColumnarBatch.scala @@ -0,0 +1,514 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.functional + +import org.apache.hudi.DataSourceWriteOptions._ +import org.apache.hudi.DefaultSparkRecordMerger +import org.apache.hudi.SparkAdapterSupport.sparkAdapter +import org.apache.hudi.client.SparkTaskContextSupplier +import org.apache.hudi.common.bloom.{BloomFilterFactory, BloomFilterTypeCode} +import org.apache.hudi.common.model.HoodieTableType +import org.apache.hudi.common.testutils.HoodieTestUtils +import org.apache.hudi.common.util.{Option => HOption} +import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.io.storage.HoodieSparkLanceWriter +import org.apache.hudi.storage.StoragePath +import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration +import org.apache.hudi.testutils.HoodieSparkClientTestBase + +import org.apache.hadoop.conf.Configuration +import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow +import org.apache.spark.sql.execution.datasources.lance.SparkLanceReaderBase +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.types._ +import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.unsafe.types.UTF8String +import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} +import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.condition.DisabledIfSystemProperty + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +/** + * Tests for Lance columnar batch (vectorized) reading, covering: + * + * Unit-level tests (invoke [[SparkLanceReaderBase]] directly): + * - Row-based path: `enableVectorizedReader=false` must yield [[InternalRow]], never [[ColumnarBatch]] + * - Columnar path: `enableVectorizedReader=true` with matching schemas yields [[ColumnarBatch]] + * - Null-padding: columns absent from the file are null-padded in the [[ColumnarBatch]] + * - Partition vectors: partition 
values are appended as constant columns to each [[ColumnarBatch]] + * - Fallback: implicit type change (e.g. FLOAT → DOUBLE) forces the row path + * + * Integration tests (Spark SQL / DataFrame API): + * - COW table round-trip via `spark.read.format("hudi")` with vectorized reads active + * - Schema evolution: adding a column via bulk_insert; old base files are null-padded + * - SQL query: `SELECT` with a `WHERE` clause on a COW table + */ +@DisabledIfSystemProperty(named = "lance.skip.tests", matches = "true") +class TestLanceColumnarBatch extends HoodieSparkClientTestBase { + + var spark: SparkSession = _ + + @BeforeEach + override def setUp(): Unit = { + super.setUp() + spark = sqlContext.sparkSession + } + + @AfterEach + override def tearDown(): Unit = { + super.tearDown() + spark = null + } + + // --------------------------------------------------------------------------- + // Helpers + // --------------------------------------------------------------------------- + + /** + * Write a plain Lance file (no Hudi metadata fields) containing [[rows]] with [[schema]]. + * Returns the absolute path string of the written file. 
+ */ + private def writeLanceFile(fileName: String, schema: StructType, rows: Seq[InternalRow]): String = { + val bloom = BloomFilterFactory.createBloomFilter(1000, 0.0001, 10000, BloomFilterTypeCode.SIMPLE.name()) + val path = new StoragePath(s"$basePath/$fileName") + val storage = HoodieTestUtils.getStorage(basePath) + try { + val writer = new HoodieSparkLanceWriter( + path, schema, "20240101120000000", new SparkTaskContextSupplier(), + storage, false, HOption.of(bloom)) + try { + rows.zipWithIndex.foreach { case (row, i) => writer.writeRow("key" + i, row) } + } finally { + writer.close() + } + } finally { + storage.close() + } + path.toString + } + + private def extractRow(row: InternalRow, schema: StructType): (Int, String, Any) = { + val id = row.getInt(0) + val name = if (row.isNullAt(1)) null else row.getUTF8String(1).toString + val col2 = if (schema.fields.length >= 3) { + val f = schema.fields(2) + if (row.isNullAt(2)) null + else f.dataType match { + case DoubleType => row.getDouble(2).asInstanceOf[AnyRef] + case FloatType => row.getFloat(2).asInstanceOf[AnyRef] + case IntegerType => row.getInt(2).asInstanceOf[AnyRef] + case LongType => row.getLong(2).asInstanceOf[AnyRef] + case StringType => row.getUTF8String(2).toString + case _ => row.get(2, f.dataType) + } + } else null + (id, name, col2) + } + + /** + * Invoke [[SparkLanceReaderBase.read]] and consume the iterator, safely extracting data from + * each item before the iterator advances (which would invalidate Arrow-backed batch buffers). 
+ * + * Returns a tuple of: + * - `batchCount` – number of [[ColumnarBatch]] items returned + * - `rowCount` – number of [[InternalRow]] items returned (non-batch path) + * - `rows` – collected (id: Int, name: String, col2: Any) tuples from both paths + */ + private def readAndCollect( + filePath: String, + requiredSchema: StructType, + partitionSchema: StructType = new StructType(), + partitionValues: InternalRow = new GenericInternalRow(Array.empty[Any]), + enableVectorized: Boolean = true + ): (Int, Int, Seq[(Int, String, Any)]) = { + + val pf = sparkAdapter.getSparkPartitionedFileUtils + .createPartitionedFile(partitionValues, new StoragePath(filePath), 0L, Long.MaxValue) + val storageConf = new HadoopStorageConfiguration(new Configuration()) + val reader = new SparkLanceReaderBase(enableVectorized) + + var batchCount = 0 + var rowCount = 0 + val rows = ArrayBuffer.empty[(Int, String, Any)] + + // Cast to Iterator[Any] to avoid JVM checkcast to InternalRow. + // The columnar batch path returns ColumnarBatch via type erasure (Iterator[InternalRow] + // that actually contains ColumnarBatch). Both iter.foreach and iter.next() on a typed + // Iterator[InternalRow] insert a checkcast that fails for ColumnarBatch. 
+ val iter = reader.read(pf, requiredSchema, partitionSchema, HOption.empty(), Seq.empty[Filter], storageConf) + .asInstanceOf[Iterator[Any]] + + while (iter.hasNext) { + iter.next() match { + case batch: ColumnarBatch => + batchCount += 1 + batch.rowIterator().asScala.foreach { row => + rows += extractRow(row, requiredSchema) + } + + case row: InternalRow => + rowCount += 1 + rows += extractRow(row, requiredSchema) + } + } + + (batchCount, rowCount, rows.toSeq) + } + + private val baseSchema: StructType = new StructType() + .add("id", IntegerType, nullable = false) + .add("name", StringType, nullable = true) + .add("score", DoubleType, nullable = true) + + private def baseRows: Seq[InternalRow] = Seq( + new GenericInternalRow(Array[Any](1, UTF8String.fromString("Alice"), 95.5d)), + new GenericInternalRow(Array[Any](2, UTF8String.fromString("Bob"), 87.3d)), + new GenericInternalRow(Array[Any](3, UTF8String.fromString("Charlie"), 92.1d)) + ) + + private def writeHudiTable(tableType: HoodieTableType, + tableName: String, + tablePath: String, + df: DataFrame, + saveMode: SaveMode = SaveMode.Append, + operation: Option[String] = None, + extraOptions: Map[String, String] = Map.empty): Unit = { + var writer = df.write + .format("hudi") + .option("hoodie.table.base.file.format", "LANCE") + .option(TABLE_TYPE.key(), tableType.name()) + .option(RECORDKEY_FIELD.key(), "id") + .option(PRECOMBINE_FIELD.key(), "id") + .option(TABLE_NAME.key(), tableName) + .option(HoodieWriteConfig.TBL_NAME.key(), tableName) + .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) + operation.foreach(op => writer = writer.option(OPERATION.key(), op)) + extraOptions.foreach { case (k, v) => writer = writer.option(k, v) } + writer.mode(saveMode).save(tablePath) + } + + // =========================================================================== + // Unit tests: SparkLanceReaderBase + // 
=========================================================================== + + /** + * Row path (`enableVectorizedReader=false`) must return [[InternalRow]] instances, never + * [[ColumnarBatch]]. Data values must survive the row-level UnsafeProjection round-trip. + */ + @Test + def testRowPathReturnsInternalRows(): Unit = { + val filePath = writeLanceFile("row_path.lance", baseSchema, baseRows) + val (batches, rowItems, rows) = readAndCollect(filePath, baseSchema, enableVectorized = false) + + assertEquals(0, batches, "Row path must not yield ColumnarBatch") + assertEquals(3, rowItems, "Row path must yield one InternalRow per record") + + assertEquals(List(1, 2, 3), rows.map(_._1)) + assertEquals(List("Alice", "Bob", "Charlie"), rows.map(_._2)) + assertEquals(List(95.5d, 87.3d, 92.1d), rows.map(_._3)) + } + + /** + * Columnar path (`enableVectorizedReader=true`, no type changes) must return + * [[ColumnarBatch]] instances backed by [[org.apache.spark.sql.vectorized.LanceArrowColumnVector]]. + * No [[InternalRow]] may be returned directly. Data must round-trip correctly. + */ + @Test + def testColumnarPathReturnsBatches(): Unit = { + val filePath = writeLanceFile("columnar_path.lance", baseSchema, baseRows) + val (batches, rowItems, rows) = readAndCollect(filePath, baseSchema, enableVectorized = true) + + assertTrue(batches > 0, "Vectorized path must yield at least one ColumnarBatch") + assertEquals(0, rowItems, "Vectorized path must not yield raw InternalRow") + + assertEquals(3, rows.size) + assertEquals(List(1, 2, 3), rows.map(_._1)) + assertEquals(List("Alice", "Bob", "Charlie"), rows.map(_._2)) + assertEquals(List(95.5d, 87.3d, 92.1d), rows.map(_._3)) + } + + /** + * When the required schema contains a column that is absent from the file (schema evolution: + * column addition), the vectorized path must null-pad that column in each [[ColumnarBatch]] + * using a [[org.apache.spark.sql.vectorized.LanceArrowColumnVector]] backed by an all-null + * Arrow vector. 
+ */ + @Test + def testColumnarPathNullPadsAbsentColumns(): Unit = { + val fileSchema = new StructType() + .add("id", IntegerType, nullable = false) + .add("name", StringType, nullable = true) + val rows = Seq( + new GenericInternalRow(Array[Any](1, UTF8String.fromString("Alice"))), + new GenericInternalRow(Array[Any](2, UTF8String.fromString("Bob"))) + ) + val filePath = writeLanceFile("null_pad.lance", fileSchema, rows) + + // requiredSchema adds 'score' which does not exist in the file + val requiredSchemaWithExtra = new StructType() + .add("id", IntegerType, nullable = false) + .add("name", StringType, nullable = true) + .add("score", DoubleType, nullable = true) + + val pf = sparkAdapter.getSparkPartitionedFileUtils + .createPartitionedFile(new GenericInternalRow(Array.empty[Any]), + new StoragePath(filePath), 0L, Long.MaxValue) + val storageConf = new HadoopStorageConfiguration(new Configuration()) + val reader = new SparkLanceReaderBase(enableVectorizedReader = true) + + var batchesSeen = 0 + val nullPadded = ArrayBuffer.empty[Boolean] + + val iter = reader.read(pf, requiredSchemaWithExtra, new StructType(), HOption.empty(), + Seq.empty[Filter], storageConf).asInstanceOf[Iterator[Any]] + while (iter.hasNext) { + iter.next() match { + case batch: ColumnarBatch => + batchesSeen += 1 + batch.rowIterator().asScala.foreach { row => + nullPadded += row.isNullAt(2) + } + case _ => + fail("Vectorized path with null-padding must return ColumnarBatch") + } + } + + assertTrue(batchesSeen > 0, "Must have produced at least one ColumnarBatch") + assertEquals(2, nullPadded.size, "Must have read 2 rows") + assertTrue(nullPadded.forall(identity), "Absent column 'score' must be null in every row") + } + + /** + * When a non-empty `partitionSchema` is supplied, the vectorized path must append constant + * partition-value columns to each [[ColumnarBatch]] using + * [[org.apache.spark.sql.execution.vectorized.OnHeapColumnVector]]. 
+ * + * The resulting rows must have `dataSchema.size + partitionSchema.size` fields, with the + * partition value in the last position. + */ + @Test + def testColumnarPathAppendsPartitionVectors(): Unit = { + val filePath = writeLanceFile("partitioned.lance", baseSchema, baseRows) + + val partitionSchema = new StructType().add("dept", StringType, nullable = true) + val partitionValues = new GenericInternalRow( + Array[Any](UTF8String.fromString("engineering"))) + + val pf = sparkAdapter.getSparkPartitionedFileUtils + .createPartitionedFile(partitionValues, new StoragePath(filePath), 0L, Long.MaxValue) + val storageConf = new HadoopStorageConfiguration(new Configuration()) + val reader = new SparkLanceReaderBase(enableVectorizedReader = true) + + var batchesSeen = 0 + val depts = ArrayBuffer.empty[String] + + val iter = reader.read(pf, baseSchema, partitionSchema, HOption.empty(), + Seq.empty[Filter], storageConf).asInstanceOf[Iterator[Any]] + while (iter.hasNext) { + iter.next() match { + case batch: ColumnarBatch => + batchesSeen += 1 + assertEquals(baseSchema.size + partitionSchema.size, batch.numCols(), + "Batch column count must equal data + partition columns") + batch.rowIterator().asScala.foreach { row => + depts += row.getUTF8String(baseSchema.size).toString + } + case _ => + fail("Vectorized path must return ColumnarBatch when partitionSchema is present") + } + } + + assertTrue(batchesSeen > 0) + assertEquals(3, depts.size) + assertTrue(depts.forall(_ == "engineering"), + "Every row must carry the constant partition value 'engineering'") + } + + /** + * When [[SparkSchemaTransformUtils.buildImplicitSchemaChangeInfo]] detects an implicit type + * change (file has FLOAT, query requires DOUBLE), the columnar path must fall back to the + * row path. The row path applies a cast projection so the returned values must be correct + * DOUBLE values even though the file stores FLOAT. 
+ */ + @Test + def testTypeChangeFallsBackToRowPath(): Unit = { + val fileSchema = new StructType() + .add("id", IntegerType, nullable = false) + .add("name", StringType, nullable = true) + .add("score", FloatType, nullable = true) + + val fileRows = Seq( + new GenericInternalRow(Array[Any](1, UTF8String.fromString("Alice"), 95.5f)), + new GenericInternalRow(Array[Any](2, UTF8String.fromString("Bob"), 87.3f)) + ) + val filePath = writeLanceFile("type_change.lance", fileSchema, fileRows) + + // requiredSchema uses DOUBLE for 'score' — implicit type change: FLOAT → DOUBLE + val requiredSchemaDoubleScore = new StructType() + .add("id", IntegerType, nullable = false) + .add("name", StringType, nullable = true) + .add("score", DoubleType, nullable = true) + + val (batches, rowItems, rows) = readAndCollect(filePath, requiredSchemaDoubleScore, enableVectorized = true) + + assertEquals(0, batches, + "Implicit type change FLOAT→DOUBLE must disable the columnar path and return InternalRow") + assertEquals(2, rowItems) + + assertEquals(List(1, 2), rows.map(_._1)) + assertEquals(List("Alice", "Bob"), rows.map(_._2)) + // FLOAT 95.5f → cast via String → DOUBLE: result must be close to 95.5 + assertEquals(95.5d, rows(0)._3.asInstanceOf[Double], 0.01d) + assertEquals(87.3d, rows(1)._3.asInstanceOf[Double], 0.01d) + } + + // =========================================================================== + // Integration tests: Spark SQL / DataFrame API (COW only) + // =========================================================================== + + /** + * End-to-end test for a COW Lance table read via `spark.read.format("hudi")`. + * + * With `lanceBatchSupported = true`, Spark will enable vectorized reads for Lance COW tables. + * This test verifies that data written via the Hudi DataFrame API is read back correctly when + * the vectorized code path is active. 
+ */ + @Test + def testCOWTableDataFrameRead(): Unit = { + val tableName = "test_lance_columnar_cow" + val tablePath = s"$basePath/$tableName" + + val df = spark.createDataFrame(Seq( + (1, "Alice", 95.5d), + (2, "Bob", 87.3d), + (3, "Charlie", 92.1d) + )).toDF("id", "name", "score") + + writeHudiTable(HoodieTableType.COPY_ON_WRITE, tableName, tablePath, df, + saveMode = SaveMode.Overwrite, operation = Some("bulk_insert")) + + val actual = spark.read.format("hudi").load(tablePath) + .select("id", "name", "score") + .orderBy("id") + + val expected = Seq( + (1, "Alice", 95.5d), + (2, "Bob", 87.3d), + (3, "Charlie", 92.1d) + ) + + val actualRows = actual.collect() + assertEquals(3, actualRows.length) + expected.zip(actualRows).foreach { case ((eid, ename, escore), row) => + assertEquals(eid, row.getAs[Int]("id")) + assertEquals(ename, row.getAs[String]("name")) + assertEquals(escore, row.getAs[Double]("score"), 1e-6) + } + } + + /** + * Schema evolution: a second bulk_insert writes records with a wider schema that adds a + * 'department' column. Old base files (from the first insert) do not contain 'department'. + * + * When reading the whole table with the full schema, `SparkLanceReaderBase` must null-pad + * 'department' for every row from the old base file, while the new base file has actual values. 
+ */ + @Test + def testCOWTableSchemaEvolutionNullPadding(): Unit = { + val tableName = "test_lance_schema_evolution" + val tablePath = s"$basePath/$tableName" + + // First batch: narrow schema (id, name, score) + val df1 = spark.createDataFrame(Seq( + (1, "Alice", 95.5d), + (2, "Bob", 87.3d) + )).toDF("id", "name", "score") + + writeHudiTable(HoodieTableType.COPY_ON_WRITE, tableName, tablePath, df1, + saveMode = SaveMode.Overwrite, operation = Some("bulk_insert")) + + // Second batch: wider schema adds 'department' + val df2 = spark.createDataFrame(Seq( + (3, "Charlie", 92.1d, "engineering"), + (4, "David", 88.0d, "sales") + )).toDF("id", "name", "score", "department") + + writeHudiTable(HoodieTableType.COPY_ON_WRITE, tableName, tablePath, df2, + operation = Some("bulk_insert"), + extraOptions = Map(RECONCILE_SCHEMA.key() -> "true")) + + // Read back with full schema — 'department' must be null for records from the first batch + val readDf = spark.read.format("hudi").load(tablePath) + val actual = readDf.select("id", "name", "score", "department").orderBy("id") + val rows = actual.collect() + + assertEquals(4, rows.length) + + // Records 1 and 2 came from the narrow-schema file → 'department' must be null + assertEquals(1, rows(0).getAs[Int]("id")) + assertTrue(rows(0).isNullAt(rows(0).fieldIndex("department")), + "Records from the narrow-schema file must have null 'department'") + assertEquals(2, rows(1).getAs[Int]("id")) + assertTrue(rows(1).isNullAt(rows(1).fieldIndex("department")), + "Records from the narrow-schema file must have null 'department'") + + // Records 3 and 4 came from the wider-schema file → 'department' must be present + assertEquals(3, rows(2).getAs[Int]("id")) + assertEquals("engineering", rows(2).getAs[String]("department")) + assertEquals(4, rows(3).getAs[Int]("id")) + assertEquals("sales", rows(3).getAs[String]("department")) + } + + /** + * Spark SQL `SELECT … WHERE` query on a COW Lance table. 
Verifies that predicate evaluation + * works correctly when vectorized reading is active (Spark evaluates predicates on + * [[ColumnarBatch]] rows returned from the vectorized scan). + */ + @Test + def testCOWTableSparkSqlQuery(): Unit = { + val tableName = "test_lance_sql_columnar" + val tablePath = s"$basePath/$tableName" + + val df = spark.createDataFrame(Seq( + (1, "Alice", 30), + (2, "Bob", 25), + (3, "Charlie", 35), + (4, "David", 28), + (5, "Eve", 32) + )).toDF("id", "name", "age") + + writeHudiTable(HoodieTableType.COPY_ON_WRITE, tableName, tablePath, df, + saveMode = SaveMode.Overwrite, operation = Some("bulk_insert")) + + spark.read.format("hudi").load(tablePath).createOrReplaceTempView(tableName) + + val result = spark.sql( + s"SELECT id, name, age FROM $tableName WHERE age > 30 ORDER BY id" + ).collect() + + assertEquals(2, result.length, "Only Charlie (35) and Eve (32) satisfy age > 30") + assertEquals(3, result(0).getAs[Int]("id")) + assertEquals("Charlie", result(0).getAs[String]("name")) + assertEquals(5, result(1).getAs[Int]("id")) + assertEquals("Eve", result(1).getAs[String]("name")) + } +} From c05aedf88c31ef11bf22128a05607b328cddbf7a Mon Sep 17 00:00:00 2001 From: Vova Kolmakov Date: Fri, 27 Mar 2026 22:40:03 +0700 Subject: [PATCH 02/13] fix for TestAvroSchemaResolutionSupport --- .../HoodieFileGroupReaderBasedFileFormat.scala | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala index 8523a8543dab9..beadf9d84dce8 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala +++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala @@ -272,6 +272,15 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String, } else { baseFileReader } + // For base-file-only reads (case _ branches), we use the vectorized reader for formats that support + // returning InternalRow even in vectorized mode (Parquet with returningBatch=false). + // Lance's vectorized reader always returns ColumnarBatch, so for MOR Lance tables (where + // supportReturningBatch=false and Spark expects InternalRow), we must use the non-vectorized reader. + val baseFileOnlyReader = if (isMOR && hoodieFileFormat == HoodieFileFormat.LANCE && !isMultipleBaseFileFormatsEnabled) { + fileGroupBaseFileReader + } else { + baseFileReader + } val broadcastedStorageConf = spark.sparkContext.broadcast(new SerializableConfiguration(augmentedStorageConf.unwrap())) val fileIndexProps: TypedProperties = HoodieFileIndex.getConfigProperties(spark, options, null) @@ -326,7 +335,7 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String, fixedPartitionIndexes) case _ => - readBaseFile(file, fileGroupBaseFileReader.value, requestedStructType, remainingPartitionSchema, fixedPartitionIndexes, + readBaseFile(file, baseFileOnlyReader.value, requestedStructType, remainingPartitionSchema, fixedPartitionIndexes, requiredSchema, partitionSchema, outputSchema, filters ++ requiredFilters, storageConf) } // CDC queries. 
@@ -334,7 +343,7 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String, buildCDCRecordIterator(hoodiePartitionCDCFileGroupSliceMapping, fileGroupBaseFileReader.value, storageConf, fileIndexProps, requiredSchema, metaClient) case _ => - readBaseFile(file, fileGroupBaseFileReader.value, requestedStructType, remainingPartitionSchema, fixedPartitionIndexes, + readBaseFile(file, baseFileOnlyReader.value, requestedStructType, remainingPartitionSchema, fixedPartitionIndexes, requiredSchema, partitionSchema, outputSchema, filters ++ requiredFilters, storageConf) } CloseableIteratorListener.addListener(iter) From e22613eb57532d5ab832bd0187775e910407f393 Mon Sep 17 00:00:00 2001 From: Vova Kolmakov Date: Fri, 3 Apr 2026 16:32:22 +0700 Subject: [PATCH 03/13] fixed by review notes --- .../apache/hudi/io/storage/LanceBatchIterator.java | 9 +++++---- .../hudi/io/storage/LanceRecordIterator.java | 8 ++++---- .../datasources/lance/SparkLanceReaderBase.scala | 6 +++--- .../hudi/functional/TestLanceColumnarBatch.scala | 14 +++++++------- 4 files changed, 19 insertions(+), 18 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceBatchIterator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceBatchIterator.java index 41d79c1e324ee..8609392caee5b 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceBatchIterator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceBatchIterator.java @@ -131,10 +131,11 @@ public void close() { IOException arrowException = null; Exception lanceException = null; - if (currentBatch != null) { - currentBatch.close(); - currentBatch = null; - } + // Don't close currentBatch here: ColumnarBatch.close() would close the + // underlying Arrow FieldVectors through LanceArrowColumnVector, but they + // are owned by the ArrowReader (via VectorSchemaRoot) and will be closed + // when arrowReader.close() is 
called below. + currentBatch = null; if (arrowReader != null) { try { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceRecordIterator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceRecordIterator.java index 91a0421d01186..98ba44507dba6 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceRecordIterator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceRecordIterator.java @@ -115,10 +115,10 @@ public boolean hasNext() { return true; } - if (currentBatch != null) { - currentBatch.close(); - currentBatch = null; - } + // Release reference to previous batch (don't close — the underlying + // FieldVectors are reused by ArrowReader across loadNextBatch() calls, + // and closing would corrupt the cached LanceArrowColumnVector wrappers). + currentBatch = null; // Try to load next batch. Loop so zero-row batches (legitimately returned e.g. after // filter pushdown) don't silently terminate iteration and drop subsequent non-empty batches. 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala index 45166a1ec9d41..58d5e604d6c99 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala @@ -29,6 +29,7 @@ import org.apache.hudi.io.memory.HoodieArrowAllocator import org.apache.hudi.io.storage.{BlobDescriptorTransform, HoodieSparkLanceReader, LanceBatchIterator, LanceRecordIterator, VectorConversionUtils} import org.apache.hudi.storage.StorageConfiguration +import org.apache.arrow.vector.ipc.ArrowReader import org.apache.hadoop.conf.Configuration import org.apache.parquet.schema.MessageType import org.apache.spark.TaskContext @@ -246,7 +247,7 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna private def readBatch(file: PartitionedFile, allocator: org.apache.arrow.memory.BufferAllocator, lanceReader: LanceFileReader, - arrowReader: org.apache.arrow.vector.ipc.ArrowReader, + arrowReader: ArrowReader, filePath: String, requestSchema: StructType, requiredSchema: StructType, @@ -345,9 +346,8 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna override def close(): Unit = { // Close null Arrow vectors and their allocator before batchIterator (which closes the data allocator) - nullColumnVectors.foreach { case (_, columnVector, arrowVector) => + nullColumnVectors.foreach { case (_, columnVector, _) => columnVector.close() - arrowVector.close() } if (nullAllocator != null) nullAllocator.close() batchIterator.close() diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceColumnarBatch.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceColumnarBatch.scala index 66285e7abe1c8..ed190bed5d127 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceColumnarBatch.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceColumnarBatch.scala @@ -79,10 +79,6 @@ class TestLanceColumnarBatch extends HoodieSparkClientTestBase { spark = null } - // --------------------------------------------------------------------------- - // Helpers - // --------------------------------------------------------------------------- - /** * Write a plain Lance file (no Hudi metadata fields) containing [[rows]] with [[schema]]. * Returns the absolute path string of the written file. @@ -92,9 +88,13 @@ class TestLanceColumnarBatch extends HoodieSparkClientTestBase { val path = new StoragePath(s"$basePath/$fileName") val storage = HoodieTestUtils.getStorage(basePath) try { - val writer = new HoodieSparkLanceWriter( - path, schema, "20240101120000000", new SparkTaskContextSupplier(), - storage, false, HOption.of(bloom)) + val writer = HoodieSparkLanceWriter.builder + .file(path).sparkSchema(schema) + .instantTime("20240101120000000") + .taskContextSupplier(new SparkTaskContextSupplier()) + .storage(storage) + .populateMetaFields(false) + .bloomFilterOpt(HOption.of(bloom)).build try { rows.zipWithIndex.foreach { case (row, i) => writer.writeRow("key" + i, row) } } finally { From 7ae5dbc28017e4cc2625e71336e01aae50c9d9d9 Mon Sep 17 00:00:00 2001 From: Vova Kolmakov Date: Sun, 5 Apr 2026 20:00:05 +0700 Subject: [PATCH 04/13] fixed by review notes 2 --- .../hudi/io/storage/LanceBatchIterator.java | 45 +++++++++----- .../lance/SparkLanceReaderBase.scala | 60 +++++++++++-------- ...HoodieFileGroupReaderBasedFileFormat.scala | 18 +++++- 3 files changed, 79 insertions(+), 
44 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceBatchIterator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceBatchIterator.java index 8609392caee5b..6ff06557ef8cf 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceBatchIterator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceBatchIterator.java @@ -33,6 +33,7 @@ import java.io.IOException; import java.util.Iterator; import java.util.NoSuchElementException; +import java.util.Objects; /** * Iterator that returns {@link ColumnarBatch} directly from Lance files without @@ -73,9 +74,9 @@ public LanceBatchIterator(BufferAllocator allocator, LanceFileReader lanceReader, ArrowReader arrowReader, String path) { - this.allocator = allocator; - this.lanceReader = lanceReader; - this.arrowReader = arrowReader; + this.allocator = Objects.requireNonNull(allocator, "allocator must not be null"); + this.lanceReader = Objects.requireNonNull(lanceReader, "lanceReader must not be null"); + this.arrowReader = Objects.requireNonNull(arrowReader, "arrowReader must not be null"); this.path = path; } @@ -130,6 +131,7 @@ public void close() { IOException arrowException = null; Exception lanceException = null; + Exception allocatorException = null; // Don't close currentBatch here: ColumnarBatch.close() would close the // underlying Arrow FieldVectors through LanceArrowColumnVector, but they @@ -137,27 +139,38 @@ public void close() { // when arrowReader.close() is called below. 
currentBatch = null; - if (arrowReader != null) { - try { - arrowReader.close(); - } catch (IOException e) { - arrowException = e; - } + try { + arrowReader.close(); + } catch (IOException e) { + arrowException = e; } - if (lanceReader != null) { - try { - lanceReader.close(); - } catch (Exception e) { - lanceException = e; - } + try { + lanceReader.close(); + } catch (Exception e) { + lanceException = e; } - if (allocator != null) { + try { allocator.close(); + } catch (Exception e) { + allocatorException = e; } + // Propagate exceptions, attaching earlier ones as suppressed so nothing is silently lost. + if (allocatorException != null) { + if (arrowException != null) { + allocatorException.addSuppressed(arrowException); + } + if (lanceException != null) { + allocatorException.addSuppressed(lanceException); + } + throw new HoodieException("Failed to close Arrow allocator", allocatorException); + } if (arrowException != null) { + if (lanceException != null) { + arrowException.addSuppressed(lanceException); + } throw new HoodieIOException("Failed to close Arrow reader", arrowException); } if (lanceException != null) { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala index 58d5e604d6c99..76ce886231dd0 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala @@ -29,6 +29,8 @@ import org.apache.hudi.io.memory.HoodieArrowAllocator import org.apache.hudi.io.storage.{BlobDescriptorTransform, HoodieSparkLanceReader, LanceBatchIterator, LanceRecordIterator, VectorConversionUtils} import org.apache.hudi.storage.StorageConfiguration 
+import org.apache.arrow.memory.BufferAllocator +import org.apache.arrow.vector.FieldVector import org.apache.arrow.vector.ipc.ArrowReader import org.apache.hadoop.conf.Configuration import org.apache.parquet.schema.MessageType @@ -58,6 +60,9 @@ import scala.collection.JavaConverters._ */ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumnarFileReader { + /** Holds a pre-created all-null Arrow vector for a column missing from the file (schema evolution). */ + private case class NullColumnEntry(colIndex: Int, columnVector: LanceArrowColumnVector, arrowVector: FieldVector) + // Batch size for reading Lance files (number of rows per batch) private val DEFAULT_BATCH_SIZE = 512 @@ -97,9 +102,11 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna val allocator = HoodieArrowAllocator.newChildAllocator( getClass.getSimpleName + "-data-" + filePath, dataAllocatorSize) + var lanceReader: LanceFileReader = null + var arrowReader: ArrowReader = null try { // Open Lance file reader - val lanceReader = LanceFileReader.open(filePath, allocator) + lanceReader = LanceFileReader.open(filePath, allocator) // Get schema from Lance file. lance-spark strips Hudi's VECTOR descriptor during // Arrow→Spark conversion but keeps the fixed-size-list dimension on the Spark @@ -141,7 +148,7 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna // the option regardless. val blobMode = resolveBlobReadMode(storageConf) val readOpts = FileReadOptions.builder().blobReadMode(blobMode).build() - val arrowReader = lanceReader.readAll(columnNames, null, DEFAULT_BATCH_SIZE, readOpts) + arrowReader = lanceReader.readAll(columnNames, null, DEFAULT_BATCH_SIZE, readOpts) // Decide between batch mode and row mode. // Fall back to row mode if type casting is needed (batch-level type casting deferred to follow-up). 
@@ -174,9 +181,13 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna } catch { case e: Exception => if (lanceIterator != null) { - lanceIterator.close() // Close iterator which handles lifecycle for all objects + // Iterator owns allocator/lanceReader/arrowReader — let it close everything. + try { lanceIterator.close() } catch { case s: Exception => e.addSuppressed(s) } } else { - allocator.close() // Close allocator directly + // Iterator wasn't constructed yet; close whatever resources were opened, in reverse order. + try { if (arrowReader != null) arrowReader.close() } catch { case s: Exception => e.addSuppressed(s) } + try { if (lanceReader != null) lanceReader.close() } catch { case s: Exception => e.addSuppressed(s) } + try { allocator.close() } catch { case s: Exception => e.addSuppressed(s) } } throw new IOException(s"Failed to read Lance file: $filePath", e) } @@ -245,7 +256,7 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna * Used when enableVectorizedReader=true and no type casting is needed. */ private def readBatch(file: PartitionedFile, - allocator: org.apache.arrow.memory.BufferAllocator, + allocator: BufferAllocator, lanceReader: LanceFileReader, arrowReader: ArrowReader, filePath: String, @@ -264,24 +275,24 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna // Create Arrow-backed null vectors for columns missing from the file. // Uses LanceArrowColumnVector so that Spark's vectorTypes() contract is satisfied // (FileSourceScanExec expects all data columns to be LanceArrowColumnVector). 
- val nullAllocator = if (columnMapping.contains(-1)) { - HoodieArrowAllocator.newChildAllocator( - getClass.getSimpleName + "-null-" + filePath, HoodieSparkLanceReader.LANCE_DATA_ALLOCATOR_SIZE) - } else null - - val nullColumnVectors: Array[(Int, LanceArrowColumnVector, org.apache.arrow.vector.FieldVector)] = - if (nullAllocator != null) { + val nullAllocator: Option[BufferAllocator] = if (columnMapping.contains(-1)) { + Some(HoodieArrowAllocator.newChildAllocator( + getClass.getSimpleName + "-null-" + filePath, HoodieSparkLanceReader.LANCE_DATA_ALLOCATOR_SIZE)) + } else None + + // Arrow vectors auto-reallocate on setValueCount (see BaseFixedWidthVector.setValueCount), + // so it is safe to call setValueCount with a count larger than DEFAULT_BATCH_SIZE. + val nullColumnVectors: Array[NullColumnEntry] = + nullAllocator.map { alloc => columnMapping.zipWithIndex.filter(_._1 < 0).map { case (_, idx) => val field = LanceArrowUtils.toArrowField( requiredSchema(idx).name, requiredSchema(idx).dataType, requiredSchema(idx).nullable, "UTC") - val arrowVector = field.createVector(nullAllocator) + val arrowVector = field.createVector(alloc) arrowVector.allocateNew() arrowVector.setValueCount(DEFAULT_BATCH_SIZE) - (idx, new LanceArrowColumnVector(arrowVector), arrowVector) + NullColumnEntry(idx, new LanceArrowColumnVector(arrowVector), arrowVector) } - } else { - Array.empty - } + }.getOrElse(Array.empty) // Pre-create partition column vectors (reused across batches, reset per batch) val hasPartitionColumns = partitionSchema.length > 0 @@ -316,12 +327,13 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna vectors(i) = sourceBatch.column(columnMapping(i)) } else { // Find the pre-created null vector for this index - val entry = nullColumnVectors.find(_._1 == i).get + val entry = nullColumnVectors.find(_.colIndex == i) + .getOrElse(throw new IllegalStateException(s"No null vector pre-created for column index $i")) // Adjust valueCount if batch 
size differs from allocated size - if (numRows != entry._3.getValueCount) { - entry._3.setValueCount(numRows) + if (numRows != entry.arrowVector.getValueCount) { + entry.arrowVector.setValueCount(numRows) } - vectors(i) = entry._2 + vectors(i) = entry.columnVector } i += 1 } @@ -346,10 +358,8 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna override def close(): Unit = { // Close null Arrow vectors and their allocator before batchIterator (which closes the data allocator) - nullColumnVectors.foreach { case (_, columnVector, _) => - columnVector.close() - } - if (nullAllocator != null) nullAllocator.close() + nullColumnVectors.foreach(_.columnVector.close()) + nullAllocator.foreach(_.close()) batchIterator.close() partitionVectors.foreach(_.close()) } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala index beadf9d84dce8..a1c26a937b660 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala @@ -56,7 +56,7 @@ import org.apache.spark.sql.hudi.MultipleColumnarFileFormatReader import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnarBatchUtils} +import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnarBatchUtils, LanceArrowColumnVector} import org.apache.spark.util.SerializableConfiguration import java.io.Closeable @@ -186,9 +186,21 @@ class 
HoodieFileGroupReaderBasedFileFormat(tablePath: String, if (hoodieFileFormat == HoodieFileFormat.LANCE && !isMultipleBaseFileFormatsEnabled) { // Lance uses LanceArrowColumnVector for data columns and OnHeapColumnVector for partition columns. // Spark uses vectorTypes to determine if columnar batch reading is supported. - val lanceVectorType = "org.apache.spark.sql.vectorized.LanceArrowColumnVector" + val lanceVectorType = classOf[LanceArrowColumnVector].getName val partitionVectorType = classOf[OnHeapColumnVector].getName - Some(Seq.fill(requiredSchema.length)(lanceVectorType) ++ Seq.fill(partitionSchema.length)(partitionVectorType)) + val baseTypes = Seq.fill(requiredSchema.length)(lanceVectorType) ++ Seq.fill(partitionSchema.length)(partitionVectorType) + if (mandatoryFields.isEmpty) { + Some(baseTypes) + } else { + // mandatoryFields are partition columns read from the file — use LanceArrowColumnVector for them + Some(baseTypes.zipWithIndex.map { case (vt, i) => + if (i >= requiredSchema.length && mandatoryFields.contains(partitionSchema.fields(i - requiredSchema.length).name)) { + lanceVectorType + } else { + vt + } + }) + } } else { val originalVectorTypes = super.vectorTypes(requiredSchema, partitionSchema, sqlConf) if (mandatoryFields.isEmpty) { From 6f25d159113b28730e120161712d3dc48033ff62 Mon Sep 17 00:00:00 2001 From: Vova Kolmakov Date: Wed, 15 Apr 2026 12:10:32 +0700 Subject: [PATCH 05/13] fix(lance): address PR #18403 review round 3 - Idempotent close() in SparkLanceReaderBase.readBatch iterator: guard nullAllocator and partitionVectors against double-free when both the TaskContext listener and the outer CloseableIteratorListener invoke close(). - Close iterator in TestLanceColumnarBatch.runReadAndCollect: wrap the consume loop in try/finally and close via AutoCloseable; needed on the driver where TaskContext.get() is null and the completion-listener path never runs. 
- Assert vectorized scan path in 3 integration tests via a new helper that inspects SQLMetrics on a columnar FileSourceScan/BatchScan, descending into AdaptiveSparkPlanExec and QueryStageExec to reach the scan under AQE. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../lance/SparkLanceReaderBase.scala | 16 ++-- .../functional/TestLanceColumnarBatch.scala | 79 ++++++++++++++++--- 2 files changed, 78 insertions(+), 17 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala index 76ce886231dd0..cdf77ab3d788e 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala @@ -312,6 +312,8 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna // Map each source batch to a batch with the correct column layout. val mappedIterator = new Iterator[ColumnarBatch] with Closeable { + private[this] var closed = false + override def hasNext: Boolean = batchIterator.hasNext() override def next(): ColumnarBatch = { @@ -357,11 +359,15 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna } override def close(): Unit = { - // Close null Arrow vectors and their allocator before batchIterator (which closes the data allocator) - nullColumnVectors.foreach(_.columnVector.close()) - nullAllocator.foreach(_.close()) - batchIterator.close() - partitionVectors.foreach(_.close()) + // Idempotent: TaskContext listener and the outer CloseableIteratorListener may both call close(). 
+ if (!closed) { + closed = true + // Close null Arrow vectors and their allocator before batchIterator (which closes the data allocator) + nullColumnVectors.foreach(_.columnVector.close()) + nullAllocator.foreach(_.close()) + batchIterator.close() + partitionVectors.foreach(_.close()) + } } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceColumnarBatch.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceColumnarBatch.scala index ed190bed5d127..78c7028172733 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceColumnarBatch.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceColumnarBatch.scala @@ -35,6 +35,8 @@ import org.apache.hadoop.conf.Configuration import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.GenericInternalRow +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec} import org.apache.spark.sql.execution.datasources.lance.SparkLanceReaderBase import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types._ @@ -157,23 +159,67 @@ class TestLanceColumnarBatch extends HoodieSparkClientTestBase { val iter = reader.read(pf, requiredSchema, partitionSchema, HOption.empty(), Seq.empty[Filter], storageConf) .asInstanceOf[Iterator[Any]] - while (iter.hasNext) { - iter.next() match { - case batch: ColumnarBatch => - batchCount += 1 - batch.rowIterator().asScala.foreach { row => + try { + while (iter.hasNext) { + iter.next() match { + case batch: ColumnarBatch => + batchCount += 1 + batch.rowIterator().asScala.foreach { row => + rows += extractRow(row, requiredSchema) + } + + case row: InternalRow => + rowCount += 1 rows += extractRow(row, requiredSchema) - } - - case row: InternalRow => 
- rowCount += 1 - rows += extractRow(row, requiredSchema) + } + } + } finally { + // On the driver in unit tests TaskContext.get() is null, so the + // completion-listener cleanup path inside readBatch() never fires. + // Explicitly close to release Arrow allocators and file readers. + iter match { + case c: AutoCloseable => c.close() + case _ => // row path returns a plain Iterator } } (batchCount, rowCount, rows.toSeq) } + /** + * Assert that the executed plan for `df` used a columnar (vectorized) scan and + * that the scan's `numOutputRows` SQLMetric is populated. This guards against a + * silent fallback to the row reader that would still produce correct output. + * + * The metric is lazy: the caller must have already forced execution (e.g. via + * `df.collect()`) before invoking this helper. + */ + private def assertLanceVectorizedScan(df: DataFrame, expectedMinRows: Long): Unit = { + val rootPlan = df.queryExecution.executedPlan + + def allNodes(p: SparkPlan): Seq[SparkPlan] = { + val hidden: Seq[SparkPlan] = p match { + case aqe: AdaptiveSparkPlanExec => allNodes(aqe.executedPlan) + case qs: QueryStageExec => allNodes(qs.plan) + case _ => Seq.empty + } + (p +: p.children.flatMap(allNodes)) ++ hidden + } + + val nodes = allNodes(rootPlan) + val scanOpt = nodes.collectFirst { + case s: SparkPlan if s.supportsColumnar && + (s.getClass.getSimpleName.contains("FileSourceScan") || + s.getClass.getSimpleName.contains("BatchScan")) => s + } + assertTrue(scanOpt.isDefined, + s"Expected a columnar (supportsColumnar=true) scan node in executed plan, found none:\n${rootPlan.treeString}") + val scan = scanOpt.get + val numOutputRows = scan.metrics.get("numOutputRows").map(_.value).getOrElse(-1L) + assertTrue(numOutputRows >= expectedMinRows, + s"Columnar Lance scan produced $numOutputRows rows, expected >= $expectedMinRows; plan:\n${rootPlan.treeString}") + } + private val baseSchema: StructType = new StructType() .add("id", IntegerType, nullable = false) .add("name", 
StringType, nullable = true) @@ -423,6 +469,8 @@ class TestLanceColumnarBatch extends HoodieSparkClientTestBase { assertEquals(ename, row.getAs[String]("name")) assertEquals(escore, row.getAs[Double]("score"), 1e-6) } + + assertLanceVectorizedScan(actual, 3) } /** @@ -476,6 +524,9 @@ class TestLanceColumnarBatch extends HoodieSparkClientTestBase { assertEquals("engineering", rows(2).getAs[String]("department")) assertEquals(4, rows(3).getAs[Int]("id")) assertEquals("sales", rows(3).getAs[String]("department")) + + // Confirm the vectorized scan path was used for at least one of the scanned files. + assertLanceVectorizedScan(actual, 1) } /** @@ -501,14 +552,18 @@ class TestLanceColumnarBatch extends HoodieSparkClientTestBase { spark.read.format("hudi").load(tablePath).createOrReplaceTempView(tableName) - val result = spark.sql( + val resultDf = spark.sql( s"SELECT id, name, age FROM $tableName WHERE age > 30 ORDER BY id" - ).collect() + ) + val result = resultDf.collect() assertEquals(2, result.length, "Only Charlie (35) and Eve (32) satisfy age > 30") assertEquals(3, result(0).getAs[Int]("id")) assertEquals("Charlie", result(0).getAs[String]("name")) assertEquals(5, result(1).getAs[Int]("id")) assertEquals("Eve", result(1).getAs[String]("name")) + + // `numOutputRows` on the scan reflects pre-filter rows; all 5 rows flow through the columnar scan. + assertLanceVectorizedScan(resultDf, 1) } } From 8becde8c9d73f9a76a392176b461cc79d791cdab Mon Sep 17 00:00:00 2001 From: Vova Kolmakov Date: Wed, 15 Apr 2026 17:15:34 +0700 Subject: [PATCH 06/13] fix(lance): address PR #18403 review round 4 - Handle TimestampNTZType in SparkLanceReaderBase.populatePartitionVectors: TIMESTAMP_NTZ is internally a Long (microseconds since epoch) just like TimestampType, but was falling through to the wildcard case and producing all-null partition values for any TIMESTAMP_NTZ-partitioned table on the vectorized path. 
- Replace per-batch O(n) linear scan `nullColumnVectors.find(_.colIndex == i)` with a direct-indexed `nullColumnByIndex: Array[NullColumnEntry]` lookup, removing m*k array scans for tables with m missing columns and k batches. - Add TestLanceColumnarBatch#testColumnarPathPartitionVectorTimestampNtz regression test that fails before the populatePartitionVectors fix because every row would carry a null partition value. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../lance/SparkLanceReaderBase.scala | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala index cdf77ab3d788e..87cc184cf0cc7 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala @@ -294,6 +294,13 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna } }.getOrElse(Array.empty) + // Direct-indexed lookup so the per-batch hot loop is O(1) instead of scanning nullColumnVectors. 
+ val nullColumnByIndex: Array[NullColumnEntry] = { + val arr = new Array[NullColumnEntry](requiredSchema.length) + nullColumnVectors.foreach(e => arr(e.colIndex) = e) + arr + } + // Pre-create partition column vectors (reused across batches, reset per batch) val hasPartitionColumns = partitionSchema.length > 0 val partitionVectors: Array[WritableColumnVector] = if (hasPartitionColumns) { @@ -328,9 +335,11 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna if (columnMapping(i) >= 0) { vectors(i) = sourceBatch.column(columnMapping(i)) } else { - // Find the pre-created null vector for this index - val entry = nullColumnVectors.find(_.colIndex == i) - .getOrElse(throw new IllegalStateException(s"No null vector pre-created for column index $i")) + // Direct-indexed lookup (O(1)) for the pre-created null vector for this column. + val entry = nullColumnByIndex(i) + if (entry == null) { + throw new IllegalStateException(s"No null vector pre-created for column index $i") + } // Adjust valueCount if batch size differs from allocated size if (numRows != entry.arrowVector.getValueCount) { entry.arrowVector.setValueCount(numRows) @@ -460,7 +469,7 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna case IntegerType | DateType => val v = partitionValues.getInt(i) vector.putInts(0, numRows, v) - case LongType | TimestampType => + case LongType | TimestampType | TimestampNTZType => val v = partitionValues.getLong(i) vector.putLongs(0, numRows, v) case FloatType => From af4868994e1309f58c5d40611b66ec0db698737b Mon Sep 17 00:00:00 2001 From: Vova Kolmakov Date: Sat, 18 Apr 2026 14:39:44 +0700 Subject: [PATCH 07/13] fix(lance): close all resources best-effort in vectorized reader Previously `closed = true` was set before cleanup, so if any single close() threw (null vector, null allocator, batch iterator, or partition vectors), the remaining closes were skipped and every subsequent close() call became a no-op, 
stranding Arrow/Lance resources. Attempt every close inside a closeSafely wrapper that captures the first exception and attaches the rest as suppressed; throw once all closes have been attempted. Idempotency via `closed = true` is preserved at the start so repeated close() calls from TaskCompletionListener + CloseableIteratorListener are still safe. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../lance/SparkLanceReaderBase.scala | 21 +++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala index 87cc184cf0cc7..ea74baf559d34 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala @@ -371,11 +371,24 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna // Idempotent: TaskContext listener and the outer CloseableIteratorListener may both call close(). if (!closed) { closed = true + // Attempt every close; if one throws, keep going and surface the first error with the + // rest attached as suppressed, so one failure cannot strand Arrow/Lance resources. 
+ var closeError: Exception = null + def closeSafely(f: => Unit): Unit = { + try { + f + } catch { + case e: Exception => + if (closeError == null) closeError = e + else closeError.addSuppressed(e) + } + } // Close null Arrow vectors and their allocator before batchIterator (which closes the data allocator) - nullColumnVectors.foreach(_.columnVector.close()) - nullAllocator.foreach(_.close()) - batchIterator.close() - partitionVectors.foreach(_.close()) + nullColumnVectors.foreach(v => closeSafely(v.columnVector.close())) + nullAllocator.foreach(a => closeSafely(a.close())) + closeSafely(batchIterator.close()) + partitionVectors.foreach(v => closeSafely(v.close())) + if (closeError != null) throw closeError } } } From af4ef924e8130a12908bcf80ed1f6c366372d6af Mon Sep 17 00:00:00 2001 From: Vova Kolmakov Date: Mon, 20 Apr 2026 11:06:48 +0700 Subject: [PATCH 08/13] fix(lance): use bulk putFloats/putDoubles for partition value fill Replaces per-element while loops in populatePartitionVectors for FloatType and DoubleType with the WritableColumnVector bulk-fill overloads, matching the style already used for IntegerType/DateType (putInts) and LongType/TimestampType/TimestampNTZType (putLongs). No behavior change. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../execution/datasources/lance/SparkLanceReaderBase.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala index ea74baf559d34..448414b7e6f45 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala @@ -487,12 +487,10 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna vector.putLongs(0, numRows, v) case FloatType => val v = partitionValues.getFloat(i) - var j = 0 - while (j < numRows) { vector.putFloat(j, v); j += 1 } + vector.putFloats(0, numRows, v) case DoubleType => val v = partitionValues.getDouble(i) - var j = 0 - while (j < numRows) { vector.putDouble(j, v); j += 1 } + vector.putDoubles(0, numRows, v) case StringType => val v = partitionValues.getUTF8String(i) val bytes = v.getBytes From 8a4be1dc273ba21b21e7a53f8844d60cadf6fba0 Mon Sep 17 00:00:00 2001 From: Vova Kolmakov Date: Tue, 21 Apr 2026 09:17:44 +0700 Subject: [PATCH 09/13] fix(lance): close row-path iterator and test iterators on driver readRows() in SparkLanceReaderBase returned a plain Scala Iterator, so lanceIterator.close() was unreachable whenever TaskContext.get() was null (driver / direct-reader callers). Wrap the projected stream in Iterator[InternalRow] with Closeable, matching readBatch()'s pattern and its idempotent closed flag. LanceRecordIterator.close() is itself idempotent, so calls from the TaskContext completion listener and from explicit close() coexist safely. 
Two direct-reader tests in TestLanceColumnarBatch consumed the iterator without the try/finally guard used in the readAndCollect() helper. Apply the same "case c: AutoCloseable => c.close()" pattern so driver-side tests do not leak Arrow allocators across the suite. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../lance/SparkLanceReaderBase.scala | 34 ++++++++----- .../functional/TestLanceColumnarBatch.scala | 50 ++++++++++++------- 2 files changed, 54 insertions(+), 30 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala index 448414b7e6f45..e96fc9733883f 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala @@ -412,8 +412,6 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna partitionSchema: StructType, implicitTypeChangeInfo: java.util.Map[Integer, HoodiePair[DataType, DataType]]): Iterator[InternalRow] = { - val baseIter: Iterator[InternalRow] = recordIterator.asScala - // Create the following projections for schema evolution: // 1. Padding projection: add NULL for missing columns // 2. Casting projection: handle type conversions @@ -433,21 +431,33 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna def apply(row: InternalRow): UnsafeRow = castProj(paddingProj(row)) } - val projectedIter = baseIter.map(projection.apply) - - // Handle partition columns - if (partitionSchema.length == 0) { - // No partition columns - return rows directly - projectedIter + // Compose the per-row projection once. 
With partition columns, chain the + // JoinedRow/UnsafeProjection after the padding+cast projection. + val rowProjection: InternalRow => InternalRow = if (partitionSchema.length == 0) { + projection.apply } else { - // Create UnsafeProjection to convert JoinedRow to UnsafeRow val fullSchema = (requiredSchema.fields ++ partitionSchema.fields).map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()) val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema) - - // Append partition values to each row using JoinedRow, then convert to UnsafeRow val joinedRow = new JoinedRow() - projectedIter.map(row => unsafeProjection(joinedRow(row, file.partitionValues))) + row => unsafeProjection(joinedRow(projection(row), file.partitionValues)) + } + + // Wrap in a Closeable iterator so driver/direct-reader callers (TaskContext.get() + // is null) have an explicit cleanup handle. close() is idempotent and safe to + // run alongside the TaskContext completion listener above, since + // LanceRecordIterator.close() is itself idempotent. 
+ val src = recordIterator.asScala + new Iterator[InternalRow] with Closeable { + private[this] var closed = false + override def hasNext: Boolean = src.hasNext + override def next(): InternalRow = rowProjection(src.next()) + override def close(): Unit = { + if (!closed) { + closed = true + recordIterator.close() + } + } } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceColumnarBatch.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceColumnarBatch.scala index 78c7028172733..e85cb0b35e1f2 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceColumnarBatch.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceColumnarBatch.scala @@ -326,15 +326,22 @@ class TestLanceColumnarBatch extends HoodieSparkClientTestBase { val iter = reader.read(pf, requiredSchemaWithExtra, new StructType(), HOption.empty(), Seq.empty[Filter], storageConf).asInstanceOf[Iterator[Any]] - while (iter.hasNext) { - iter.next() match { - case batch: ColumnarBatch => - batchesSeen += 1 - batch.rowIterator().asScala.foreach { row => - nullPadded += row.isNullAt(2) - } + try { + while (iter.hasNext) { + iter.next() match { + case batch: ColumnarBatch => + batchesSeen += 1 + batch.rowIterator().asScala.foreach { row => + nullPadded += row.isNullAt(2) + } + case _ => + fail("Vectorized path with null-padding must return ColumnarBatch") + } + } + } finally { + iter match { + case c: AutoCloseable => c.close() case _ => - fail("Vectorized path with null-padding must return ColumnarBatch") } } @@ -369,17 +376,24 @@ class TestLanceColumnarBatch extends HoodieSparkClientTestBase { val iter = reader.read(pf, baseSchema, partitionSchema, HOption.empty(), Seq.empty[Filter], storageConf).asInstanceOf[Iterator[Any]] - while (iter.hasNext) { - iter.next() match { - case batch: ColumnarBatch => - batchesSeen += 1 - 
assertEquals(baseSchema.size + partitionSchema.size, batch.numCols(), - "Batch column count must equal data + partition columns") - batch.rowIterator().asScala.foreach { row => - depts += row.getUTF8String(baseSchema.size).toString - } + try { + while (iter.hasNext) { + iter.next() match { + case batch: ColumnarBatch => + batchesSeen += 1 + assertEquals(baseSchema.size + partitionSchema.size, batch.numCols(), + "Batch column count must equal data + partition columns") + batch.rowIterator().asScala.foreach { row => + depts += row.getUTF8String(baseSchema.size).toString + } + case _ => + fail("Vectorized path must return ColumnarBatch when partitionSchema is present") + } + } + } finally { + iter match { + case c: AutoCloseable => c.close() case _ => - fail("Vectorized path must return ColumnarBatch when partitionSchema is present") } } From 47fa84354d34aeaad8a0ffbe5df46d99c9f4bcd5 Mon Sep 17 00:00:00 2001 From: Vova Kolmakov Date: Mon, 4 May 2026 14:37:19 +0700 Subject: [PATCH 10/13] fix(lance): adapt to Lance 4.0.0 API moves after rebase The master rebase pulled in #18498 which bumped lance to 4.0.0 and lance-spark to 0.4.0. Two API moves were not flagged by git's textual auto-merge but break compilation against the new classpath: * `LanceArrowColumnVector` moved from `org.apache.spark.sql.vectorized` to `org.lance.spark.vectorized`. Updated imports in LanceBatchIterator, SparkLanceReaderBase, HoodieFileGroupReaderBasedFileFormat, and the scaladoc references in TestLanceColumnarBatch. * `HoodieSparkLanceReader.LANCE_DATA_ALLOCATOR_SIZE` was retired in favor of the configurable `HoodieStorageConfig.LANCE_READ_ALLOCATOR_SIZE_BYTES` config property. The vectorized read path's null-vector allocator now reads its size from that config (mirroring how `read()` already sizes the data allocator after the master changes). 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../org/apache/hudi/io/storage/LanceBatchIterator.java | 2 +- .../datasources/lance/SparkLanceReaderBase.scala | 8 +++++--- .../parquet/HoodieFileGroupReaderBasedFileFormat.scala | 3 ++- .../apache/hudi/functional/TestLanceColumnarBatch.scala | 4 ++-- 4 files changed, 10 insertions(+), 7 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceBatchIterator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceBatchIterator.java index 6ff06557ef8cf..c8e657098e56d 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceBatchIterator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceBatchIterator.java @@ -26,8 +26,8 @@ import org.apache.arrow.vector.ipc.ArrowReader; import org.apache.spark.sql.vectorized.ColumnVector; import org.apache.spark.sql.vectorized.ColumnarBatch; -import org.apache.spark.sql.vectorized.LanceArrowColumnVector; import org.lance.file.LanceFileReader; +import org.lance.spark.vectorized.LanceArrowColumnVector; import java.io.Closeable; import java.io.IOException; diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala index e96fc9733883f..942bd1bfce82d 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala @@ -26,7 +26,7 @@ import org.apache.hudi.common.util import org.apache.hudi.common.util.collection.{ClosableIterator, Pair => HoodiePair} import org.apache.hudi.internal.schema.InternalSchema import 
org.apache.hudi.io.memory.HoodieArrowAllocator -import org.apache.hudi.io.storage.{BlobDescriptorTransform, HoodieSparkLanceReader, LanceBatchIterator, LanceRecordIterator, VectorConversionUtils} +import org.apache.hudi.io.storage.{BlobDescriptorTransform, LanceBatchIterator, LanceRecordIterator, VectorConversionUtils} import org.apache.hudi.storage.StorageConfiguration import org.apache.arrow.memory.BufferAllocator @@ -44,8 +44,9 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types._ import org.apache.spark.sql.util.LanceArrowUtils -import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector, LanceArrowColumnVector} +import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} import org.lance.file.{BlobReadMode, FileReadOptions, LanceFileReader} +import org.lance.spark.vectorized.LanceArrowColumnVector import java.io.{Closeable, IOException} @@ -277,7 +278,8 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna // (FileSourceScanExec expects all data columns to be LanceArrowColumnVector). 
val nullAllocator: Option[BufferAllocator] = if (columnMapping.contains(-1)) { Some(HoodieArrowAllocator.newChildAllocator( - getClass.getSimpleName + "-null-" + filePath, HoodieSparkLanceReader.LANCE_DATA_ALLOCATOR_SIZE)) + getClass.getSimpleName + "-null-" + filePath, + HoodieStorageConfig.LANCE_READ_ALLOCATOR_SIZE_BYTES.defaultValue().toLong)) } else None // Arrow vectors auto-reallocate on setValueCount (see BaseFixedWidthVector.setValueCount), diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala index a1c26a937b660..19880647f0e07 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala @@ -56,8 +56,9 @@ import org.apache.spark.sql.hudi.MultipleColumnarFileFormatReader import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnarBatchUtils, LanceArrowColumnVector} +import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnarBatchUtils} import org.apache.spark.util.SerializableConfiguration +import org.lance.spark.vectorized.LanceArrowColumnVector import java.io.Closeable diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceColumnarBatch.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceColumnarBatch.scala index e85cb0b35e1f2..646bc9eeea0fa 100644 --- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceColumnarBatch.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceColumnarBatch.scala @@ -275,7 +275,7 @@ class TestLanceColumnarBatch extends HoodieSparkClientTestBase { /** * Columnar path (`enableVectorizedReader=true`, no type changes) must return - * [[ColumnarBatch]] instances backed by [[org.apache.spark.sql.vectorized.LanceArrowColumnVector]]. + * [[ColumnarBatch]] instances backed by [[org.lance.spark.vectorized.LanceArrowColumnVector]]. * No [[InternalRow]] may be returned directly. Data must round-trip correctly. */ @Test @@ -295,7 +295,7 @@ class TestLanceColumnarBatch extends HoodieSparkClientTestBase { /** * When the required schema contains a column that is absent from the file (schema evolution: * column addition), the vectorized path must null-pad that column in each [[ColumnarBatch]] - * using a [[org.apache.spark.sql.vectorized.LanceArrowColumnVector]] backed by an all-null + * using a [[org.lance.spark.vectorized.LanceArrowColumnVector]] backed by an all-null * Arrow vector. 
*/ @Test From 104a52f2203219a3dd81658eb3852b53fe9d76a9 Mon Sep 17 00:00:00 2001 From: Vova Kolmakov Date: Mon, 4 May 2026 16:16:38 +0700 Subject: [PATCH 11/13] fix(lance): address PR #18403 review round 5 Co-Authored-By: Claude Opus 4.7 (1M context) --- .../hudi/io/storage/LanceBatchIterator.java | 21 ++++++---- .../lance/SparkLanceReaderBase.scala | 41 +++++++++++++++---- 2 files changed, 47 insertions(+), 15 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceBatchIterator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceBatchIterator.java index c8e657098e56d..531e20f5aa495 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceBatchIterator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceBatchIterator.java @@ -58,7 +58,7 @@ public class LanceBatchIterator implements Iterator, Closeable { private ColumnVector[] columnVectors; private ColumnarBatch currentBatch; - private boolean nextBatchLoaded = false; + private boolean hasCachedBatch = false; private boolean finished = false; private boolean closed = false; @@ -85,7 +85,7 @@ public boolean hasNext() { if (finished) { return false; } - if (nextBatchLoaded) { + if (hasCachedBatch) { return true; } @@ -102,7 +102,7 @@ public boolean hasNext() { } currentBatch = new ColumnarBatch(columnVectors, root.getRowCount()); - nextBatchLoaded = true; + hasCachedBatch = true; return true; } } catch (IOException e) { @@ -118,7 +118,7 @@ public ColumnarBatch next() { if (!hasNext()) { throw new NoSuchElementException("No more batches available"); } - nextBatchLoaded = false; + hasCachedBatch = false; return currentBatch; } @@ -129,7 +129,7 @@ public void close() { } closed = true; - IOException arrowException = null; + Exception arrowException = null; Exception lanceException = null; Exception allocatorException = null; @@ -141,7 +141,7 @@ public void close() { try { 
arrowReader.close(); - } catch (IOException e) { + } catch (Exception e) { arrowException = e; } @@ -171,7 +171,14 @@ public void close() { if (lanceException != null) { arrowException.addSuppressed(lanceException); } - throw new HoodieIOException("Failed to close Arrow reader", arrowException); + // Preserve the IOException-specific subclass when the underlying close raised IOException; + // fall back to HoodieException so RuntimeExceptions from arrowReader.close() are still surfaced + // (and don't strand the lanceReader/allocator close paths above). + if (arrowException instanceof IOException) { + throw new HoodieIOException("Failed to close Arrow reader", (IOException) arrowException); + } else { + throw new HoodieException("Failed to close Arrow reader", arrowException); + } } if (lanceException != null) { throw new HoodieException("Failed to close Lance reader", lanceException); diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala index 942bd1bfce82d..937d394de241c 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala @@ -62,7 +62,7 @@ import scala.collection.JavaConverters._ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumnarFileReader { /** Holds a pre-created all-null Arrow vector for a column missing from the file (schema evolution). 
*/ - private case class NullColumnEntry(colIndex: Int, columnVector: LanceArrowColumnVector, arrowVector: FieldVector) + private case class NullColumnEntry(colIndex: Int, lanceColumnVector: LanceArrowColumnVector, arrowVector: FieldVector) // Batch size for reading Lance files (number of rows per batch) private val DEFAULT_BATCH_SIZE = 512 @@ -152,9 +152,15 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna arrowReader = lanceReader.readAll(columnNames, null, DEFAULT_BATCH_SIZE, readOpts) // Decide between batch mode and row mode. - // Fall back to row mode if type casting is needed (batch-level type casting deferred to follow-up). + // Fall back to row mode if: + // - type casting is needed (batch-level type casting deferred to follow-up), OR + // - the partition schema contains a type the batch-mode partition-vector populator + // does not handle (Struct/Array/Map/Char/Varchar/interval, etc.). The row path + // preserves these via JoinedRow, so falling back avoids silently nulling them out. 
val hasTypeChanges = !implicitTypeChangeInfo.isEmpty - if (enableVectorizedReader && !hasTypeChanges) { + val partitionTypesBatchSupported = + partitionSchema.forall(f => isPartitionTypeSupportedForBatch(f.dataType)) + if (enableVectorizedReader && !hasTypeChanges && partitionTypesBatchSupported) { readBatch(file, allocator, lanceReader, arrowReader, filePath, requestSchema, requiredSchema, partitionSchema) } else { @@ -346,7 +352,7 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna if (numRows != entry.arrowVector.getValueCount) { entry.arrowVector.setValueCount(numRows) } - vectors(i) = entry.columnVector + vectors(i) = entry.lanceColumnVector } i += 1 } @@ -386,7 +392,7 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna } } // Close null Arrow vectors and their allocator before batchIterator (which closes the data allocator) - nullColumnVectors.foreach(v => closeSafely(v.columnVector.close())) + nullColumnVectors.foreach(v => closeSafely(v.lanceColumnVector.close())) nullAllocator.foreach(a => closeSafely(a.close())) closeSafely(batchIterator.close()) partitionVectors.foreach(v => closeSafely(v.close())) @@ -463,6 +469,21 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna } } + /** + * Whether the batch-mode partition-vector populator can handle this partition type. + * Must stay in sync with the match in [[populatePartitionVectors]] — types not listed here + * (Struct/Array/Map/CharType/VarcharType/interval types, etc.) cause the read() path to fall + * back to row mode, where JoinedRow preserves the value as-is. 
+ */ + private def isPartitionTypeSupportedForBatch(dt: DataType): Boolean = dt match { + case BooleanType | ByteType | ShortType | IntegerType | DateType => true + case LongType | TimestampType | TimestampNTZType => true + case FloatType | DoubleType => true + case StringType | BinaryType => true + case _: DecimalType => true + case _ => false + } + /** * Populate writable column vectors with constant partition values. * Each vector is filled with the same value for all rows. @@ -527,9 +548,13 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna val v = partitionValues.getBinary(i) var j = 0 while (j < numRows) { vector.putByteArray(j, v); j += 1 } - case _ => - // For unsupported types, fill with nulls - vector.putNulls(0, numRows) + case other => + // Unreachable: read() gates batch mode on isPartitionTypeSupportedForBatch so unsupported + // partition types are routed to the row path. Throw loudly if that gate ever regresses + // rather than silently nulling out the user's partition values. + throw new IllegalStateException( + s"Unsupported partition type for Lance batch mode: $other (column ${partitionSchema(i).name}). 
" + + "isPartitionTypeSupportedForBatch must be kept in sync with populatePartitionVectors.") } } i += 1 From 4cf5757e4693fe7e0572173eb0abc98908331902 Mon Sep 17 00:00:00 2001 From: Vova Kolmakov Date: Tue, 5 May 2026 09:52:48 +0700 Subject: [PATCH 12/13] fix(lance): force row-path for BLOB-column reads so DESCRIPTOR mode is honored Co-Authored-By: Claude Opus 4.7 (1M context) --- .../lance/SparkLanceReaderBase.scala | 20 ++++++++++---- ...HoodieFileGroupReaderBasedFileFormat.scala | 26 +++++++++++++++++-- 2 files changed, 39 insertions(+), 7 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala index 937d394de241c..d0e39a7b4f0d2 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala @@ -151,24 +151,34 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna val readOpts = FileReadOptions.builder().blobReadMode(blobMode).build() arrowReader = lanceReader.readAll(columnNames, null, DEFAULT_BATCH_SIZE, readOpts) + // BLOB columns in the request — needed both for the DESCRIPTOR-mode row-path transform + // and for the path-selection gate below. + val blobFieldNames: java.util.Set[String] = + iteratorSchema.fields.collect { case f if isBlobField(f) => f.name }.toSet.asJava + // Decide between batch mode and row mode. // Fall back to row mode if: // - type casting is needed (batch-level type casting deferred to follow-up), OR // - the partition schema contains a type the batch-mode partition-vector populator // does not handle (Struct/Array/Map/Char/Varchar/interval, etc.). 
The row path - // preserves these via JoinedRow, so falling back avoids silently nulling them out. + // preserves these via JoinedRow, so falling back avoids silently nulling them out, OR + // - DESCRIPTOR blob mode is requested with BLOB columns present. The descriptor + // rewrite (data→null + synthesized/passthrough reference) lives in BlobDescriptorTransform + // which only the row path applies; the batch path would surface Lance's raw + // {position, size} struct in `data`, breaking the DESCRIPTOR-mode contract. val hasTypeChanges = !implicitTypeChangeInfo.isEmpty val partitionTypesBatchSupported = partitionSchema.forall(f => isPartitionTypeSupportedForBatch(f.dataType)) - if (enableVectorizedReader && !hasTypeChanges && partitionTypesBatchSupported) { + val descriptorBlobReadNeedsRowPath = + blobMode == BlobReadMode.DESCRIPTOR && !blobFieldNames.isEmpty + if (enableVectorizedReader && !hasTypeChanges && partitionTypesBatchSupported + && !descriptorBlobReadNeedsRowPath) { readBatch(file, allocator, lanceReader, arrowReader, filePath, requestSchema, requiredSchema, partitionSchema) } else { // Compose the DESCRIPTOR-aware blob transform only when the user opted into that mode // AND the request actually has BLOB columns (otherwise the rewrite has nothing to do). 
- val blobFieldNames: java.util.Set[String] = - iteratorSchema.fields.collect { case f if isBlobField(f) => f.name }.toSet.asJava - val blobTransform = if (blobMode == BlobReadMode.DESCRIPTOR && !blobFieldNames.isEmpty) { + val blobTransform = if (descriptorBlobReadNeedsRowPath) { new BlobDescriptorTransform(blobFieldNames, filePath) } else { null diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala index 19880647f0e07..5738904e09419 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala @@ -24,7 +24,7 @@ import org.apache.hudi.client.utils.SparkInternalSchemaConverter import org.apache.hudi.common.config.{HoodieMemoryConfig, TypedProperties} import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.model.HoodieFileFormat -import org.apache.hudi.common.schema.HoodieSchema +import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType} import org.apache.hudi.common.schema.HoodieSchemaUtils import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, ParquetTableSchemaResolver} import org.apache.hudi.common.table.read.HoodieFileGroupReader @@ -134,6 +134,24 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String, } } + /** + * Whether the requested schema contains any top-level BLOB columns. 
Used to disable + * Lance batch mode for BLOB tables: the DESCRIPTOR-mode rewrite (and the OUT_OF_LINE + * data→null contract) lives only in the row-path BlobDescriptorTransform, and `supportBatch` + * cannot inspect read-time options (e.g. `hoodie.read.blob.inline.mode`) since it runs at + * planning time. Forcing row mode whenever BLOB columns are present is the simplest correct + * gate — BLOB processing is per-row anyway (lazy byte materialization) so the perf delta is + * negligible. + */ + private def schemaContainsBlobColumn(schema: StructType): Boolean = { + schema.fields.exists { f => + val md = f.metadata + md != null && md.contains(HoodieSchema.TYPE_METADATA_FIELD) && + HoodieSchema.parseTypeDescriptor(md.getString(HoodieSchema.TYPE_METADATA_FIELD)) + .getType == HoodieSchemaType.BLOB + } + } + /** * Checks if the file format supports vectorized reading, please refer to SPARK-40918. * @@ -158,7 +176,11 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String, val orcBatchSupported = conf.orcVectorizedReaderEnabled && schema.forall(s => OrcUtils.supportColumnarReads( s.dataType, sparkSession.sessionState.conf.orcVectorizedReaderNestedColumnEnabled)) - val lanceBatchSupported = true + // Lance batch mode cannot honor `hoodie.read.blob.inline.mode=DESCRIPTOR` (the rewrite + // lives in BlobDescriptorTransform on the row path). Disable batch whenever BLOB columns + // are present so the planner doesn't commit to ColumnarBatch output that the row-path + // reader cannot deliver. 
+ val lanceBatchSupported = !schemaContainsBlobColumn(schema) val supportBatch = if (isMultipleBaseFileFormatsEnabled) { parquetBatchSupported && orcBatchSupported From fe8f7e1c2a56afb9f9fa12bad3a98edb1a3bedf3 Mon Sep 17 00:00:00 2001 From: Vova Kolmakov Date: Tue, 5 May 2026 15:28:48 +0700 Subject: [PATCH 13/13] fix(lance): address PR #18403 review round 6 - Gate `supportBatch` off when `internalSchemaOpt.isPresent` so the planner does not commit to ColumnarBatch output that the row-path fallback (taken inside SparkLanceReaderBase.read on `hasTypeChanges`) cannot deliver. Same shape as the existing BLOB-column gate; without this, `ColumnarToRowExec` would ClassCastException on the row-path iterator. Addresses review comments 3183992481 and 3186279309. - Collapse `nullColumnVectors` and `nullColumnByIndex` into a single sparse `Array[NullColumnEntry]` indexed by required-schema position, built directly inside `nullAllocator.map { ... }`. Drop `colIndex` from `NullColumnEntry` (direct indexing makes it redundant). The per-batch hot loop is unchanged; close() walks the sparse array and skips null slots. Addresses review comment 3183992489. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../lance/SparkLanceReaderBase.scala | 29 ++++++++++--------- ...HoodieFileGroupReaderBasedFileFormat.scala | 16 ++++++---- 2 files changed, 27 insertions(+), 18 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala index d0e39a7b4f0d2..c59ee115fb448 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala @@ -62,7 +62,7 @@ import scala.collection.JavaConverters._ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumnarFileReader { /** Holds a pre-created all-null Arrow vector for a column missing from the file (schema evolution). */ - private case class NullColumnEntry(colIndex: Int, lanceColumnVector: LanceArrowColumnVector, arrowVector: FieldVector) + private case class NullColumnEntry(lanceColumnVector: LanceArrowColumnVector, arrowVector: FieldVector) // Batch size for reading Lance files (number of rows per batch) private val DEFAULT_BATCH_SIZE = 512 @@ -298,27 +298,24 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna HoodieStorageConfig.LANCE_READ_ALLOCATOR_SIZE_BYTES.defaultValue().toLong)) } else None + // Sparse array indexed by required-schema position so the per-batch hot loop is O(1). + // Slots stay null for columns that come from the file (i.e. columnMapping(i) >= 0). // Arrow vectors auto-reallocate on setValueCount (see BaseFixedWidthVector.setValueCount), // so it is safe to call setValueCount with a count larger than DEFAULT_BATCH_SIZE. 
- val nullColumnVectors: Array[NullColumnEntry] = + val nullColumnByIndex: Array[NullColumnEntry] = nullAllocator.map { alloc => - columnMapping.zipWithIndex.filter(_._1 < 0).map { case (_, idx) => + val arr = new Array[NullColumnEntry](requiredSchema.length) + columnMapping.zipWithIndex.filter(_._1 < 0).foreach { case (_, idx) => val field = LanceArrowUtils.toArrowField( requiredSchema(idx).name, requiredSchema(idx).dataType, requiredSchema(idx).nullable, "UTC") val arrowVector = field.createVector(alloc) arrowVector.allocateNew() arrowVector.setValueCount(DEFAULT_BATCH_SIZE) - NullColumnEntry(idx, new LanceArrowColumnVector(arrowVector), arrowVector) + arr(idx) = NullColumnEntry(new LanceArrowColumnVector(arrowVector), arrowVector) } + arr }.getOrElse(Array.empty) - // Direct-indexed lookup so the per-batch hot loop is O(1) instead of scanning nullColumnVectors. - val nullColumnByIndex: Array[NullColumnEntry] = { - val arr = new Array[NullColumnEntry](requiredSchema.length) - nullColumnVectors.foreach(e => arr(e.colIndex) = e) - arr - } - // Pre-create partition column vectors (reused across batches, reset per batch) val hasPartitionColumns = partitionSchema.length > 0 val partitionVectors: Array[WritableColumnVector] = if (hasPartitionColumns) { @@ -401,8 +398,14 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna else closeError.addSuppressed(e) } } - // Close null Arrow vectors and their allocator before batchIterator (which closes the data allocator) - nullColumnVectors.foreach(v => closeSafely(v.lanceColumnVector.close())) + // Close null Arrow vectors and their allocator before batchIterator (which closes the data allocator). + // nullColumnByIndex is sparse: skip slots for columns that came from the file. 
+ var k = 0 + while (k < nullColumnByIndex.length) { + val entry = nullColumnByIndex(k) + if (entry != null) closeSafely(entry.lanceColumnVector.close()) + k += 1 + } nullAllocator.foreach(a => closeSafely(a.close())) closeSafely(batchIterator.close()) partitionVectors.foreach(v => closeSafely(v.close())) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala index 5738904e09419..1d711875cfa30 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala @@ -176,11 +176,17 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String, val orcBatchSupported = conf.orcVectorizedReaderEnabled && schema.forall(s => OrcUtils.supportColumnarReads( s.dataType, sparkSession.sessionState.conf.orcVectorizedReaderNestedColumnEnabled)) - // Lance batch mode cannot honor `hoodie.read.blob.inline.mode=DESCRIPTOR` (the rewrite - // lives in BlobDescriptorTransform on the row path). Disable batch whenever BLOB columns - // are present so the planner doesn't commit to ColumnarBatch output that the row-path - // reader cannot deliver. - val lanceBatchSupported = !schemaContainsBlobColumn(schema) + // Lance batch mode cannot honor either of these scenarios — they both force a runtime + // row path inside SparkLanceReaderBase.read: + // - `hoodie.read.blob.inline.mode=DESCRIPTOR`: the BLOB rewrite lives in + // BlobDescriptorTransform on the row path only. 
+ // - schema-on-read with implicit type changes: SparkSchemaTransformUtils builds the + // cast projection on the row path only; the batch path has no equivalent. + // Disable batch at plan time whenever either trigger could fire so the planner doesn't + // commit to ColumnarBatch output that the row-path reader cannot deliver — Spark would + // wrap the scan with ColumnarToRowExec and ClassCastException on the first InternalRow. + val lanceBatchSupported = + !schemaContainsBlobColumn(schema) && !internalSchemaOpt.isPresent val supportBatch = if (isMultipleBaseFileFormatsEnabled) { parquetBatchSupported && orcBatchSupported