diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceBatchIterator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceBatchIterator.java
new file mode 100644
index 0000000000000..531e20f5aa495
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceBatchIterator.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+import org.lance.file.LanceFileReader;
+import org.lance.spark.vectorized.LanceArrowColumnVector;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+
+/**
+ * Iterator that returns {@link ColumnarBatch} directly from Lance files without
+ * decomposing to individual rows. Used for vectorized/columnar batch reading
+ * in Spark's COW base-file-only read path.
+ *
+ *
Unlike {@link LanceRecordIterator} which extracts rows one by one,
+ * this iterator preserves the columnar format for zero-copy batch processing.
+ *
+ *
Manages the lifecycle of:
+ *
+ * - BufferAllocator - Arrow memory management
+ * - LanceFileReader - Lance file handle
+ * - ArrowReader - Arrow batch reader
+ *
+ */
+public class LanceBatchIterator implements Iterator, Closeable {
+ private final BufferAllocator allocator;
+ private final LanceFileReader lanceReader;
+ private final ArrowReader arrowReader;
+ private final String path;
+
+ private ColumnVector[] columnVectors;
+ private ColumnarBatch currentBatch;
+ private boolean hasCachedBatch = false;
+ private boolean finished = false;
+ private boolean closed = false;
+
+ /**
+ * Creates a new Lance batch iterator.
+ *
+ * @param allocator Arrow buffer allocator for memory management
+ * @param lanceReader Lance file reader
+ * @param arrowReader Arrow reader for batch reading
+ * @param path File path (for error messages)
+ */
+ public LanceBatchIterator(BufferAllocator allocator,
+ LanceFileReader lanceReader,
+ ArrowReader arrowReader,
+ String path) {
+ this.allocator = Objects.requireNonNull(allocator, "allocator must not be null");
+ this.lanceReader = Objects.requireNonNull(lanceReader, "lanceReader must not be null");
+ this.arrowReader = Objects.requireNonNull(arrowReader, "arrowReader must not be null");
+ this.path = path;
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (finished) {
+ return false;
+ }
+ if (hasCachedBatch) {
+ return true;
+ }
+
+ try {
+ if (arrowReader.loadNextBatch()) {
+ VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+
+ // Create column vector wrappers once and reuse across batches
+ // (ArrowReader reuses the same VectorSchemaRoot)
+ if (columnVectors == null) {
+ columnVectors = root.getFieldVectors().stream()
+ .map(LanceArrowColumnVector::new)
+ .toArray(ColumnVector[]::new);
+ }
+
+ currentBatch = new ColumnarBatch(columnVectors, root.getRowCount());
+ hasCachedBatch = true;
+ return true;
+ }
+ } catch (IOException e) {
+ throw new HoodieException("Failed to read next batch from Lance file: " + path, e);
+ }
+
+ finished = true;
+ return false;
+ }
+
+ @Override
+ public ColumnarBatch next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException("No more batches available");
+ }
+ hasCachedBatch = false;
+ return currentBatch;
+ }
+
+ @Override
+ public void close() {
+ if (closed) {
+ return;
+ }
+ closed = true;
+
+ Exception arrowException = null;
+ Exception lanceException = null;
+ Exception allocatorException = null;
+
+ // Don't close currentBatch here: ColumnarBatch.close() would close the
+ // underlying Arrow FieldVectors through LanceArrowColumnVector, but they
+ // are owned by the ArrowReader (via VectorSchemaRoot) and will be closed
+ // when arrowReader.close() is called below.
+ currentBatch = null;
+
+ try {
+ arrowReader.close();
+ } catch (Exception e) {
+ arrowException = e;
+ }
+
+ try {
+ lanceReader.close();
+ } catch (Exception e) {
+ lanceException = e;
+ }
+
+ try {
+ allocator.close();
+ } catch (Exception e) {
+ allocatorException = e;
+ }
+
+ // Propagate exceptions, attaching earlier ones as suppressed so nothing is silently lost.
+ if (allocatorException != null) {
+ if (arrowException != null) {
+ allocatorException.addSuppressed(arrowException);
+ }
+ if (lanceException != null) {
+ allocatorException.addSuppressed(lanceException);
+ }
+ throw new HoodieException("Failed to close Arrow allocator", allocatorException);
+ }
+ if (arrowException != null) {
+ if (lanceException != null) {
+ arrowException.addSuppressed(lanceException);
+ }
+ // Preserve the IOException-specific subclass when the underlying close raised IOException;
+ // fall back to HoodieException so RuntimeExceptions from arrowReader.close() are still surfaced
+ // (and don't strand the lanceReader/allocator close paths above).
+ if (arrowException instanceof IOException) {
+ throw new HoodieIOException("Failed to close Arrow reader", (IOException) arrowException);
+ } else {
+ throw new HoodieException("Failed to close Arrow reader", arrowException);
+ }
+ }
+ if (lanceException != null) {
+ throw new HoodieException("Failed to close Lance reader", lanceException);
+ }
+ }
+}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceRecordIterator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceRecordIterator.java
index 91a0421d01186..98ba44507dba6 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceRecordIterator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceRecordIterator.java
@@ -115,10 +115,10 @@ public boolean hasNext() {
return true;
}
- if (currentBatch != null) {
- currentBatch.close();
- currentBatch = null;
- }
+ // Release reference to previous batch (don't close — the underlying
+ // FieldVectors are reused by ArrowReader across loadNextBatch() calls,
+ // and closing would corrupt the cached LanceArrowColumnVector wrappers).
+ currentBatch = null;
// Try to load next batch. Loop so zero-row batches (legitimately returned e.g. after
// filter pushdown) don't silently terminate iteration and drop subsequent non-empty batches.
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala
index 65dec06cd3e00..c59ee115fb448 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala
@@ -23,12 +23,15 @@ import org.apache.hudi.SparkAdapterSupport.sparkAdapter
import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig}
import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType}
import org.apache.hudi.common.util
-import org.apache.hudi.common.util.collection.ClosableIterator
+import org.apache.hudi.common.util.collection.{ClosableIterator, Pair => HoodiePair}
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.io.memory.HoodieArrowAllocator
-import org.apache.hudi.io.storage.{BlobDescriptorTransform, LanceRecordIterator, VectorConversionUtils}
+import org.apache.hudi.io.storage.{BlobDescriptorTransform, LanceBatchIterator, LanceRecordIterator, VectorConversionUtils}
import org.apache.hudi.storage.StorageConfiguration
+import org.apache.arrow.memory.BufferAllocator
+import org.apache.arrow.vector.FieldVector
+import org.apache.arrow.vector.ipc.ArrowReader
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.schema.MessageType
import org.apache.spark.TaskContext
@@ -36,24 +39,31 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, JoinedRow, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.execution.datasources.{PartitionedFile, SparkColumnarFileReader, SparkSchemaTransformUtils}
+import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType}
+import org.apache.spark.sql.types._
import org.apache.spark.sql.util.LanceArrowUtils
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
import org.lance.file.{BlobReadMode, FileReadOptions, LanceFileReader}
+import org.lance.spark.vectorized.LanceArrowColumnVector
-import java.io.IOException
+import java.io.{Closeable, IOException}
import scala.collection.JavaConverters._
/**
* Reader for Lance files in Spark datasource.
- * Implements vectorized reading using LanceArrowColumnVector.
+ * Supports both row-based and columnar batch reading modes.
*
- * @param enableVectorizedReader whether to use vectorized reading (currently always true for Lance)
+ * @param enableVectorizedReader when true, returns ColumnarBatch for vectorized processing;
+ * when false, returns InternalRow one by one
*/
class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumnarFileReader {
+ /** Holds a pre-created all-null Arrow vector for a column missing from the file (schema evolution). */
+ private case class NullColumnEntry(lanceColumnVector: LanceArrowColumnVector, arrowVector: FieldVector)
+
// Batch size for reading Lance files (number of rows per batch)
private val DEFAULT_BATCH_SIZE = 512
@@ -93,9 +103,11 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna
val allocator = HoodieArrowAllocator.newChildAllocator(
getClass.getSimpleName + "-data-" + filePath, dataAllocatorSize)
+ var lanceReader: LanceFileReader = null
+ var arrowReader: ArrowReader = null
try {
// Open Lance file reader
- val lanceReader = LanceFileReader.open(filePath, allocator)
+ lanceReader = LanceFileReader.open(filePath, allocator)
// Get schema from Lance file. lance-spark strips Hudi's VECTOR descriptor during
// Arrow→Spark conversion but keeps the fixed-size-list dimension on the Spark
@@ -137,69 +149,62 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna
// the option regardless.
val blobMode = resolveBlobReadMode(storageConf)
val readOpts = FileReadOptions.builder().blobReadMode(blobMode).build()
- val arrowReader = lanceReader.readAll(columnNames, null, DEFAULT_BATCH_SIZE, readOpts)
+ arrowReader = lanceReader.readAll(columnNames, null, DEFAULT_BATCH_SIZE, readOpts)
- // Compose the DESCRIPTOR-aware blob transform only when the user opted into that mode
- // AND the request actually has BLOB columns (otherwise the rewrite has nothing to do).
+ // BLOB columns in the request — needed both for the DESCRIPTOR-mode row-path transform
+ // and for the path-selection gate below.
val blobFieldNames: java.util.Set[String] =
iteratorSchema.fields.collect { case f if isBlobField(f) => f.name }.toSet.asJava
- val blobTransform = if (blobMode == BlobReadMode.DESCRIPTOR && !blobFieldNames.isEmpty) {
- new BlobDescriptorTransform(blobFieldNames, filePath)
+
+ // Decide between batch mode and row mode.
+ // Fall back to row mode if:
+ // - type casting is needed (batch-level type casting deferred to follow-up), OR
+ // - the partition schema contains a type the batch-mode partition-vector populator
+ // does not handle (Struct/Array/Map/Char/Varchar/interval, etc.). The row path
+ // preserves these via JoinedRow, so falling back avoids silently nulling them out, OR
+ // - DESCRIPTOR blob mode is requested with BLOB columns present. The descriptor
+ // rewrite (data→null + synthesized/passthrough reference) lives in BlobDescriptorTransform
+ // which only the row path applies; the batch path would surface Lance's raw
+ // {position, size} struct in `data`, breaking the DESCRIPTOR-mode contract.
+ val hasTypeChanges = !implicitTypeChangeInfo.isEmpty
+ val partitionTypesBatchSupported =
+ partitionSchema.forall(f => isPartitionTypeSupportedForBatch(f.dataType))
+ val descriptorBlobReadNeedsRowPath =
+ blobMode == BlobReadMode.DESCRIPTOR && !blobFieldNames.isEmpty
+ if (enableVectorizedReader && !hasTypeChanges && partitionTypesBatchSupported
+ && !descriptorBlobReadNeedsRowPath) {
+ readBatch(file, allocator, lanceReader, arrowReader, filePath,
+ requestSchema, requiredSchema, partitionSchema)
} else {
- null
- }
- lanceIterator = new LanceRecordIterator(
- allocator, lanceReader, arrowReader, iteratorSchema, filePath, blobTransform)
+ // Compose the DESCRIPTOR-aware blob transform only when the user opted into that mode
+ // AND the request actually has BLOB columns (otherwise the rewrite has nothing to do).
+ val blobTransform = if (descriptorBlobReadNeedsRowPath) {
+ new BlobDescriptorTransform(blobFieldNames, filePath)
+ } else {
+ null
+ }
+ val recordIterator = new LanceRecordIterator(
+ allocator, lanceReader, arrowReader, iteratorSchema, filePath, blobTransform)
+ lanceIterator = recordIterator
- // Register cleanup listener
- Option(TaskContext.get()).foreach { ctx =>
- ctx.addTaskCompletionListener[Unit](_ => lanceIterator.close())
- }
+ // Register cleanup listener
+ Option(TaskContext.get()).foreach { ctx =>
+ ctx.addTaskCompletionListener[Unit](_ => recordIterator.close())
+ }
- val baseIter: Iterator[InternalRow] = lanceIterator.asScala
-
- // Create the following projections for schema evolution:
- // 1. Padding projection: add NULL for missing columns
- // 2. Casting projection: handle type conversions
- val schemaUtils = sparkAdapter.getSchemaUtils
- val paddingProj = SparkSchemaTransformUtils.generateNullPaddingProjection(iteratorSchema, requiredSchema)
- val castProj = SparkSchemaTransformUtils.generateUnsafeProjection(
- schemaUtils.toAttributes(requiredSchema),
- Some(SQLConf.get.sessionLocalTimeZone),
- implicitTypeChangeInfo,
- requiredSchema,
- new StructType(),
- schemaUtils
- )
-
- // Unify projections by applying padding and then casting for each row
- val projection: UnsafeProjection = new UnsafeProjection {
- def apply(row: InternalRow): UnsafeRow =
- castProj(paddingProj(row))
- }
- val projectedIter = baseIter.map(projection.apply)
-
- // Handle partition columns
- if (partitionSchema.length == 0) {
- // No partition columns - return rows directly
- projectedIter
- } else {
- // Create UnsafeProjection to convert JoinedRow to UnsafeRow
- val fullSchema = (requiredSchema.fields ++ partitionSchema.fields).map(f =>
- AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
- val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
-
- // Append partition values to each row using JoinedRow, then convert to UnsafeRow
- val joinedRow = new JoinedRow()
- projectedIter.map(row => unsafeProjection(joinedRow(row, file.partitionValues)))
+ readRows(file, recordIterator, iteratorSchema, requiredSchema, partitionSchema, implicitTypeChangeInfo)
}
} catch {
case e: Exception =>
if (lanceIterator != null) {
- lanceIterator.close() // Close iterator which handles lifecycle for all objects
+ // Iterator owns allocator/lanceReader/arrowReader — let it close everything.
+ try { lanceIterator.close() } catch { case s: Exception => e.addSuppressed(s) }
} else {
- allocator.close() // Close allocator directly
+ // Iterator wasn't constructed yet; close whatever resources were opened, in reverse order.
+ try { if (arrowReader != null) arrowReader.close() } catch { case s: Exception => e.addSuppressed(s) }
+ try { if (lanceReader != null) lanceReader.close() } catch { case s: Exception => e.addSuppressed(s) }
+ try { allocator.close() } catch { case s: Exception => e.addSuppressed(s) }
}
throw new IOException(s"Failed to read Lance file: $filePath", e)
}
@@ -262,4 +267,310 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna
valueContainsNull = true)
case other => other
}
+
+ /**
+ * Columnar batch reading path. Returns Iterator[ColumnarBatch] type-erased as Iterator[InternalRow].
+ * Used when enableVectorizedReader=true and no type casting is needed.
+ */
+ private def readBatch(file: PartitionedFile,
+ allocator: BufferAllocator,
+ lanceReader: LanceFileReader,
+ arrowReader: ArrowReader,
+ filePath: String,
+ requestSchema: StructType,
+ requiredSchema: StructType,
+ partitionSchema: StructType): Iterator[InternalRow] = {
+
+ val batchIterator = new LanceBatchIterator(allocator, lanceReader, arrowReader, filePath)
+
+ // Build column mapping: for each column in requiredSchema, find its index in requestSchema (file columns)
+ // Returns -1 if the column is missing from the file (schema evolution: column addition)
+ val columnMapping: Array[Int] = requiredSchema.fields.map { field =>
+ requestSchema.fieldNames.indexOf(field.name)
+ }
+
+ // Create Arrow-backed null vectors for columns missing from the file.
+ // Uses LanceArrowColumnVector so that Spark's vectorTypes() contract is satisfied
+ // (FileSourceScanExec expects all data columns to be LanceArrowColumnVector).
+ val nullAllocator: Option[BufferAllocator] = if (columnMapping.contains(-1)) {
+ Some(HoodieArrowAllocator.newChildAllocator(
+ getClass.getSimpleName + "-null-" + filePath,
+ HoodieStorageConfig.LANCE_READ_ALLOCATOR_SIZE_BYTES.defaultValue().toLong))
+ } else None
+
+ // Sparse array indexed by required-schema position so the per-batch hot loop is O(1).
+ // Slots stay null for columns that come from the file (i.e. columnMapping(i) >= 0).
+ // Arrow vectors auto-reallocate on setValueCount (see BaseFixedWidthVector.setValueCount),
+ // so it is safe to call setValueCount with a count larger than DEFAULT_BATCH_SIZE.
+ val nullColumnByIndex: Array[NullColumnEntry] =
+ nullAllocator.map { alloc =>
+ val arr = new Array[NullColumnEntry](requiredSchema.length)
+ columnMapping.zipWithIndex.filter(_._1 < 0).foreach { case (_, idx) =>
+ val field = LanceArrowUtils.toArrowField(
+ requiredSchema(idx).name, requiredSchema(idx).dataType, requiredSchema(idx).nullable, "UTC")
+ val arrowVector = field.createVector(alloc)
+ arrowVector.allocateNew()
+ arrowVector.setValueCount(DEFAULT_BATCH_SIZE)
+ arr(idx) = NullColumnEntry(new LanceArrowColumnVector(arrowVector), arrowVector)
+ }
+ arr
+ }.getOrElse(Array.empty)
+
+ // Pre-create partition column vectors (reused across batches, reset per batch)
+ val hasPartitionColumns = partitionSchema.length > 0
+ val partitionVectors: Array[WritableColumnVector] = if (hasPartitionColumns) {
+ partitionSchema.fields.map(f => new OnHeapColumnVector(DEFAULT_BATCH_SIZE, f.dataType))
+ } else {
+ Array.empty
+ }
+
+ // Populate partition vectors with constant values
+ var lastPopulatedNumRows = DEFAULT_BATCH_SIZE
+ if (hasPartitionColumns) {
+ populatePartitionVectors(partitionVectors, partitionSchema, file.partitionValues, DEFAULT_BATCH_SIZE)
+ }
+
+ val totalColumns = requiredSchema.length + partitionSchema.length
+
+ // Map each source batch to a batch with the correct column layout.
+ val mappedIterator = new Iterator[ColumnarBatch] with Closeable {
+ private[this] var closed = false
+
+ override def hasNext: Boolean = batchIterator.hasNext()
+
+ override def next(): ColumnarBatch = {
+ val sourceBatch = batchIterator.next()
+ val numRows = sourceBatch.numRows()
+
+ val vectors = new Array[ColumnVector](totalColumns)
+
+ // Data columns: reorder from source batch or substitute null Arrow vector
+ var i = 0
+ while (i < requiredSchema.length) {
+ if (columnMapping(i) >= 0) {
+ vectors(i) = sourceBatch.column(columnMapping(i))
+ } else {
+ // Direct-indexed lookup (O(1)) for the pre-created null vector for this column.
+ val entry = nullColumnByIndex(i)
+ if (entry == null) {
+ throw new IllegalStateException(s"No null vector pre-created for column index $i")
+ }
+ // Adjust valueCount if batch size differs from allocated size
+ if (numRows != entry.arrowVector.getValueCount) {
+ entry.arrowVector.setValueCount(numRows)
+ }
+ vectors(i) = entry.lanceColumnVector
+ }
+ i += 1
+ }
+
+ // Partition columns: constant vectors
+ if (hasPartitionColumns) {
+ if (numRows != lastPopulatedNumRows) {
+ populatePartitionVectors(partitionVectors, partitionSchema, file.partitionValues, numRows)
+ lastPopulatedNumRows = numRows
+ }
+ var j = 0
+ while (j < partitionSchema.length) {
+ vectors(requiredSchema.length + j) = partitionVectors(j)
+ j += 1
+ }
+ }
+
+ val result = new ColumnarBatch(vectors)
+ result.setNumRows(numRows)
+ result
+ }
+
+ override def close(): Unit = {
+ // Idempotent: TaskContext listener and the outer CloseableIteratorListener may both call close().
+ if (!closed) {
+ closed = true
+ // Attempt every close; if one throws, keep going and surface the first error with the
+ // rest attached as suppressed, so one failure cannot strand Arrow/Lance resources.
+ var closeError: Exception = null
+ def closeSafely(f: => Unit): Unit = {
+ try {
+ f
+ } catch {
+ case e: Exception =>
+ if (closeError == null) closeError = e
+ else closeError.addSuppressed(e)
+ }
+ }
+ // Close null Arrow vectors and their allocator before batchIterator (which closes the data allocator).
+ // nullColumnByIndex is sparse: skip slots for columns that came from the file.
+ var k = 0
+ while (k < nullColumnByIndex.length) {
+ val entry = nullColumnByIndex(k)
+ if (entry != null) closeSafely(entry.lanceColumnVector.close())
+ k += 1
+ }
+ nullAllocator.foreach(a => closeSafely(a.close()))
+ closeSafely(batchIterator.close())
+ partitionVectors.foreach(v => closeSafely(v.close()))
+ if (closeError != null) throw closeError
+ }
+ }
+ }
+
+ // Register cleanup listener
+ Option(TaskContext.get()).foreach { ctx =>
+ ctx.addTaskCompletionListener[Unit](_ => mappedIterator.close())
+ }
+
+ mappedIterator.asInstanceOf[Iterator[InternalRow]]
+ }
+
+ /**
+ * Row-based reading path. Consumes the pre-built [[LanceRecordIterator]] (already wired up
+ * with blob transform if applicable) and applies schema-evolution projections + partition handling.
+ */
+ private def readRows(file: PartitionedFile,
+ recordIterator: LanceRecordIterator,
+ iteratorSchema: StructType,
+ requiredSchema: StructType,
+ partitionSchema: StructType,
+ implicitTypeChangeInfo: java.util.Map[Integer, HoodiePair[DataType, DataType]]): Iterator[InternalRow] = {
+
+ // Create the following projections for schema evolution:
+ // 1. Padding projection: add NULL for missing columns
+ // 2. Casting projection: handle type conversions
+ val schemaUtils = sparkAdapter.getSchemaUtils
+ val paddingProj = SparkSchemaTransformUtils.generateNullPaddingProjection(iteratorSchema, requiredSchema)
+ val castProj = SparkSchemaTransformUtils.generateUnsafeProjection(
+ schemaUtils.toAttributes(requiredSchema),
+ Some(SQLConf.get.sessionLocalTimeZone),
+ implicitTypeChangeInfo,
+ requiredSchema,
+ new StructType(),
+ schemaUtils
+ )
+
+ // Unify projections by applying padding and then casting for each row
+ val projection: UnsafeProjection = new UnsafeProjection {
+ def apply(row: InternalRow): UnsafeRow =
+ castProj(paddingProj(row))
+ }
+ // Compose the per-row projection once. With partition columns, chain the
+ // JoinedRow/UnsafeProjection after the padding+cast projection.
+ val rowProjection: InternalRow => InternalRow = if (partitionSchema.length == 0) {
+ projection.apply
+ } else {
+ val fullSchema = (requiredSchema.fields ++ partitionSchema.fields).map(f =>
+ AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
+ val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+ val joinedRow = new JoinedRow()
+ row => unsafeProjection(joinedRow(projection(row), file.partitionValues))
+ }
+
+ // Wrap in a Closeable iterator so driver/direct-reader callers (TaskContext.get()
+ // is null) have an explicit cleanup handle. close() is idempotent and safe to
+ // run alongside the TaskContext completion listener above, since
+ // LanceRecordIterator.close() is itself idempotent.
+ val src = recordIterator.asScala
+ new Iterator[InternalRow] with Closeable {
+ private[this] var closed = false
+ override def hasNext: Boolean = src.hasNext
+ override def next(): InternalRow = rowProjection(src.next())
+ override def close(): Unit = {
+ if (!closed) {
+ closed = true
+ recordIterator.close()
+ }
+ }
+ }
+ }
+
+ /**
+ * Whether the batch-mode partition-vector populator can handle this partition type.
+ * Must stay in sync with the match in [[populatePartitionVectors]] — types not listed here
+ * (Struct/Array/Map/CharType/VarcharType/interval types, etc.) cause the read() path to fall
+ * back to row mode, where JoinedRow preserves the value as-is.
+ */
+ private def isPartitionTypeSupportedForBatch(dt: DataType): Boolean = dt match {
+ case BooleanType | ByteType | ShortType | IntegerType | DateType => true
+ case LongType | TimestampType | TimestampNTZType => true
+ case FloatType | DoubleType => true
+ case StringType | BinaryType => true
+ case _: DecimalType => true
+ case _ => false
+ }
+
+ /**
+ * Populate writable column vectors with constant partition values.
+ * Each vector is filled with the same value for all rows.
+ */
+ private def populatePartitionVectors(vectors: Array[WritableColumnVector],
+ partitionSchema: StructType,
+ partitionValues: InternalRow,
+ numRows: Int): Unit = {
+ var i = 0
+ while (i < partitionSchema.length) {
+ val vector = vectors(i)
+ vector.reset()
+ if (partitionValues.isNullAt(i)) {
+ vector.putNulls(0, numRows)
+ } else {
+ partitionSchema(i).dataType match {
+ case BooleanType =>
+ val v = partitionValues.getBoolean(i)
+ var j = 0
+ while (j < numRows) { vector.putBoolean(j, v); j += 1 }
+ case ByteType =>
+ val v = partitionValues.getByte(i)
+ var j = 0
+ while (j < numRows) { vector.putByte(j, v); j += 1 }
+ case ShortType =>
+ val v = partitionValues.getShort(i)
+ var j = 0
+ while (j < numRows) { vector.putShort(j, v); j += 1 }
+ case IntegerType | DateType =>
+ val v = partitionValues.getInt(i)
+ vector.putInts(0, numRows, v)
+ case LongType | TimestampType | TimestampNTZType =>
+ val v = partitionValues.getLong(i)
+ vector.putLongs(0, numRows, v)
+ case FloatType =>
+ val v = partitionValues.getFloat(i)
+ vector.putFloats(0, numRows, v)
+ case DoubleType =>
+ val v = partitionValues.getDouble(i)
+ vector.putDoubles(0, numRows, v)
+ case StringType =>
+ val v = partitionValues.getUTF8String(i)
+ val bytes = v.getBytes
+ var j = 0
+ while (j < numRows) { vector.putByteArray(j, bytes); j += 1 }
+ case d: DecimalType =>
+ val v = partitionValues.getDecimal(i, d.precision, d.scale)
+ if (d.precision <= Decimal.MAX_INT_DIGITS) {
+ val unscaled = v.toUnscaledLong.toInt
+ var j = 0
+ while (j < numRows) { vector.putInt(j, unscaled); j += 1 }
+ } else if (d.precision <= Decimal.MAX_LONG_DIGITS) {
+ val unscaled = v.toUnscaledLong
+ var j = 0
+ while (j < numRows) { vector.putLong(j, unscaled); j += 1 }
+ } else {
+ val bytes = v.toJavaBigDecimal.unscaledValue().toByteArray
+ var j = 0
+ while (j < numRows) { vector.putByteArray(j, bytes); j += 1 }
+ }
+ case BinaryType =>
+ val v = partitionValues.getBinary(i)
+ var j = 0
+ while (j < numRows) { vector.putByteArray(j, v); j += 1 }
+ case other =>
+ // Unreachable: read() gates batch mode on isPartitionTypeSupportedForBatch so unsupported
+ // partition types are routed to the row path. Throw loudly if that gate ever regresses
+ // rather than silently nulling out the user's partition values.
+ throw new IllegalStateException(
+ s"Unsupported partition type for Lance batch mode: $other (column ${partitionSchema(i).name}). " +
+ "isPartitionTypeSupportedForBatch must be kept in sync with populatePartitionVectors.")
+ }
+ }
+ i += 1
+ }
+ }
}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
index 3da22ff8ebe7e..1d711875cfa30 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
@@ -24,7 +24,7 @@ import org.apache.hudi.client.utils.SparkInternalSchemaConverter
import org.apache.hudi.common.config.{HoodieMemoryConfig, TypedProperties}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieFileFormat
-import org.apache.hudi.common.schema.HoodieSchema
+import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType}
import org.apache.hudi.common.schema.HoodieSchemaUtils
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, ParquetTableSchemaResolver}
import org.apache.hudi.common.table.read.HoodieFileGroupReader
@@ -58,6 +58,7 @@ import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnarBatchUtils}
import org.apache.spark.util.SerializableConfiguration
+import org.lance.spark.vectorized.LanceArrowColumnVector
import java.io.Closeable
@@ -133,6 +134,24 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String,
}
}
+ /**
+ * Whether the requested schema contains any top-level BLOB columns. Used to disable
+ * Lance batch mode for BLOB tables: the DESCRIPTOR-mode rewrite (and the OUT_OF_LINE
+ * data→null contract) lives only in the row-path BlobDescriptorTransform, and `supportBatch`
+ * cannot inspect read-time options (e.g. `hoodie.read.blob.inline.mode`) since it runs at
+ * planning time. Forcing row mode whenever BLOB columns are present is the simplest correct
+ * gate — BLOB processing is per-row anyway (lazy byte materialization) so the perf delta is
+ * negligible.
+ */
+ private def schemaContainsBlobColumn(schema: StructType): Boolean = {
+ schema.fields.exists { f =>
+ val md = f.metadata
+ md != null && md.contains(HoodieSchema.TYPE_METADATA_FIELD) &&
+ HoodieSchema.parseTypeDescriptor(md.getString(HoodieSchema.TYPE_METADATA_FIELD))
+ .getType == HoodieSchemaType.BLOB
+ }
+ }
+
/**
* Checks if the file format supports vectorized reading, please refer to SPARK-40918.
*
@@ -157,8 +176,17 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String,
val orcBatchSupported = conf.orcVectorizedReaderEnabled &&
schema.forall(s => OrcUtils.supportColumnarReads(
s.dataType, sparkSession.sessionState.conf.orcVectorizedReaderNestedColumnEnabled))
- // TODO: Implement columnar batch reading https://github.com/apache/hudi/issues/17736
- val lanceBatchSupported = false
+ // Lance batch mode cannot honor either of these scenarios — they both force a runtime
+ // row path inside SparkLanceReaderBase.read:
+ // - `hoodie.read.blob.inline.mode=DESCRIPTOR`: the BLOB rewrite lives in
+ // BlobDescriptorTransform on the row path only.
+ // - schema-on-read with implicit type changes: SparkSchemaTransformUtils builds the
+ // cast projection on the row path only; the batch path has no equivalent.
+ // Disable batch at plan time whenever either trigger could fire so the planner doesn't
+ // commit to ColumnarBatch output that the row-path reader cannot deliver — Spark would
+ // wrap the scan with ColumnarToRowExec and ClassCastException on the first InternalRow.
+ val lanceBatchSupported =
+ !schemaContainsBlobColumn(schema) && !internalSchemaOpt.isPresent
val supportBatch = if (isMultipleBaseFileFormatsEnabled) {
parquetBatchSupported && orcBatchSupported
@@ -184,24 +212,44 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String,
override def vectorTypes(requiredSchema: StructType,
partitionSchema: StructType,
sqlConf: SQLConf): Option[Seq[String]] = {
- val originalVectorTypes = super.vectorTypes(requiredSchema, partitionSchema, sqlConf)
- if (mandatoryFields.isEmpty) {
- originalVectorTypes
- } else {
- val regularVectorType = if (!sqlConf.offHeapColumnVectorEnabled) {
- classOf[OnHeapColumnVector].getName
+ if (hoodieFileFormat == HoodieFileFormat.LANCE && !isMultipleBaseFileFormatsEnabled) {
+ // Lance uses LanceArrowColumnVector for data columns and OnHeapColumnVector for partition columns.
+ // Spark uses vectorTypes to determine if columnar batch reading is supported.
+ val lanceVectorType = classOf[LanceArrowColumnVector].getName
+ val partitionVectorType = classOf[OnHeapColumnVector].getName
+ val baseTypes = Seq.fill(requiredSchema.length)(lanceVectorType) ++ Seq.fill(partitionSchema.length)(partitionVectorType)
+ if (mandatoryFields.isEmpty) {
+ Some(baseTypes)
} else {
- classOf[OffHeapColumnVector].getName
- }
- originalVectorTypes.map {
- o: Seq[String] => o.zipWithIndex.map(a => {
- if (a._2 >= requiredSchema.length && mandatoryFields.contains(partitionSchema.fields(a._2 - requiredSchema.length).name)) {
- regularVectorType
+ // mandatoryFields are partition columns read from the file — use LanceArrowColumnVector for them
+ Some(baseTypes.zipWithIndex.map { case (vt, i) =>
+ if (i >= requiredSchema.length && mandatoryFields.contains(partitionSchema.fields(i - requiredSchema.length).name)) {
+ lanceVectorType
} else {
- a._1
+ vt
}
})
}
+ } else {
+ val originalVectorTypes = super.vectorTypes(requiredSchema, partitionSchema, sqlConf)
+ if (mandatoryFields.isEmpty) {
+ originalVectorTypes
+ } else {
+ val regularVectorType = if (!sqlConf.offHeapColumnVectorEnabled) {
+ classOf[OnHeapColumnVector].getName
+ } else {
+ classOf[OffHeapColumnVector].getName
+ }
+ originalVectorTypes.map {
+ o: Seq[String] => o.zipWithIndex.map(a => {
+ if (a._2 >= requiredSchema.length && mandatoryFields.contains(partitionSchema.fields(a._2 - requiredSchema.length).name)) {
+ regularVectorType
+ } else {
+ a._1
+ }
+ })
+ }
+ }
}
}
@@ -265,6 +313,15 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String,
} else {
baseFileReader
}
+ // For base-file-only reads (case _ branches), we use the vectorized reader for formats that support
+ // returning InternalRow even in vectorized mode (Parquet with returningBatch=false).
+ // Lance's vectorized reader always returns ColumnarBatch, so for MOR Lance tables (where
+ // supportReturningBatch=false and Spark expects InternalRow), we must use the non-vectorized reader.
+ val baseFileOnlyReader = if (isMOR && hoodieFileFormat == HoodieFileFormat.LANCE && !isMultipleBaseFileFormatsEnabled) {
+ fileGroupBaseFileReader
+ } else {
+ baseFileReader
+ }
val broadcastedStorageConf = spark.sparkContext.broadcast(new SerializableConfiguration(augmentedStorageConf.unwrap()))
val fileIndexProps: TypedProperties = HoodieFileIndex.getConfigProperties(spark, options, null)
@@ -319,7 +376,7 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String,
fixedPartitionIndexes)
case _ =>
- readBaseFile(file, baseFileReader.value, requestedStructType, remainingPartitionSchema, fixedPartitionIndexes,
+ readBaseFile(file, baseFileOnlyReader.value, requestedStructType, remainingPartitionSchema, fixedPartitionIndexes,
requiredSchema, partitionSchema, outputSchema, filters ++ requiredFilters, storageConf)
}
// CDC queries.
@@ -327,7 +384,7 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String,
buildCDCRecordIterator(hoodiePartitionCDCFileGroupSliceMapping, fileGroupBaseFileReader.value, storageConf, fileIndexProps, requiredSchema, metaClient)
case _ =>
- readBaseFile(file, baseFileReader.value, requestedStructType, remainingPartitionSchema, fixedPartitionIndexes,
+ readBaseFile(file, baseFileOnlyReader.value, requestedStructType, remainingPartitionSchema, fixedPartitionIndexes,
requiredSchema, partitionSchema, outputSchema, filters ++ requiredFilters, storageConf)
}
CloseableIteratorListener.addListener(iter)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceColumnarBatch.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceColumnarBatch.scala
new file mode 100644
index 0000000000000..646bc9eeea0fa
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceColumnarBatch.scala
@@ -0,0 +1,583 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.DefaultSparkRecordMerger
+import org.apache.hudi.SparkAdapterSupport.sparkAdapter
+import org.apache.hudi.client.SparkTaskContextSupplier
+import org.apache.hudi.common.bloom.{BloomFilterFactory, BloomFilterTypeCode}
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.testutils.HoodieTestUtils
+import org.apache.hudi.common.util.{Option => HOption}
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.io.storage.HoodieSparkLanceWriter
+import org.apache.hudi.storage.StoragePath
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration
+import org.apache.hudi.testutils.HoodieSparkClientTestBase
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec}
+import org.apache.spark.sql.execution.datasources.lance.SparkLanceReaderBase
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.unsafe.types.UTF8String
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Tests for Lance columnar batch (vectorized) reading, covering:
+ *
+ * Unit-level tests (invoke [[SparkLanceReaderBase]] directly):
+ * - Row-based path: `enableVectorizedReader=false` must yield [[InternalRow]], never [[ColumnarBatch]]
+ * - Columnar path: `enableVectorizedReader=true` with matching schemas yields [[ColumnarBatch]]
+ * - Null-padding: columns absent from the file are null-padded in the [[ColumnarBatch]]
+ * - Partition vectors: partition values are appended as constant columns to each [[ColumnarBatch]]
+ * - Fallback: implicit type change (e.g. FLOAT → DOUBLE) forces the row path
+ *
+ * Integration tests (Spark SQL / DataFrame API):
+ * - COW table round-trip via `spark.read.format("hudi")` with vectorized reads active
+ * - Schema evolution: adding a column via bulk_insert; old base files are null-padded
+ * - SQL query: `SELECT` with a `WHERE` clause on a COW table
+ */
+@DisabledIfSystemProperty(named = "lance.skip.tests", matches = "true")
+class TestLanceColumnarBatch extends HoodieSparkClientTestBase {
+
+ var spark: SparkSession = _
+
+ @BeforeEach
+ override def setUp(): Unit = {
+ super.setUp()
+ spark = sqlContext.sparkSession
+ }
+
+ @AfterEach
+ override def tearDown(): Unit = {
+ super.tearDown()
+ spark = null
+ }
+
+ /**
+ * Write a plain Lance file (no Hudi metadata fields) containing [[rows]] with [[schema]].
+ * Returns the absolute path string of the written file.
+ */
+ private def writeLanceFile(fileName: String, schema: StructType, rows: Seq[InternalRow]): String = {
+ val bloom = BloomFilterFactory.createBloomFilter(1000, 0.0001, 10000, BloomFilterTypeCode.SIMPLE.name())
+ val path = new StoragePath(s"$basePath/$fileName")
+ val storage = HoodieTestUtils.getStorage(basePath)
+ try {
+ val writer = HoodieSparkLanceWriter.builder
+ .file(path).sparkSchema(schema)
+ .instantTime("20240101120000000")
+ .taskContextSupplier(new SparkTaskContextSupplier())
+ .storage(storage)
+ .populateMetaFields(false)
+ .bloomFilterOpt(HOption.of(bloom)).build
+ try {
+ rows.zipWithIndex.foreach { case (row, i) => writer.writeRow("key" + i, row) }
+ } finally {
+ writer.close()
+ }
+ } finally {
+ storage.close()
+ }
+ path.toString
+ }
+
+ private def extractRow(row: InternalRow, schema: StructType): (Int, String, Any) = {
+ val id = row.getInt(0)
+ val name = if (row.isNullAt(1)) null else row.getUTF8String(1).toString
+ val col2 = if (schema.fields.length >= 3) {
+ val f = schema.fields(2)
+ if (row.isNullAt(2)) null
+ else f.dataType match {
+ case DoubleType => row.getDouble(2).asInstanceOf[AnyRef]
+ case FloatType => row.getFloat(2).asInstanceOf[AnyRef]
+ case IntegerType => row.getInt(2).asInstanceOf[AnyRef]
+ case LongType => row.getLong(2).asInstanceOf[AnyRef]
+ case StringType => row.getUTF8String(2).toString
+ case _ => row.get(2, f.dataType)
+ }
+ } else null
+ (id, name, col2)
+ }
+
+ /**
+ * Invoke [[SparkLanceReaderBase.read]] and consume the iterator, safely extracting data from
+ * each item before the iterator advances (which would invalidate Arrow-backed batch buffers).
+ *
+ * Returns a tuple of:
+ * - `batchCount` – number of [[ColumnarBatch]] items returned
+ * - `rowCount` – number of [[InternalRow]] items returned (non-batch path)
+ * - `rows` – collected (id: Int, name: String, col2: Any) tuples from both paths
+ */
+ private def readAndCollect(
+ filePath: String,
+ requiredSchema: StructType,
+ partitionSchema: StructType = new StructType(),
+ partitionValues: InternalRow = new GenericInternalRow(Array.empty[Any]),
+ enableVectorized: Boolean = true
+ ): (Int, Int, Seq[(Int, String, Any)]) = {
+
+ val pf = sparkAdapter.getSparkPartitionedFileUtils
+ .createPartitionedFile(partitionValues, new StoragePath(filePath), 0L, Long.MaxValue)
+ val storageConf = new HadoopStorageConfiguration(new Configuration())
+ val reader = new SparkLanceReaderBase(enableVectorized)
+
+ var batchCount = 0
+ var rowCount = 0
+ val rows = ArrayBuffer.empty[(Int, String, Any)]
+
+ // Cast to Iterator[Any] to avoid JVM checkcast to InternalRow.
+ // The columnar batch path returns ColumnarBatch via type erasure (Iterator[InternalRow]
+ // that actually contains ColumnarBatch). Both iter.foreach and iter.next() on a typed
+ // Iterator[InternalRow] insert a checkcast that fails for ColumnarBatch.
+ val iter = reader.read(pf, requiredSchema, partitionSchema, HOption.empty(), Seq.empty[Filter], storageConf)
+ .asInstanceOf[Iterator[Any]]
+
+ try {
+ while (iter.hasNext) {
+ iter.next() match {
+ case batch: ColumnarBatch =>
+ batchCount += 1
+ batch.rowIterator().asScala.foreach { row =>
+ rows += extractRow(row, requiredSchema)
+ }
+
+ case row: InternalRow =>
+ rowCount += 1
+ rows += extractRow(row, requiredSchema)
+ }
+ }
+ } finally {
+ // On the driver in unit tests TaskContext.get() is null, so the
+ // completion-listener cleanup path inside readBatch() never fires.
+ // Explicitly close to release Arrow allocators and file readers.
+ iter match {
+ case c: AutoCloseable => c.close()
+ case _ => // row path returns a plain Iterator
+ }
+ }
+
+ (batchCount, rowCount, rows.toSeq)
+ }
+
+ /**
+ * Assert that the executed plan for `df` used a columnar (vectorized) scan and
+ * that the scan's `numOutputRows` SQLMetric is populated. This guards against a
+ * silent fallback to the row reader that would still produce correct output.
+ *
+ * The metric is lazy: the caller must have already forced execution (e.g. via
+ * `df.collect()`) before invoking this helper.
+ */
+ private def assertLanceVectorizedScan(df: DataFrame, expectedMinRows: Long): Unit = {
+ val rootPlan = df.queryExecution.executedPlan
+
+ def allNodes(p: SparkPlan): Seq[SparkPlan] = {
+ val hidden: Seq[SparkPlan] = p match {
+ case aqe: AdaptiveSparkPlanExec => allNodes(aqe.executedPlan)
+ case qs: QueryStageExec => allNodes(qs.plan)
+ case _ => Seq.empty
+ }
+ (p +: p.children.flatMap(allNodes)) ++ hidden
+ }
+
+ val nodes = allNodes(rootPlan)
+ val scanOpt = nodes.collectFirst {
+ case s: SparkPlan if s.supportsColumnar &&
+ (s.getClass.getSimpleName.contains("FileSourceScan") ||
+ s.getClass.getSimpleName.contains("BatchScan")) => s
+ }
+ assertTrue(scanOpt.isDefined,
+ s"Expected a columnar (supportsColumnar=true) scan node in executed plan, found none:\n${rootPlan.treeString}")
+ val scan = scanOpt.get
+ val numOutputRows = scan.metrics.get("numOutputRows").map(_.value).getOrElse(-1L)
+ assertTrue(numOutputRows >= expectedMinRows,
+ s"Columnar Lance scan produced $numOutputRows rows, expected >= $expectedMinRows; plan:\n${rootPlan.treeString}")
+ }
+
+ private val baseSchema: StructType = new StructType()
+ .add("id", IntegerType, nullable = false)
+ .add("name", StringType, nullable = true)
+ .add("score", DoubleType, nullable = true)
+
+ private def baseRows: Seq[InternalRow] = Seq(
+ new GenericInternalRow(Array[Any](1, UTF8String.fromString("Alice"), 95.5d)),
+ new GenericInternalRow(Array[Any](2, UTF8String.fromString("Bob"), 87.3d)),
+ new GenericInternalRow(Array[Any](3, UTF8String.fromString("Charlie"), 92.1d))
+ )
+
+ private def writeHudiTable(tableType: HoodieTableType,
+ tableName: String,
+ tablePath: String,
+ df: DataFrame,
+ saveMode: SaveMode = SaveMode.Append,
+ operation: Option[String] = None,
+ extraOptions: Map[String, String] = Map.empty): Unit = {
+ var writer = df.write
+ .format("hudi")
+ .option("hoodie.table.base.file.format", "LANCE")
+ .option(TABLE_TYPE.key(), tableType.name())
+ .option(RECORDKEY_FIELD.key(), "id")
+ .option(PRECOMBINE_FIELD.key(), "id")
+ .option(TABLE_NAME.key(), tableName)
+ .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
+ .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
+ operation.foreach(op => writer = writer.option(OPERATION.key(), op))
+ extraOptions.foreach { case (k, v) => writer = writer.option(k, v) }
+ writer.mode(saveMode).save(tablePath)
+ }
+
+ // ===========================================================================
+ // Unit tests: SparkLanceReaderBase
+ // ===========================================================================
+
+ /**
+ * Row path (`enableVectorizedReader=false`) must return [[InternalRow]] instances, never
+ * [[ColumnarBatch]]. Data values must survive the row-level UnsafeProjection round-trip.
+ */
+ @Test
+ def testRowPathReturnsInternalRows(): Unit = {
+ val filePath = writeLanceFile("row_path.lance", baseSchema, baseRows)
+ val (batches, rowItems, rows) = readAndCollect(filePath, baseSchema, enableVectorized = false)
+
+ assertEquals(0, batches, "Row path must not yield ColumnarBatch")
+ assertEquals(3, rowItems, "Row path must yield one InternalRow per record")
+
+ assertEquals(List(1, 2, 3), rows.map(_._1))
+ assertEquals(List("Alice", "Bob", "Charlie"), rows.map(_._2))
+ assertEquals(List(95.5d, 87.3d, 92.1d), rows.map(_._3))
+ }
+
+ /**
+ * Columnar path (`enableVectorizedReader=true`, no type changes) must return
+ * [[ColumnarBatch]] instances backed by [[org.lance.spark.vectorized.LanceArrowColumnVector]].
+ * No [[InternalRow]] may be returned directly. Data must round-trip correctly.
+ */
+ @Test
+ def testColumnarPathReturnsBatches(): Unit = {
+ val filePath = writeLanceFile("columnar_path.lance", baseSchema, baseRows)
+ val (batches, rowItems, rows) = readAndCollect(filePath, baseSchema, enableVectorized = true)
+
+ assertTrue(batches > 0, "Vectorized path must yield at least one ColumnarBatch")
+ assertEquals(0, rowItems, "Vectorized path must not yield raw InternalRow")
+
+ assertEquals(3, rows.size)
+ assertEquals(List(1, 2, 3), rows.map(_._1))
+ assertEquals(List("Alice", "Bob", "Charlie"), rows.map(_._2))
+ assertEquals(List(95.5d, 87.3d, 92.1d), rows.map(_._3))
+ }
+
+ /**
+ * When the required schema contains a column that is absent from the file (schema evolution:
+ * column addition), the vectorized path must null-pad that column in each [[ColumnarBatch]]
+ * using a [[org.lance.spark.vectorized.LanceArrowColumnVector]] backed by an all-null
+ * Arrow vector.
+ */
+ @Test
+ def testColumnarPathNullPadsAbsentColumns(): Unit = {
+ val fileSchema = new StructType()
+ .add("id", IntegerType, nullable = false)
+ .add("name", StringType, nullable = true)
+ val rows = Seq(
+ new GenericInternalRow(Array[Any](1, UTF8String.fromString("Alice"))),
+ new GenericInternalRow(Array[Any](2, UTF8String.fromString("Bob")))
+ )
+ val filePath = writeLanceFile("null_pad.lance", fileSchema, rows)
+
+ // requiredSchema adds 'score' which does not exist in the file
+ val requiredSchemaWithExtra = new StructType()
+ .add("id", IntegerType, nullable = false)
+ .add("name", StringType, nullable = true)
+ .add("score", DoubleType, nullable = true)
+
+ val pf = sparkAdapter.getSparkPartitionedFileUtils
+ .createPartitionedFile(new GenericInternalRow(Array.empty[Any]),
+ new StoragePath(filePath), 0L, Long.MaxValue)
+ val storageConf = new HadoopStorageConfiguration(new Configuration())
+ val reader = new SparkLanceReaderBase(enableVectorizedReader = true)
+
+ var batchesSeen = 0
+ val nullPadded = ArrayBuffer.empty[Boolean]
+
+ val iter = reader.read(pf, requiredSchemaWithExtra, new StructType(), HOption.empty(),
+ Seq.empty[Filter], storageConf).asInstanceOf[Iterator[Any]]
+ try {
+ while (iter.hasNext) {
+ iter.next() match {
+ case batch: ColumnarBatch =>
+ batchesSeen += 1
+ batch.rowIterator().asScala.foreach { row =>
+ nullPadded += row.isNullAt(2)
+ }
+ case _ =>
+ fail("Vectorized path with null-padding must return ColumnarBatch")
+ }
+ }
+ } finally {
+ iter match {
+ case c: AutoCloseable => c.close()
+ case _ =>
+ }
+ }
+
+ assertTrue(batchesSeen > 0, "Must have produced at least one ColumnarBatch")
+ assertEquals(2, nullPadded.size, "Must have read 2 rows")
+ assertTrue(nullPadded.forall(identity), "Absent column 'score' must be null in every row")
+ }
+
+ /**
+ * When a non-empty `partitionSchema` is supplied, the vectorized path must append constant
+ * partition-value columns to each [[ColumnarBatch]] using
+ * [[org.apache.spark.sql.execution.vectorized.OnHeapColumnVector]].
+ *
+ * The resulting rows must have `dataSchema.size + partitionSchema.size` fields, with the
+ * partition value in the last position.
+ */
+ @Test
+ def testColumnarPathAppendsPartitionVectors(): Unit = {
+ val filePath = writeLanceFile("partitioned.lance", baseSchema, baseRows)
+
+ val partitionSchema = new StructType().add("dept", StringType, nullable = true)
+ val partitionValues = new GenericInternalRow(
+ Array[Any](UTF8String.fromString("engineering")))
+
+ val pf = sparkAdapter.getSparkPartitionedFileUtils
+ .createPartitionedFile(partitionValues, new StoragePath(filePath), 0L, Long.MaxValue)
+ val storageConf = new HadoopStorageConfiguration(new Configuration())
+ val reader = new SparkLanceReaderBase(enableVectorizedReader = true)
+
+ var batchesSeen = 0
+ val depts = ArrayBuffer.empty[String]
+
+ val iter = reader.read(pf, baseSchema, partitionSchema, HOption.empty(),
+ Seq.empty[Filter], storageConf).asInstanceOf[Iterator[Any]]
+ try {
+ while (iter.hasNext) {
+ iter.next() match {
+ case batch: ColumnarBatch =>
+ batchesSeen += 1
+ assertEquals(baseSchema.size + partitionSchema.size, batch.numCols(),
+ "Batch column count must equal data + partition columns")
+ batch.rowIterator().asScala.foreach { row =>
+ depts += row.getUTF8String(baseSchema.size).toString
+ }
+ case _ =>
+ fail("Vectorized path must return ColumnarBatch when partitionSchema is present")
+ }
+ }
+ } finally {
+ iter match {
+ case c: AutoCloseable => c.close()
+ case _ =>
+ }
+ }
+
+ assertTrue(batchesSeen > 0)
+ assertEquals(3, depts.size)
+ assertTrue(depts.forall(_ == "engineering"),
+ "Every row must carry the constant partition value 'engineering'")
+ }
+
+ /**
+ * When [[SparkSchemaTransformUtils.buildImplicitSchemaChangeInfo]] detects an implicit type
+ * change (file has FLOAT, query requires DOUBLE), the columnar path must fall back to the
+ * row path. The row path applies a cast projection so the returned values must be correct
+ * DOUBLE values even though the file stores FLOAT.
+ */
+ @Test
+ def testTypeChangeFallsBackToRowPath(): Unit = {
+ val fileSchema = new StructType()
+ .add("id", IntegerType, nullable = false)
+ .add("name", StringType, nullable = true)
+ .add("score", FloatType, nullable = true)
+
+ val fileRows = Seq(
+ new GenericInternalRow(Array[Any](1, UTF8String.fromString("Alice"), 95.5f)),
+ new GenericInternalRow(Array[Any](2, UTF8String.fromString("Bob"), 87.3f))
+ )
+ val filePath = writeLanceFile("type_change.lance", fileSchema, fileRows)
+
+ // requiredSchema uses DOUBLE for 'score' — implicit type change: FLOAT → DOUBLE
+ val requiredSchemaDoubleScore = new StructType()
+ .add("id", IntegerType, nullable = false)
+ .add("name", StringType, nullable = true)
+ .add("score", DoubleType, nullable = true)
+
+ val (batches, rowItems, rows) = readAndCollect(filePath, requiredSchemaDoubleScore, enableVectorized = true)
+
+ assertEquals(0, batches,
+ "Implicit type change FLOAT→DOUBLE must disable the columnar path and return InternalRow")
+ assertEquals(2, rowItems)
+
+ assertEquals(List(1, 2), rows.map(_._1))
+ assertEquals(List("Alice", "Bob"), rows.map(_._2))
+ // FLOAT 95.5f → cast via String → DOUBLE: result must be close to 95.5
+ assertEquals(95.5d, rows(0)._3.asInstanceOf[Double], 0.01d)
+ assertEquals(87.3d, rows(1)._3.asInstanceOf[Double], 0.01d)
+ }
+
+ // ===========================================================================
+ // Integration tests: Spark SQL / DataFrame API (COW only)
+ // ===========================================================================
+
+ /**
+ * End-to-end test for a COW Lance table read via `spark.read.format("hudi")`.
+ *
+ * With `lanceBatchSupported = true`, Spark will enable vectorized reads for Lance COW tables.
+ * This test verifies that data written via the Hudi DataFrame API is read back correctly when
+ * the vectorized code path is active.
+ */
+ @Test
+ def testCOWTableDataFrameRead(): Unit = {
+ val tableName = "test_lance_columnar_cow"
+ val tablePath = s"$basePath/$tableName"
+
+ val df = spark.createDataFrame(Seq(
+ (1, "Alice", 95.5d),
+ (2, "Bob", 87.3d),
+ (3, "Charlie", 92.1d)
+ )).toDF("id", "name", "score")
+
+ writeHudiTable(HoodieTableType.COPY_ON_WRITE, tableName, tablePath, df,
+ saveMode = SaveMode.Overwrite, operation = Some("bulk_insert"))
+
+ val actual = spark.read.format("hudi").load(tablePath)
+ .select("id", "name", "score")
+ .orderBy("id")
+
+ val expected = Seq(
+ (1, "Alice", 95.5d),
+ (2, "Bob", 87.3d),
+ (3, "Charlie", 92.1d)
+ )
+
+ val actualRows = actual.collect()
+ assertEquals(3, actualRows.length)
+ expected.zip(actualRows).foreach { case ((eid, ename, escore), row) =>
+ assertEquals(eid, row.getAs[Int]("id"))
+ assertEquals(ename, row.getAs[String]("name"))
+ assertEquals(escore, row.getAs[Double]("score"), 1e-6)
+ }
+
+ assertLanceVectorizedScan(actual, 3)
+ }
+
+ /**
+ * Schema evolution: a second bulk_insert writes records with a wider schema that adds a
+ * 'department' column. Old base files (from the first insert) do not contain 'department'.
+ *
+ * When reading the whole table with the full schema, `SparkLanceReaderBase` must null-pad
+ * 'department' for every row from the old base file, while the new base file has actual values.
+ */
+ @Test
+ def testCOWTableSchemaEvolutionNullPadding(): Unit = {
+ val tableName = "test_lance_schema_evolution"
+ val tablePath = s"$basePath/$tableName"
+
+ // First batch: narrow schema (id, name, score)
+ val df1 = spark.createDataFrame(Seq(
+ (1, "Alice", 95.5d),
+ (2, "Bob", 87.3d)
+ )).toDF("id", "name", "score")
+
+ writeHudiTable(HoodieTableType.COPY_ON_WRITE, tableName, tablePath, df1,
+ saveMode = SaveMode.Overwrite, operation = Some("bulk_insert"))
+
+ // Second batch: wider schema adds 'department'
+ val df2 = spark.createDataFrame(Seq(
+ (3, "Charlie", 92.1d, "engineering"),
+ (4, "David", 88.0d, "sales")
+ )).toDF("id", "name", "score", "department")
+
+ writeHudiTable(HoodieTableType.COPY_ON_WRITE, tableName, tablePath, df2,
+ operation = Some("bulk_insert"),
+ extraOptions = Map(RECONCILE_SCHEMA.key() -> "true"))
+
+ // Read back with full schema — 'department' must be null for records from the first batch
+ val readDf = spark.read.format("hudi").load(tablePath)
+ val actual = readDf.select("id", "name", "score", "department").orderBy("id")
+ val rows = actual.collect()
+
+ assertEquals(4, rows.length)
+
+ // Records 1 and 2 came from the narrow-schema file → 'department' must be null
+ assertEquals(1, rows(0).getAs[Int]("id"))
+ assertTrue(rows(0).isNullAt(rows(0).fieldIndex("department")),
+ "Records from the narrow-schema file must have null 'department'")
+ assertEquals(2, rows(1).getAs[Int]("id"))
+ assertTrue(rows(1).isNullAt(rows(1).fieldIndex("department")),
+ "Records from the narrow-schema file must have null 'department'")
+
+ // Records 3 and 4 came from the wider-schema file → 'department' must be present
+ assertEquals(3, rows(2).getAs[Int]("id"))
+ assertEquals("engineering", rows(2).getAs[String]("department"))
+ assertEquals(4, rows(3).getAs[Int]("id"))
+ assertEquals("sales", rows(3).getAs[String]("department"))
+
+ // Confirm the vectorized scan path was used for at least one of the scanned files.
+ assertLanceVectorizedScan(actual, 1)
+ }
+
+ /**
+ * Spark SQL `SELECT … WHERE` query on a COW Lance table. Verifies that predicate evaluation
+ * works correctly when vectorized reading is active (Spark evaluates predicates on
+ * [[ColumnarBatch]] rows returned from the vectorized scan).
+ */
+ @Test
+ def testCOWTableSparkSqlQuery(): Unit = {
+ val tableName = "test_lance_sql_columnar"
+ val tablePath = s"$basePath/$tableName"
+
+ val df = spark.createDataFrame(Seq(
+ (1, "Alice", 30),
+ (2, "Bob", 25),
+ (3, "Charlie", 35),
+ (4, "David", 28),
+ (5, "Eve", 32)
+ )).toDF("id", "name", "age")
+
+ writeHudiTable(HoodieTableType.COPY_ON_WRITE, tableName, tablePath, df,
+ saveMode = SaveMode.Overwrite, operation = Some("bulk_insert"))
+
+ spark.read.format("hudi").load(tablePath).createOrReplaceTempView(tableName)
+
+ val resultDf = spark.sql(
+ s"SELECT id, name, age FROM $tableName WHERE age > 30 ORDER BY id"
+ )
+ val result = resultDf.collect()
+
+ assertEquals(2, result.length, "Only Charlie (35) and Eve (32) satisfy age > 30")
+ assertEquals(3, result(0).getAs[Int]("id"))
+ assertEquals("Charlie", result(0).getAs[String]("name"))
+ assertEquals(5, result(1).getAs[Int]("id"))
+ assertEquals("Eve", result(1).getAs[String]("name"))
+
+ // `numOutputRows` on the scan reflects pre-filter rows; all 5 rows flow through the columnar scan.
+ assertLanceVectorizedScan(resultDf, 1)
+ }
+}