feat(lance): Implement columnar batch reading for Lance (COW only)#18403
wombatu-kun wants to merge 13 commits into
341ca2e to 859b4c9
Will try to take a pass today.
| nextBatchLoaded = true; | ||
| return true; | ||
| } | ||
| } catch (IOException e) { |
There was a problem hiding this comment.
If you hit this IOException, I believe you need to set finished = true, right?
There was a problem hiding this comment.
No — the throw on line 108 unwinds the stack, so finished is never read after this point. Setting it before the throw would be dead code. If we wanted to swallow the exception and return false instead, then yes, finished = true would be required — but silently hiding an I/O error would be worse.
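The point made in the reply can be illustrated with a stripped-down iterator. This is only a sketch under stated assumptions: BatchSource and SketchBatchIterator are hypothetical names, not the PR's LanceBatchIterator, and wrapping the error in UncheckedIOException is an assumption about how the rethrow might look.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical stand-in for the underlying reader: returns false when exhausted.
interface BatchSource {
    boolean loadNextBatch() throws IOException;
}

// Hypothetical sketch of the hasNext()/next() state machine under discussion.
final class SketchBatchIterator implements Iterator<Integer> {
    private final BatchSource source;
    private boolean nextBatchLoaded = false;
    private boolean finished = false;
    private int batchesReturned = 0;

    SketchBatchIterator(BatchSource source) {
        this.source = source;
    }

    @Override
    public boolean hasNext() {
        if (finished) {
            return false;
        }
        if (nextBatchLoaded) {
            return true;
        }
        try {
            if (source.loadNextBatch()) {
                nextBatchLoaded = true;
                return true;
            }
        } catch (IOException e) {
            // Rethrowing unwinds the stack: nothing later in this call reads
            // `finished`, so the I/O error is surfaced instead of being hidden.
            throw new UncheckedIOException("Failed to read next batch", e);
        }
        // Normal end of data: this is the path where the flag matters.
        finished = true;
        return false;
    }

    @Override
    public Integer next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        nextBatchLoaded = false;
        return ++batchesReturned; // stands in for the batch payload
    }
}
```

In this sketch, a caller that catches the exception and calls hasNext() again will attempt another load, because the flag is only set on the normal end-of-data path.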
| * Used when enableVectorizedReader=true and no type casting is needed. | ||
| */ | ||
| private def readBatch(file: PartitionedFile, | ||
| allocator: org.apache.arrow.memory.BufferAllocator, |
There was a problem hiding this comment.
Can you import these instead of using the fully qualified path?
| } | ||
|
|
||
| // --------------------------------------------------------------------------- | ||
| // Helpers |
There was a problem hiding this comment.
I think we can remove this comment.
| // Close null Arrow vectors and their allocator before batchIterator (which closes the data allocator) | ||
| nullColumnVectors.foreach { case (_, columnVector, arrowVector) => | ||
| columnVector.close() | ||
| arrowVector.close() |
There was a problem hiding this comment.
Is this arrowVector close needed? Does the line above handle it implicitly?
9c06b1c to 28c10c9
yihua
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Style & Readability Review
Style & readability pass — a few naming and idiom suggestions below, mostly around the new readBatch helper in SparkLanceReaderBase and some defensive null-checks on final fields in LanceBatchIterator.
| } | ||
|
|
||
| @Override | ||
| public ColumnarBatch next() { |
There was a problem hiding this comment.
🤖 nit: arrowReader, lanceReader, and allocator are all final fields set in the constructor, so they can never be null unless the caller explicitly passes null. The != null guards here add noise — could you either drop them and trust the non-null contract, or enforce it upfront with Objects.requireNonNull in the constructor?
There was a problem hiding this comment.
Done — added Objects.requireNonNull in the constructor for all three fields; removed the null guards in close().
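A minimal sketch of the constructor-side contract described in this exchange. RequiredResources is a hypothetical class, and the three AutoCloseable fields only mirror the names used in the comment; the actual field types in the PR are not shown here.

```java
import java.util.Objects;

// Hypothetical holder with three required collaborators.
final class RequiredResources implements AutoCloseable {
    private final AutoCloseable allocator;
    private final AutoCloseable lanceReader;
    private final AutoCloseable arrowReader;

    RequiredResources(AutoCloseable allocator,
                      AutoCloseable lanceReader,
                      AutoCloseable arrowReader) {
        // Fail fast with a named message at construction time instead of a
        // bare NullPointerException somewhere later.
        this.allocator = Objects.requireNonNull(allocator, "allocator must not be null");
        this.lanceReader = Objects.requireNonNull(lanceReader, "lanceReader must not be null");
        this.arrowReader = Objects.requireNonNull(arrowReader, "arrowReader must not be null");
    }

    @Override
    public void close() throws Exception {
        // No null guards: the constructor already established the invariant.
        // Exception chaining is omitted to keep the sketch short.
        arrowReader.close();
        lanceReader.close();
        allocator.close();
    }
}
```

Checking once in the constructor keeps close() free of defensive branches while still producing a clear error if a caller passes null.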
| Array.empty | ||
| } | ||
|
|
||
| // Pre-create partition column vectors (reused across batches, reset per batch) |
There was a problem hiding this comment.
🤖 nit: assigning null to a val in Scala (else null) and then checking if (nullAllocator != null) downstream reads as Java-style. Option[BufferAllocator] would express "possibly absent allocator" more naturally and let you use .foreach(_.close()) instead of the null guards.
There was a problem hiding this comment.
Done — switched to Option[BufferAllocator] with .foreach(_.close()).
| val partitionVectors: Array[WritableColumnVector] = if (hasPartitionColumns) { | ||
| partitionSchema.fields.map(f => new OnHeapColumnVector(DEFAULT_BATCH_SIZE, f.dataType)) | ||
| } else { | ||
| Array.empty |
There was a problem hiding this comment.
🤖 nit: the three-element tuple (Int, LanceArrowColumnVector, org.apache.arrow.vector.FieldVector) accessed via _._1, _._2, _._3 is a bit opaque. A small case class like NullColumnEntry(colIndex: Int, columnVector: LanceArrowColumnVector, arrowVector: FieldVector) — or even keying by column index in a Map — would make the access sites read more clearly.
There was a problem hiding this comment.
Done — introduced private case class NullColumnEntry(colIndex, columnVector, arrowVector) and updated all access sites.
| if (numRows != entry._3.getValueCount) { | ||
| entry._3.setValueCount(numRows) | ||
| } | ||
| vectors(i) = entry._2 |
There was a problem hiding this comment.
🤖 nit: nullColumnVectors.find(_._1 == i).get will throw a bare NoSuchElementException if the invariant is ever violated. Could you swap .get for .getOrElse(throw new IllegalStateException(s"No null vector pre-created for column index $i")) to give a meaningful diagnostic?
There was a problem hiding this comment.
Done — .getOrElse(throw new IllegalStateException(s"No null vector pre-created for column index $i")) with column index in the message.
| // Lance uses LanceArrowColumnVector for data columns and OnHeapColumnVector for partition columns. | ||
| // Spark uses vectorTypes to determine if columnar batch reading is supported. | ||
| val lanceVectorType = "org.apache.spark.sql.vectorized.LanceArrowColumnVector" | ||
| val partitionVectorType = classOf[OnHeapColumnVector].getName |
There was a problem hiding this comment.
🤖 nit: val lanceVectorType = "org.apache.spark.sql.vectorized.LanceArrowColumnVector" is a hard-coded string — if the class is ever moved it silently breaks. Could you use classOf[LanceArrowColumnVector].getName here, consistent with how partitionVectorType is defined just below?
There was a problem hiding this comment.
Good catch — done, now uses classOf[LanceArrowColumnVector].getName, consistent with partitionVectorType just below.
yihua
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Good progress on vectorized batch reading for Lance COW tables! The approach is clean and well-documented. A couple of resource cleanup concerns on error paths to address — see inline comments.
| val joinedRow = new JoinedRow() | ||
| projectedIter.map(row => unsafeProjection(joinedRow(row, file.partitionValues))) | ||
| readRows(file, allocator, lanceReader, arrowReader, filePath, | ||
| requestSchema, requiredSchema, partitionSchema, implicitTypeChangeInfo) |
There was a problem hiding this comment.
🤖 The catch block only closes allocator, but lanceReader and arrowReader (created at lines 109/113) are leaked on exception. The old code handled this by closing lanceIterator (which owns all three). Could you close arrowReader and lanceReader here as well, or wrap them in a helper that closes all three?
There was a problem hiding this comment.
Fixed — moved lanceReader/arrowReader to var declarations before the try block so the catch block can close all three resources in reverse order, with suppressed exception handling to avoid masking the original error.
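The error-path cleanup described here might look roughly like the following. This is a sketch under assumptions: OpenWithCleanup, closeAndSuppress, and the Supplier-based openers are hypothetical, and the real code is Scala rather than Java.

```java
import java.util.function.Supplier;

final class OpenWithCleanup {
    // Closes a resource if it was opened. A failure while closing is attached
    // to the original error as suppressed, so it never replaces that error.
    static void closeAndSuppress(AutoCloseable resource, Throwable original) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (Exception closeError) {
            original.addSuppressed(closeError);
        }
    }

    static AutoCloseable[] openAll(AutoCloseable allocator,
                                   Supplier<AutoCloseable> openLance,
                                   Supplier<AutoCloseable> openArrow) {
        // Declared before the try so the catch block can see whatever was opened.
        AutoCloseable lanceReader = null;
        AutoCloseable arrowReader = null;
        try {
            lanceReader = openLance.get();
            arrowReader = openArrow.get();
            return new AutoCloseable[] {allocator, lanceReader, arrowReader};
        } catch (RuntimeException e) {
            // Release in reverse order of acquisition, then rethrow the original.
            closeAndSuppress(arrowReader, e);
            closeAndSuppress(lanceReader, e);
            closeAndSuppress(allocator, e);
            throw e;
        }
    }
}
```

The caller still sees the exception that caused the failure; anything that went wrong during cleanup is available through getSuppressed().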
| lanceException = e; | ||
| } | ||
| } | ||
|
|
There was a problem hiding this comment.
🤖 If allocator.close() throws (e.g., due to a buffer leak detected by Arrow), both arrowException and lanceException are silently swallowed. Consider wrapping this in a try-catch and adding any prior exceptions as suppressed, similar to the pattern used for the other two resources.
There was a problem hiding this comment.
Fixed — allocator.close() is now wrapped in try-catch. If it throws, prior exceptions from arrowReader/lanceReader are attached as suppressed. The propagation order is: allocator exception > arrow exception > lance exception, with earlier ones added as suppressed to the one that's thrown.
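The propagation order stated in this reply can be sketched as follows. ChainedClose and closeAll are hypothetical names, and the resources are plain AutoCloseable stand-ins rather than the Arrow and Lance types.

```java
final class ChainedClose {
    // Closes all three resources even if some fail. Priority of the thrown
    // exception: allocator > arrow > lance; the others ride along as suppressed.
    static void closeAll(AutoCloseable arrowReader,
                         AutoCloseable lanceReader,
                         AutoCloseable allocator) throws Exception {
        Exception arrowException = null;
        Exception lanceException = null;
        try {
            arrowReader.close();
        } catch (Exception e) {
            arrowException = e;
        }
        try {
            lanceReader.close();
        } catch (Exception e) {
            lanceException = e;
        }
        try {
            allocator.close();
        } catch (Exception allocatorException) {
            if (arrowException != null) {
                allocatorException.addSuppressed(arrowException);
            }
            if (lanceException != null) {
                allocatorException.addSuppressed(lanceException);
            }
            throw allocatorException;
        }
        if (arrowException != null) {
            if (lanceException != null) {
                arrowException.addSuppressed(lanceException);
            }
            throw arrowException;
        }
        if (lanceException != null) {
            throw lanceException;
        }
    }
}
```

No failure is silently dropped: whichever exception wins, the remaining ones are reachable through getSuppressed().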
| arrowVector.allocateNew() | ||
| arrowVector.setValueCount(DEFAULT_BATCH_SIZE) | ||
| (idx, new LanceArrowColumnVector(arrowVector), arrowVector) | ||
| } |
There was a problem hiding this comment.
🤖 The null Arrow vectors are allocated once with DEFAULT_BATCH_SIZE and later resized via setValueCount(numRows). If the last batch has fewer rows this is fine, but have you verified that setValueCount on an all-null vector with a count larger than the initially allocated capacity doesn't cause an OOBE? (Arrow vectors typically auto-reallocate, but worth double-checking for the validity buffer.)
There was a problem hiding this comment.
Verified — Arrow vectors auto-reallocate on setValueCount (see BaseFixedWidthVector.setValueCount → reallocBufferHelper). For all-null vectors only the validity buffer is involved, and it's expanded automatically. This is the same pattern Spark's own Parquet reader uses. Added a comment in code clarifying this.
| val originalVectorTypes = super.vectorTypes(requiredSchema, partitionSchema, sqlConf) | ||
| if (mandatoryFields.isEmpty) { | ||
| originalVectorTypes | ||
| if (hoodieFileFormat == HoodieFileFormat.LANCE && !isMultipleBaseFileFormatsEnabled) { |
There was a problem hiding this comment.
🤖 The Lance branch in vectorTypes doesn't apply the mandatoryFields adjustment that the Parquet/ORC path does (lines 201-212). mandatoryFields can be non-empty for Lance tables (e.g., partition columns to read, incremental fields). Could you verify that this doesn't cause a vector-type mismatch when mandatoryFields is non-empty and hoodieFileFormat == LANCE?
There was a problem hiding this comment.
Applied the mandatoryFields adjustment for the Lance branch — same pattern as Parquet/ORC, using LanceArrowColumnVector for mandatory partition columns that are read from the file. For the current COW-only scope mandatoryFields is typically empty, but this ensures correctness if the scope expands.
yihua
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Nice updates — thanks for addressing all the feedback thoroughly. Both resource-leak bugs are properly fixed: the error path in SparkLanceReaderBase now closes all three resources in reverse order with suppressed exceptions, and LanceBatchIterator.close() wraps allocator.close() with proper exception chaining. The mandatoryFields handling for Lance and the NullColumnEntry case class are clean improvements. All prior findings (mine and other reviewers') have been resolved.
yihua
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
CodeRabbit Walkthrough: The PR introduces clustering expiration with heartbeat-based rollback, refactors wrapper filesystem metrics initialization into a utility, enhances the distributed registry system for table-scoped metric tracking, adds log-compaction-aware delta commit selection, substantially overhauls Flink RLI bootstrap with global failover semantics, implements Lance vectorized batch iteration, enforces schema validation against nested vector types, adds column pruning to incremental reads, refactors CDC processing into shared utility classes, and normalizes instant-time parsing across SQL/datasource APIs.
Sequence Diagram (CodeRabbit):
sequenceDiagram
participant Config as Config<br/>(Expiration<br/>Enabled)
participant Client as Write<br/>Client
participant Heartbeat as Heartbeat<br/>Client
participant Timeline as Active<br/>Timeline
participant Rollback as Rollback<br/>Handler
Client->>Config: Check isExpirationOf<br/>ClusteringEnabled()
Config-->>Client: true
Client->>Heartbeat: start(instantTime)
Heartbeat->>Heartbeat: Track instant
Client->>Timeline: Schedule clustering
Timeline->>Timeline: Add pending instant
Note over Heartbeat,Timeline: Time passes...
Client->>Heartbeat: Check isHeartbeat<br/>Expired(time)
Heartbeat-->>Client: true
Client->>Config: Get<br/>ExpirationThresholdMins
Config-->>Client: minutes
Client->>Rollback: isClusteringInstant<br/>EligibleForRollback()
Rollback->>Heartbeat: Verify expiration
Heartbeat-->>Rollback: Confirmed expired
Rollback-->>Client: true
Client->>Timeline: Rollback pending<br/>instant
Timeline->>Timeline: Remove instant
Heartbeat->>Heartbeat: Cleanup tracking
Sequence Diagram (CodeRabbit):
sequenceDiagram
participant Job as Job
participant Coordinator as Stream<br/>Write<br/>Coordinator
participant Bootstrap as RLI<br/>Bootstrap
participant EventBuf as Event<br/>Buffers
participant Timeline as Timeline
Job->>Coordinator: Initialize with<br/>RLI enabled
Coordinator->>Bootstrap: Create operator
Bootstrap->>Timeline: Load RLI records
Job->>Coordinator: Checkpoint 1
Coordinator->>EventBuf: Stage events
Job->>Coordinator: Job restart
Coordinator->>EventBuf: Restore checkpoint
EventBuf-->>Coordinator: Pending instants
Coordinator->>Coordinator: recommitInstant()
Coordinator->>Coordinator: Check isRLIWith<br/>Bootstrap
alt Recommit succeeded
Coordinator->>Job: failJob() trigger
Job->>Job: Global failover
else Already committed
Coordinator->>EventBuf: reset()
end
Note over Job,Timeline: Next checkpoint
Coordinator->>Timeline: Verify completion
Timeline-->>Coordinator: Instant committed
CodeRabbit: yihua#38 (review)
| // Register cleanup listener | ||
| Option(TaskContext.get()).foreach { ctx => | ||
| ctx.addTaskCompletionListener[Unit](_ => mappedIterator.close()) | ||
| } |
There was a problem hiding this comment.
readBatch() cleanup needs an idempotent guard.
This iterator installs a task-completion close callback here, and the enclosing file format also adds a CloseableIteratorListener on the returned iterator. mappedIterator.close() has no closed flag, so nullAllocator.close() / partitionVectors.close() can run twice and turn a successful scan into a task-end failure.
Suggested fix
val mappedIterator = new Iterator[ColumnarBatch] with Closeable {
+ private[this] var closed = false
+
override def hasNext: Boolean = batchIterator.hasNext()
@@
override def close(): Unit = {
+ if (closed) {
+ return
+ }
+ closed = true
// Close null Arrow vectors and their allocator before batchIterator (which closes the data allocator)
nullColumnVectors.foreach(_.columnVector.close())
nullAllocator.foreach(_.close())
batchIterator.close()
partitionVectors.foreach(_.close())
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala`
around lines 250 - 262, Add an idempotent guard to the close path installed by
readBatch(): modify the close() implementation of the mapped iterator returned
by readBatch() (the method that currently calls
nullColumnVectors.foreach(_.columnVector.close()),
nullAllocator.foreach(_.close()), batchIterator.close(),
partitionVectors.foreach(_.close())) to be no-op on subsequent calls by using a
private AtomicBoolean/volatile boolean closed flag checked-and-set (or
synchronized check) at the top of close(); ensure the task-completion listener
that calls mappedIterator.close() uses the same guarded close so
nullAllocator.close() and partitionVectors.foreach(_.close()) cannot run twice.
— CodeRabbit (original) (source:comment#3082899529)
There was a problem hiding this comment.
Fixed in ff828d0 — added a private closed flag on the anonymous Iterator[ColumnarBatch] with Closeable, so the task-completion listener and the outer CloseableIteratorListener can both call close() without double-closing nullAllocator / partitionVectors.
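For illustration, an idempotent close guard might be sketched like this. IdempotentCloser is a hypothetical class; the fix described above uses a private flag on the anonymous Scala iterator, and the AtomicBoolean here is an assumption that only matters if close() could be called from more than one thread.

```java
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical wrapper: runs the release action at most once, no matter how
// many listeners call close().
final class IdempotentCloser implements Closeable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final Runnable release;

    IdempotentCloser(Runnable release) {
        this.release = release;
    }

    @Override
    public void close() {
        // compareAndSet lets exactly one caller through; later calls are no-ops.
        if (!closed.compareAndSet(false, true)) {
            return;
        }
        release.run();
    }
}
```

With a guard like this, a task-completion listener and an outer iterator listener can both call close() without releasing the underlying buffers twice.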
| } | ||
| } | ||
|
|
||
| (batchCount, rowCount, rows.toSeq) |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
# First, let's locate and examine the test file
find . -name "TestLanceColumnarBatch.scala" -type f
Repository: yihua/hudi
Length of output: 159
🏁 Script executed:
# Also search for SparkLanceReaderBase to understand the return type
find . -name "*SparkLanceReader*" -type f
Repository: yihua/hudi
Length of output: 395
🏁 Script executed:
# Search for LanceBatchIterator definition
find . -name "*.scala" -type f | xargs grep -l "class LanceBatchIterator"
Repository: yihua/hudi
Length of output: 38
🏁 Script executed:
cat -n ./hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala
Repository: yihua/hudi
Length of output: 20757
🏁 Script executed:
# Get the test file and check the specific lines
wc -l ./hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceColumnarBatch.scala
Repository: yihua/hudi
Length of output: 163
🏁 Script executed:
# Read the test file
cat -n ./hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceColumnarBatch.scala
Repository: yihua/hudi
Length of output: 25409
🏁 Script executed:
# Check if LanceBatchIterator and LanceRecordIterator are Closeable
find . -name "*.scala" -o -name "*.java" | xargs grep -l "class LanceBatchIterator\|class LanceRecordIterator"
Repository: yihua/hudi
Length of output: 250
🏁 Script executed:
# Search for TaskContext usage in tests
grep -n "TaskContext" ./hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceColumnarBatch.scala
Repository: yihua/hudi
Length of output: 175
🏁 Script executed:
# Check the actual iterator type returned and if it implements Closeable
cat -n ./hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceReader.java | head -100
Repository: yihua/hudi
Length of output: 5095
🏁 Script executed:
cat -n ./hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceBatchIterator.java
Repository: yihua/hudi
Length of output: 7243
🏁 Script executed:
cat -n ./hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceRecordIterator.java
Repository: yihua/hudi
Length of output: 7709
🏁 Script executed:
# Verify TaskContext.get() behavior in unit test context
grep -n "Option(TaskContext.get())" ./hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala
Repository: yihua/hudi
Length of output: 155
Close the returned iterator from SparkLanceReaderBase.read().
The vectorized path allocates Arrow allocators and file readers, delegating cleanup to TaskContext completion listeners. In unit tests running on the driver (outside of task execution), TaskContext.get() returns null and these listeners are never registered. Explicitly closing the iterator ensures Arrow and Lance state is released deterministically.
Suggested fix
- val iter = reader.read(pf, requiredSchema, partitionSchema, HOption.empty(), Seq.empty[Filter], storageConf)
- .asInstanceOf[Iterator[Any]]
-
- while (iter.hasNext) {
- iter.next() match {
- case batch: ColumnarBatch =>
- batchCount += 1
- batch.rowIterator().asScala.foreach { row =>
- rows += extractRow(row, requiredSchema)
- }
-
- case row: InternalRow =>
- rowCount += 1
- rows += extractRow(row, requiredSchema)
- }
- }
+ val iter = reader.read(pf, requiredSchema, partitionSchema, HOption.empty(), Seq.empty[Filter], storageConf)
+ .asInstanceOf[Iterator[Any]]
+
+ try {
+ while (iter.hasNext) {
+ iter.next() match {
+ case batch: ColumnarBatch =>
+ batchCount += 1
+ batch.rowIterator().asScala.foreach { row =>
+ rows += extractRow(row, requiredSchema)
+ }
+
+ case row: InternalRow =>
+ rowCount += 1
+ rows += extractRow(row, requiredSchema)
+ }
+ }
+ } finally {
+ iter match {
+ case c: AutoCloseable => c.close()
+ case _ =>
+ }
+ }
Apply the same try/finally pattern to the two inline reader.read(...) loops as well.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceColumnarBatch.scala`
around lines 157 - 174, The iterator returned by reader.read(...) (the one cast
to Iterator[Any]) must be closed to release Arrow allocators and file readers;
wrap the creation and consumption of that iterator in a try/finally and call
iter.close() in the finally block (and still handle ColumnarBatch in the match
as before), i.e. replace the direct while(iter.hasNext)... loop with a try {
while(iter.hasNext) { ... } } finally { if (iter != null) iter.close() } around
the reader.read(...) result.
— CodeRabbit (original) (source:comment#3082899544)
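The drain-then-close pattern suggested in this comment can be sketched in Java as below. DrainAndClose and TrackingIterator are hypothetical names used only for illustration; the actual test is Scala and iterates over ColumnarBatch and InternalRow values.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class DrainAndClose {
    // Consumes the iterator and always closes it afterwards if it is closeable.
    // This matters when no framework-level completion listener will do it.
    static <T> List<T> drain(Iterator<T> iter) throws Exception {
        List<T> out = new ArrayList<>();
        try {
            while (iter.hasNext()) {
                out.add(iter.next());
            }
        } finally {
            // A plain Iterator has no close(); only close when the runtime
            // type actually supports it.
            if (iter instanceof AutoCloseable) {
                ((AutoCloseable) iter).close();
            }
        }
        return out;
    }
}

// Hypothetical iterator that records whether close() was called.
final class TrackingIterator implements Iterator<String>, AutoCloseable {
    private final Iterator<String> delegate;
    boolean closed = false;

    TrackingIterator(List<String> items) {
        this.delegate = items.iterator();
    }

    @Override
    public boolean hasNext() {
        return delegate.hasNext();
    }

    @Override
    public String next() {
        return delegate.next();
    }

    @Override
    public void close() {
        closed = true;
    }
}
```

The runtime type check mirrors the `case c: AutoCloseable` match in the suggested fix, since the static type of the iterator does not expose close().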
| assertEquals(eid, row.getAs[Int]("id")) | ||
| assertEquals(ename, row.getAs[String]("name")) | ||
| assertEquals(escore, row.getAs[Double]("score"), 1e-6) | ||
| } |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
cd hudi-spark-datasource/hudi-spark && find . -name "TestLanceColumnarBatch.scala" -type f
Repository: yihua/hudi
Length of output: 126
🏁 Script executed:
cd hudi-spark-datasource/hudi-spark && wc -l src/test/scala/org/apache/hudi/functional/TestLanceColumnarBatch.scala
Repository: yihua/hudi
Length of output: 128
🏁 Script executed:
cd hudi-spark-datasource/hudi-spark && sed -n '390,520p' src/test/scala/org/apache/hudi/functional/TestLanceColumnarBatch.scala
Repository: yihua/hudi
Length of output: 4899
🏁 Script executed:
cd hudi-spark-datasource/hudi-spark && rg -A 5 -B 5 "vectorized|Vectorized|physicalPlan|explain" src/test/scala/org/apache/hudi/functional/ | head -100
Repository: yihua/hudi
Length of output: 11742
🏁 Script executed:
cd hudi-spark-datasource/hudi-spark && rg "collectFirst.*Lance|LanceVectorReader|collectFirst.*Vectorized" src/test/scala --type scala -B 3 -A 3
Repository: yihua/hudi
Length of output: 36
🏁 Script executed:
cd hudi-spark-datasource/hudi-spark && rg "explain|physicalPlan|collectFirst" src/test/scala --type scala -B 2 -A 2 | head -80
Repository: yihua/hudi
Length of output: 2843
🏁 Script executed:
cd hudi-spark-datasource/hudi-spark && rg -l "explain\(|collectFirst" src/test/scala --type scala
Repository: yihua/hudi
Length of output: 143
🏁 Script executed:
cd hudi-spark-datasource/hudi-spark && rg -i "vectorized.*scan|lance.*vectorized" src/test/scala/org/apache/hudi/functional/TestLanceColumnarBatch.scala -B 2 -A 2
Repository: yihua/hudi
Length of output: 1527
🏁 Script executed:
cd hudi-spark-datasource/hudi-spark && sed -n '430,520p' src/test/scala/org/apache/hudi/functional/TestLanceColumnarBatch.scala | tail -60
Repository: yihua/hudi
Length of output: 2550
Add physical-plan or scan-level assertions to verify vectorized execution in integration tests.
The three DataFrame/SQL integration tests (testCOWTableDataFrameRead, testCOWTableSchemaEvolutionNullPadding, testCOWTableSparkSqlQuery) only validate output rows and are missing assertions that the vectorized execution path was actually used. A silent fallback to the row reader would pass these tests without detection. The unit-level tests already demonstrate the proper pattern by checking batchCount > 0 when calling SparkLanceReaderBase directly; apply the same principle at the integration level by inspecting the physical plan (e.g., via df.explain() or queryExecution.executedPlan.collectFirst) to verify that a vectorized scan operator was invoked.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceColumnarBatch.scala`
around lines 395 - 425, Add an assertion in each integration test (e.g.,
testCOWTableDataFrameRead, testCOWTableSchemaEvolutionNullPadding,
testCOWTableSparkSqlQuery) that inspects the executed physical plan to ensure
the vectorized scan was used: use
actual.queryExecution.executedPlan.collectFirst to find a plan node whose class
simple name indicates a Lance columnar/batch scan (or pattern-match to
SparkLanceReaderBase) and assert the node exists, and if you can access the
reader instance (SparkLanceReaderBase) assert its batchCount > 0 to prove
columnar batches were produced.
— CodeRabbit (original) (source:comment#3082899551)
There was a problem hiding this comment.
Fixed in ff828d0 — added assertLanceVectorizedScan(df, expectedMinRows) that walks the executed plan (descending into AdaptiveSparkPlanExec.executedPlan and QueryStageExec.plan so the scan node isn't hidden under AQE), requires supportsColumnar == true on a FileSourceScan/BatchScan, and asserts the scan's numOutputRows SQLMetric is populated. Wired into all three integration tests (testCOWTableDataFrameRead, testCOWTableSchemaEvolutionNullPadding, testCOWTableSparkSqlQuery), so a silent fallback to the row reader would now fail the assertion instead of passing quietly.
- Idempotent close() in SparkLanceReaderBase.readBatch iterator: guard nullAllocator and partitionVectors against double-free when both the TaskContext listener and the outer CloseableIteratorListener invoke close().
- Close iterator in TestLanceColumnarBatch.runReadAndCollect: wrap the consume loop in try/finally and close via AutoCloseable; needed on the driver where TaskContext.get() is null and the completion-listener path never runs.
- Assert vectorized scan path in 3 integration tests via a new helper that inspects SQLMetrics on a columnar FileSourceScan/BatchScan, descending into AdaptiveSparkPlanExec and QueryStageExec to reach the scan under AQE.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
yihua
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Nice updates — thanks for addressing all the feedback. The idempotent close() with the closed flag cleanly prevents double-free from both the TaskContext listener and CloseableIteratorListener. The test improvements (try/finally cleanup and assertLanceVectorizedScan) are solid additions that guard against silent fallback to the row reader. All prior findings have been resolved.
yihua
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Greptile Summary: This PR implements columnar batch (vectorized) reading for Lance COW tables in Spark by introducing LanceBatchIterator and a dual code-path in SparkLanceReaderBase that selects between row-based and columnar output. It also bundles several orthogonal improvements: clustering expiration/heartbeat support, Flink source-reader record limiting, incremental-relation column pruning (TableScan → PrunedScan), CDC image-management refactoring into CdcImageManager, ParquetReaderIterator idempotent-close hardening, and a distributed metrics registry refactor.
Key changes:
- lanceBatchSupported flipped from false → true; vectorTypes() updated to declare LanceArrowColumnVector for data columns and OnHeapColumnVector for partition columns
- SparkLanceReaderBase.readBatch() builds a mapped Iterator[ColumnarBatch] with schema-evolution null-padding and constant partition vectors; falls back to row path when implicit type casts are needed
- LanceBatchIterator wraps an ArrowReader, reusing the same ColumnVector instances across batches for zero-copy throughput
- HoodieEngineContext.getMetricRegistry / HoodieSparkEngineContext.getMetricRegistry + Registry.setRegistries propagate DistributedRegistry accumulators into Spark task lambdas
- IncrementalRelationV1/V2 now implement PrunedScan.buildScan(requiredColumns) and use IncrementalRelationUtil to derive pruned schemas, fixing duplicate skeleton/data schema field merging
Issues found:
- Registry.setRegistries stores registries under key "::tableName.registryName" but getRegistryOfClass(tableName, registryName, …) looks up "tableName::registryName"; executor-side distributed-registry lookup always misses, so executor metrics fall back to a new LocalRegistry and are never accumulated
- RecordLimiter.wrap() does not guard nextRecordFromSplit() against limit == NO_LIMIT (-1L); 0L >= -1L is true, so all records would be suppressed if wrap() were called with a NO_LIMIT limiter
- populatePartitionVectors handles TimestampType but not TimestampNTZType (Spark 3.4+); partition columns of that type fall through to putNulls, producing all-null values in vectorized Lance reads
- HoodieClusteringJob.rollbackFailedClusteringForPartitions throws on the first non-eligible instant rather than skipping it, potentially blocking all writes to the affected partitions if any live clustering job holds an instant
- needLogCompact uses 0 as the default delta-commit count when no prior log compaction exists; combined with Math.min, this always evaluates to 0 < threshold, so log compaction would never fire in that edge case
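The NO_LIMIT hazard reported for RecordLimiter comes down to one comparison, which the following sketch isolates. SketchRecordLimiter and its methods are hypothetical and are not the Flink reader's actual API; only the sentinel value -1L and the comparison are taken from the summary.

```java
// Hypothetical limiter illustrating the sentinel comparison only.
final class SketchRecordLimiter {
    static final long NO_LIMIT = -1L;

    private final long limit;
    private long emitted = 0L;

    SketchRecordLimiter(long limit) {
        this.limit = limit;
    }

    // Unguarded check: with limit == -1, "0 >= -1" is already true before any
    // record has been emitted, so every record would be suppressed.
    boolean reachedUnguarded() {
        return emitted >= limit;
    }

    // Guarded check: a NO_LIMIT limiter never reports the limit as reached.
    boolean reachedGuarded() {
        return limit != NO_LIMIT && emitted >= limit;
    }

    void recordEmitted() {
        emitted++;
    }
}
```

Testing the sentinel before the numeric comparison keeps "no limit" from being read as a limit of minus one.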
Greptile Confidence Score: 3/5
The core Lance columnar feature is well-implemented with a thorough test suite, but a key-format mismatch in the distributed metrics subsystem and a missing TimestampNTZType case in the vectorized partition-vector path should be fixed before merging.
The Lance batch-reading feature (LanceBatchIterator, SparkLanceReaderBase, vectorTypes) is sound and well-tested. However two issues warrant fixes: (1) Registry.setRegistries key-format mismatch silently breaks distributed metric accumulation on executors; (2) missing TimestampNTZType in populatePartitionVectors produces wrong null values for Spark 3.4+ tables. RecordLimiter.wrap() NO_LIMIT is a latent hazard. Clustering and compaction changes are mostly clean with one design concern.
hudi-io/src/main/java/org/apache/hudi/common/metrics/Registry.java (key format mismatch in setRegistries), hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala (TimestampNTZType + null-vector lookup performance), hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/RecordLimiter.java (NO_LIMIT guard)
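The key-format mismatch flagged for the metrics registry can be reproduced with a plain map. This is a sketch, not the Registry code: RegistryKeySketch and its methods are hypothetical, and the two key layouts are copied from the shapes quoted in the summary.

```java
import java.util.HashMap;
import java.util.Map;

final class RegistryKeySketch {
    // Key shape the lookup side is reported to use.
    static String lookupKey(String tableName, String registryName) {
        return tableName + "::" + registryName;
    }

    // Key shape the store side is reported to use.
    static String mismatchedStoreKey(String tableName, String registryName) {
        return "::" + tableName + "." + registryName;
    }

    // Stores one entry, then reports whether a lookup with lookupKey finds it.
    static boolean lookupHits(boolean storeWithLookupKey, String table, String registry) {
        Map<String, Object> registries = new HashMap<>();
        String storeKey = storeWithLookupKey
            ? lookupKey(table, registry)
            : mismatchedStoreKey(table, registry);
        registries.put(storeKey, new Object());
        return registries.containsKey(lookupKey(table, registry));
    }
}
```

Routing both the store and the lookup through a single key-building helper is one way to rule out this class of mismatch.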
Sequence Diagram (Greptile):
sequenceDiagram
participant Spark as Spark FileSourceScan
participant FGFF as HoodieFileGroupReaderBasedFileFormat
participant SLRB as SparkLanceReaderBase
participant LBI as LanceBatchIterator
participant AR as ArrowReader (Lance)
participant VSR as VectorSchemaRoot
Spark->>FGFF: supportBatch() returns true
Spark->>FGFF: vectorTypes() returns LanceArrowColumnVector + OnHeapColumnVector
Spark->>FGFF: buildReaderWithPartitionValues(file)
FGFF->>SLRB: read(file, requiredSchema, partitionSchema)
alt enableVectorizedReader=true and no type changes
SLRB->>AR: lanceReader.readAll(columns, null, batchSize)
SLRB->>LBI: new LanceBatchIterator(allocator, lanceReader, arrowReader)
loop each batch
Spark->>SLRB: mappedIterator.hasNext()
SLRB->>LBI: hasNext()
LBI->>AR: loadNextBatch()
AR->>VSR: fill FieldVectors with new data in-place
LBI-->>SLRB: true
Spark->>SLRB: mappedIterator.next()
SLRB->>LBI: next() returns ColumnarBatch
SLRB->>SLRB: remap columns + null-pad missing + append partition vectors
SLRB-->>Spark: ColumnarBatch zero-copy
end
else row path when type cast needed
SLRB->>SLRB: readRows()
SLRB-->>Spark: Iterator of InternalRow via UnsafeProjection
end
note over SLRB,LBI: TaskCompletionListener calls close() which closes arrowReader then lanceReader then allocator
CodeRabbit: yihua#38 (review)
Greptile: yihua#38 (review)
| Some(SQLConf.get.sessionLocalTimeZone), | ||
| implicitTypeChangeInfo, | ||
| requiredSchema, | ||
| new StructType(), |
TimestampNTZType not handled — partition vector filled with nulls
populatePartitionVectors contains an exhaustive match on DataType, but the Spark 3.4+ TimestampNTZType (stored internally as a Long, just like TimestampType) falls through to the wildcard case _ => that calls vector.putNulls(0, numRows). Any table partitioned on a TimestampNTZType column will produce all-null values for that partition column in vectorized Lance reads.
Add the missing case:
| new StructType(), | |
| case LongType | TimestampType | TimestampNTZType => | |
| val v = partitionValues.getLong(i) | |
| vector.putLongs(0, numRows, v) |
— Greptile (original) (source:comment#3082904840)
Good catch — fixed in 6568344. Extended the LongType | TimestampType arm in populatePartitionVectors to also cover TimestampNTZType. Spark stores TIMESTAMP_NTZ internally as a Long (microseconds since epoch), so the existing putLongs path applies as-is.
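As a rough illustration of why the existing Long path applies, the sketch below converts a time-zone-free wall-clock value to microseconds since the epoch and repeats it for every row of a batch. It uses only `java.time` and does not call Spark; `TimestampNtzSketch`, `toMicros`, and `fillConstant` are hypothetical names introduced here, and `fillConstant` only mimics the effect of a bulk `putLongs`-style fill.

```scala
import java.time.LocalDateTime
import java.time.temporal.ChronoUnit

object TimestampNtzSketch {
  // Epoch expressed as a wall-clock value; no time zone is involved for NTZ.
  private val Epoch: LocalDateTime = LocalDateTime.of(1970, 1, 1, 0, 0, 0)

  // Wall-clock value -> microseconds since 1970-01-01T00:00:00.
  def toMicros(ldt: LocalDateTime): Long = ChronoUnit.MICROS.between(Epoch, ldt)

  // A constant partition column: the same Long repeated for every row in the batch.
  def fillConstant(numRows: Int, micros: Long): Array[Long] = Array.fill(numRows)(micros)
}
```

Because the partition value is already a plain Long at this point, the vectorized path needs no NTZ-specific conversion, only a match arm that routes the type to the Long fill.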
| // Map each source batch to a batch with the correct column layout. | ||
| val mappedIterator = new Iterator[ColumnarBatch] with Closeable { | ||
| private[this] var closed = false | ||
|
|
O(n) linear scan for null-column vectors per batch
nullColumnVectors.find(_.colIndex == i) scans a Scala Array on every call to next() for every missing column. For a table with m missing columns and k batches, this is O(m × k) passes over the array.
Pre-compute a direct-indexed lookup array when nullColumnVectors is built, then replace .find(_.colIndex == i) with a direct array access.
— Greptile (original) (source:comment#3082905008)
Fixed in 6568344. Added a direct-indexed nullColumnByIndex: Array[NullColumnEntry] of size requiredSchema.length, populated once alongside nullColumnVectors. The hot loop in mappedIterator.next() now does nullColumnByIndex(i) (O(1)) instead of nullColumnVectors.find(_.colIndex == i). The invariant check is preserved as IllegalStateException with the same message.
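The lookup change can be sketched in isolation. This is a minimal, self-contained illustration rather than the PR's code: `NullColumnEntry` here is a simplified stand-in with hypothetical fields, `requiredLength` plays the role of `requiredSchema.length`, and the exception message is invented.

```scala
// Simplified stand-in for the PR's NullColumnEntry (fields are hypothetical).
final case class NullColumnEntry(colIndex: Int, label: String)

object NullColumnLookupSketch {
  // Linear scan: O(m) per lookup, repeated for every column of every batch.
  def findByScan(entries: Array[NullColumnEntry], i: Int): Option[NullColumnEntry] =
    entries.find(_.colIndex == i)

  // Built once: slot i holds the entry for output column i, or null if column i is not null-padded.
  def buildIndex(entries: Array[NullColumnEntry], requiredLength: Int): Array[NullColumnEntry] = {
    val byIndex = new Array[NullColumnEntry](requiredLength)
    entries.foreach(e => byIndex(e.colIndex) = e)
    byIndex
  }

  // O(1) per lookup, with an invariant check for slots that were never populated.
  def lookup(byIndex: Array[NullColumnEntry], i: Int): NullColumnEntry = {
    val entry = byIndex(i)
    if (entry == null) {
      throw new IllegalStateException(s"No null-column vector registered for column index $i")
    }
    entry
  }
}
```

The array costs one reference per required column, which is small next to the per-batch vectors, and it removes the repeated scan from the hot loop.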
- Handle TimestampNTZType in SparkLanceReaderBase.populatePartitionVectors: TIMESTAMP_NTZ is internally a Long (microseconds since epoch) just like TimestampType, but was falling through to the wildcard case and producing all-null partition values for any TIMESTAMP_NTZ-partitioned table on the vectorized path.
- Replace per-batch O(n) linear scan `nullColumnVectors.find(_.colIndex == i)` with a direct-indexed `nullColumnByIndex: Array[NullColumnEntry]` lookup, removing m*k array scans for tables with m missing columns and k batches.
- Add TestLanceColumnarBatch#testColumnarPathPartitionVectorTimestampNtz regression test that fails before the populatePartitionVectors fix because every row would carry a null partition value.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
yihua left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
CodeRabbit Walkthrough: The PR adds heartbeat-based clustering instant expiration and rollback eligibility checks, refactors wrapper filesystem metrics setup into a shared utility and enhances distributed/table-scoped metric registries, implements log-compaction-aware delta selection, introduces Lance vectorized batch iteration, enforces VECTOR-schema nesting restrictions, adds incremental-read column pruning and instant-time normalization, refactors Flink CDC/RLI bootstrap and split/read plumbing, and adds many tests across these areas.
Greptile Summary: This PR implements vectorized columnar batch reading for Lance files (COW only), upgrades incremental relations to PrunedScan for column pruning, adds clustering-expiration-based rollback support, and introduces table-scoped distributed metric registries. It is a large, multi-feature PR that touches 79 files.
Key changes:
- LanceBatchIterator: new columnar iterator that returns ColumnarBatch directly from Lance Arrow batches; LanceRecordIterator is corrected to avoid double-freeing shared Arrow FieldVector references.
- SparkLanceReaderBase: splits into readBatch (vectorized path) and readRows (row path); handles null-padded schema-evolution columns and constant partition vectors per batch.
- IncrementalRelationV1/IncrementalRelationV2: upgraded from TableScan to PrunedScan; IncrementalRelationUtil trims the scanned schema to requested columns plus mandatory Hudi fields.
- HoodieSchema: new validation prevents VECTOR fields in nested STRUCT/ARRAY/MAP types; however, a gap exists where ARRAY<STRUCT<VECTOR>> bypasses the guard (see inline comment).
- Clustering expiration: heartbeat-based detection of stale clustering instants with automatic rollback.
- Distributed metrics: table-scoped DistributedRegistry accumulator injected into Spark operations; the key-format mismatch between setRegistries and getRegistryOfClass (prior thread) remains.
- ScheduleCompactionActionExecutor: needLogCompact uses 0 as the no-prior-log-compaction sentinel, making Math.min always return 0 and disabling log-compaction scheduling (prior thread).
Greptile Confidence Score: 3/5
Not yet safe to merge — multiple concrete bugs from prior review rounds remain unresolved alongside a new schema-validation gap found in this round.
The core Lance columnar batch path is well-structured and the LanceRecordIterator double-free fix is correct. Column-pruning for incremental reads is a clean implementation. However, several concrete bugs remain open from prior review threads: (1) TimestampNTZType partition columns produce all-null values in vectorized Lance reads; (2) Registry.setRegistries stores under a key format that getRegistryOfClass can never match, silently disabling distributed metrics; (3) needLogCompact always returns false due to the 0 no-prior-log-compaction sentinel; (4) rollbackFailedClusteringForPartitions throws on the first ineligible instant, blocking writes to entire partitions. This round adds a new P1 schema-validation gap (ARRAY<STRUCT<VECTOR>> bypasses the guard).
SparkLanceReaderBase.scala (TimestampNTZType + O(n) null-vector scan), Registry.java + HoodieSparkEngineContext.java (setRegistries key mismatch), ScheduleCompactionActionExecutor.java (needLogCompact sentinel), HoodieClusteringJob.java (throw vs skip), HoodieSchema.java (ARRAY/MAP+STRUCT+VECTOR gap).
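To make the sentinel problem in item (3) concrete, here is a hedged sketch. It does not reproduce `ScheduleCompactionActionExecutor`: the names `commitsSinceCompaction`, `commitsSinceLogCompaction`, and `threshold` are hypothetical, a positive threshold is assumed, and the only thing taken from the review is that a `0` sentinel fed into `Math.min` pins the result at `0`. The `Option`-based variant is one possible alternative, not a fix adopted in the PR.

```scala
object LogCompactSentinelSketch {
  // Problematic shape: 0 stands for "no prior log compaction", so min(...) is 0 in that case
  // and a positive threshold can never be reached.
  def needLogCompactWithSentinel(commitsSinceCompaction: Int,
                                 commitsSinceLogCompaction: Int, // 0 when none has ever run
                                 threshold: Int): Boolean =
    Math.min(commitsSinceCompaction, commitsSinceLogCompaction) >= threshold

  // Alternative shape: absence is modelled explicitly, so it cannot be mistaken for a count of zero.
  def needLogCompactWithOption(commitsSinceCompaction: Int,
                               commitsSinceLogCompaction: Option[Int],
                               threshold: Int): Boolean = {
    val effective = commitsSinceLogCompaction
      .map(n => Math.min(commitsSinceCompaction, n))
      .getOrElse(commitsSinceCompaction)
    effective >= threshold
  }
}
```

With ten delta commits, no prior log compaction, and a threshold of five, the sentinel version reports that no log compaction is needed while the `Option` version reports that one is.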
Sequence Diagram (CodeRabbit):
sequenceDiagram
participant Config as Config<br/>(Expiration)
participant Client as TableService<br/>Client
participant Heartbeat as Heartbeat<br/>Client
participant Timeline as Active<br/>Timeline
participant Rollback as Rollback<br/>Handler
Client->>Config: isExpirationOfClusteringEnabled()
Config-->>Client: true
Client->>Heartbeat: start(instantTime)
Client->>Timeline: scheduleClustering(instantTime)
Note over Heartbeat,Timeline: time passes...
Client->>Heartbeat: isHeartbeatExpired(requestedTime)
Heartbeat-->>Client: true
Client->>Config: getClusteringExpirationThresholdMins()
Config-->>Client: threshold
Client->>Rollback: isClusteringInstantEligibleForRollback(metaClient, instant, config, heartbeat)
Rollback-->>Client: eligible
Client->>Timeline: rollback(instant)
Timeline-->>Client: instantRemoved
Heartbeat->>Heartbeat: stop(instantTime)
Sequence Diagram (CodeRabbit):
sequenceDiagram
participant Job as FlinkJob
participant Coord as StreamWrite<br/>Coordinator
participant EventBuf as Event<br/>Buffers
participant Bootstrap as RLI<br/>BootstrapOperator
participant Timeline as Active<br/>Timeline
Job->>Coord: initialize(with RLI bootstrap)
Coord->>Bootstrap: createOperator()
Bootstrap->>Timeline: loadRLIRecords()
Job->>Coord: checkpoint(id=CK1)
Coord->>EventBuf: stageEvents()
Job->>Coord: restart
Coord->>EventBuf: restoreCheckpoint(CK1)
EventBuf-->>Coord: pendingInstants
Coord->>Coord: recommitInstant(instant)
alt instant already completed
Coord->>EventBuf: reset(checkpointId)
else recommit succeeded
Coord->>Job: failJob() -- trigger global failover
end
Note over Coord,Timeline: subsequent checkpoints validate final commit state
Sequence Diagram (Greptile):
sequenceDiagram
participant Spark as Spark FileSourceScanExec
participant SLRB as SparkLanceReaderBase
participant LBI as LanceBatchIterator
participant LFR as LanceFileReader
participant AR as ArrowReader
Spark->>SLRB: read(file, requiredSchema, partitionSchema, ...)
SLRB->>LFR: open(filePath, allocator)
SLRB->>AR: lanceReader.readAll(columnNames, null, batchSize)
alt enableVectorizedReader and no type changes
SLRB->>LBI: new LanceBatchIterator(allocator, lanceReader, arrowReader)
SLRB-->>Spark: Iterator[ColumnarBatch] type-erased as Iterator[InternalRow]
loop Each batch
Spark->>LBI: hasNext()
LBI->>AR: loadNextBatch()
AR-->>LBI: VectorSchemaRoot reused in-place
LBI-->>Spark: true
Spark->>LBI: next()
LBI-->>Spark: ColumnarBatch(columnVectors, numRows)
Note over Spark: Reorder via columnMapping, inject null vectors, append partition vectors
end
else row mode
SLRB->>SLRB: readRows() to LanceRecordIterator
SLRB-->>Spark: Iterator[InternalRow] with padding and cast projections
end
Spark->>LBI: TaskContext.onComplete to close()
LBI->>AR: close()
LBI->>LFR: close()
LBI->>LBI: allocator.close()
CodeRabbit: yihua#38 (review)
Greptile: yihua#38 (review)
| result.setNumRows(numRows) | ||
| result | ||
| } | ||
|
|
Attempt every close before making the iterator permanently closed.
Line 255 marks the iterator closed before cleanup runs. If Line 257 or Line 258 throws, batchIterator.close() and partitionVectors.close() are skipped, and every later close() becomes a no-op. This can strand Arrow/Lance resources after an otherwise successful read.
♻️ Proposed fix
override def close(): Unit = {
// Idempotent: TaskContext listener and the outer CloseableIteratorListener may both call close().
if (!closed) {
closed = true
- // Close null Arrow vectors and their allocator before batchIterator (which closes the data allocator)
- nullColumnVectors.foreach(_.columnVector.close())
- nullAllocator.foreach(_.close())
- batchIterator.close()
- partitionVectors.foreach(_.close())
+ var closeError: Exception = null
+
+ def closeSafely(f: => Unit): Unit = {
+ try {
+ f
+ } catch {
+ case e: Exception =>
+ if (closeError == null) {
+ closeError = e
+ } else {
+ closeError.addSuppressed(e)
+ }
+ }
+ }
+
+ // Close null Arrow vectors and their allocator before batchIterator (which closes the data allocator)
+ nullColumnVectors.foreach(v => closeSafely(v.columnVector.close()))
+ nullAllocator.foreach(a => closeSafely(a.close()))
+ closeSafely(batchIterator.close())
+ partitionVectors.foreach(v => closeSafely(v.close()))
+
+ if (closeError != null) {
+ throw closeError
+ }
}
  }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala`
around lines 252 - 260, The close() implementation currently sets the closed
flag before performing cleanup so exceptions from nullColumnVectors.foreach,
nullAllocator.foreach, batchIterator.close(), or partitionVectors.foreach can
prevent remaining resources from being closed; change the logic in
SparkLanceReaderBase.close() to attempt all close operations first (wrapping
each close call or each group in try/catch to log/ignore exceptions) and only
set closed = true after all cleanup has been attempted (or use a try {
cleanup... } finally { closed = true }) so that one failing close cannot
short-circuit the rest.
— CodeRabbit (original) (source:comment#3085081715)
Thanks, valid concern. Addressed in 69e5d50: each cleanup call is now wrapped in a closeSafely helper that captures the first Exception and attaches any subsequent ones as suppressed. The final throw happens only after every resource has been given a chance to close, so a failure in one step no longer strands the others.
I kept closed = true at the top of the block on purpose — that guarantees idempotency for the second-call path (TaskCompletionListener vs. CloseableIteratorListener) even if the first call partially fails. Catching Exception rather than Throwable so VM errors (OutOfMemoryError, StackOverflowError) still propagate immediately.
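The behaviour described in this reply can be exercised without Arrow or Lance. The sketch below is a self-contained stand-in, not the PR's iterator: `MultiResourceCloser` and `RecordingResource` are hypothetical names, and the resources are plain `java.io.Closeable` test doubles.

```scala
import java.io.Closeable

// Hypothetical owner of several resources; mirrors the shape of the close() discussed above.
final class MultiResourceCloser(resources: Seq[Closeable]) extends Closeable {
  private var closed = false

  override def close(): Unit = {
    if (!closed) {
      closed = true // set first, so a second call is a no-op even after a partial failure
      var closeError: Exception = null
      def closeSafely(f: => Unit): Unit = {
        try f
        catch {
          // Exception only: VM errors such as OutOfMemoryError still propagate immediately.
          case e: Exception =>
            if (closeError == null) closeError = e else closeError.addSuppressed(e)
        }
      }
      resources.foreach(r => closeSafely(r.close()))
      if (closeError != null) throw closeError
    }
  }
}

// Test double that counts close() calls and can be told to fail.
final class RecordingResource(name: String, fail: Boolean) extends Closeable {
  var closeCount = 0
  override def close(): Unit = {
    closeCount += 1
    if (fail) throw new IllegalStateException(s"close failed: $name")
  }
}
```

If the first two of three resources fail, the caller sees the first failure with the second attached as suppressed, the third resource is still closed, and a repeated `close()` touches nothing.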
| while (j < numRows) { vector.putLong(j, unscaled); j += 1 } | ||
| } else { | ||
| val bytes = v.toJavaBigDecimal.unscaledValue().toByteArray | ||
| var j = 0 |
Handle TimestampNTZType partition columns here.
This matcher covers TimestampType but falls through to the null-filling default for TimestampNTZType, so vectorized scans will return null partition values for timestamp_ntz partitions while the row path stays correct.
🛠️ Proposed fix
- case LongType | TimestampType =>
+ case LongType | TimestampType | TimestampNTZType =>
val v = partitionValues.getLong(i)
     vector.putLongs(0, numRows, v)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala`
around lines 368 - 405, The pattern match that fills partition vectors in
SparkLanceReaderBase.scala currently handles TimestampType but not
TimestampNTZType, causing timestamp_ntz partitions to be filled with nulls;
update the match in the method that uses partitionValues, vector and numRows to
include TimestampNTZType (treat it the same as TimestampType) — e.g., add
TimestampNTZType alongside TimestampType or a separate case that reads
partitionValues.getLong(i) and calls vector.putLongs(0, numRows, v) so NTZ
partitions are populated correctly.
— CodeRabbit (original) (source:comment#3085081736)
TimestampNTZType was already added to this match in commit 02cb17f (review round 4) — the current line reads case LongType | TimestampType | TimestampNTZType => (see line 390). That commit also added TestLanceColumnarBatch#testColumnarPathPartitionVectorTimestampNtz which would return all-null partition values without this fix. Looks like CodeRabbit analyzed a pre-fix snapshot. Grepped the whole Lance codebase — no other TimestampType branch is missing NTZ.
- Idempotent close() in SparkLanceReaderBase.readBatch iterator: guard nullAllocator and partitionVectors against double-free when both the TaskContext listener and the outer CloseableIteratorListener invoke close().
- Close iterator in TestLanceColumnarBatch.runReadAndCollect: wrap the consume loop in try/finally and close via AutoCloseable; needed on the driver where TaskContext.get() is null and the completion-listener path never runs.
- Assert vectorized scan path in 3 integration tests via a new helper that inspects SQLMetrics on a columnar FileSourceScan/BatchScan, descending into AdaptiveSparkPlanExec and QueryStageExec to reach the scan under AQE.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
a2bfd8e to
69e5d50
Compare
hudi-agent left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for iterating on this! This PR adds a columnar batch read path for Lance COW tables with appropriate fallbacks to the row path for MOR, type-change, and multi-format cases. The resource management, schema evolution handling, and vectorTypes alignment all look well thought through after the prior review rounds. No new issues flagged from this automated pass — a Hudi committer or PMC member can take it from here for a final review.
Style & Readability — one minor consistency nit in populatePartitionVectors; the rest of the code is clean and well-documented.
cc @yihua
| case LongType | TimestampType | TimestampNTZType => | ||
| val v = partitionValues.getLong(i) | ||
| vector.putLongs(0, numRows, v) | ||
| case FloatType => |
🤖 nit: FloatType and DoubleType use per-element while loops here, but WritableColumnVector has the same bulk-fill overload as Int/Long — putFloats(0, numRows, v) / putDoubles(0, numRows, v). Could you swap these two cases to use the bulk methods, matching the style a few lines above?
- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Done in afea635 — swapped both arms to the bulk-fill overloads (putFloats(0, numRows, v) / putDoubles(0, numRows, v)), matching the IntegerType/LongType arms a few lines above. No behavior change.
hudi-agent left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for addressing the feedback! The FloatType/DoubleType arms in populatePartitionVectors now use the bulk-fill overloads (putFloats(0, numRows, v) / putDoubles(0, numRows, v)), consistent with the IntegerType/LongType cases just above — this resolves the prior nit. No new issues flagged from this automated pass — a Hudi committer or PMC member can take it from here for a final review.
hudi-agent left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
CodeRabbit Walkthrough: This pull request introduces Lance columnar batch support for Hudi's Spark integration. It adds a new LanceBatchIterator class for vectorized reading, adjusts batch lifecycle management in LanceRecordIterator, enables dual execution paths in SparkLanceReaderBase (columnar vs. row-based), configures vectorized reading in HoodieFileGroupReaderBasedFileFormat, and includes comprehensive functional tests.
Sequence Diagram (CodeRabbit):
sequenceDiagram
participant Client
participant SparkLanceReaderBase
participant LanceBatchIterator
participant ArrowReader
participant LanceFileReader
participant ColumnarBatch as ColumnarBatch Factory
Client->>SparkLanceReaderBase: read(file, schema, config)
rect rgba(100, 150, 200, 0.5)
Note over SparkLanceReaderBase: Check if columnar path available
alt enableVectorizedReader && no type changes
SparkLanceReaderBase->>LanceBatchIterator: new(allocator, lanceReader, arrowReader)
SparkLanceReaderBase->>LanceBatchIterator: hasNext()
LanceBatchIterator->>ArrowReader: loadNextBatch()
ArrowReader->>LanceBatchIterator: VectorSchemaRoot
LanceBatchIterator->>LanceArrowColumnVector: wrap field vectors
LanceBatchIterator->>ColumnarBatch: construct from vectors
LanceBatchIterator-->>SparkLanceReaderBase: ColumnarBatch
else fallback to row path
SparkLanceReaderBase->>LanceRecordIterator: iterate rows
LanceRecordIterator-->>SparkLanceReaderBase: InternalRow
end
end
SparkLanceReaderBase->>Client: Iterator<ColumnarBatch|InternalRow>
Client->>SparkLanceReaderBase: close()
SparkLanceReaderBase->>LanceBatchIterator: close()
LanceBatchIterator->>ArrowReader: close()
LanceBatchIterator->>LanceFileReader: close()
CodeRabbit: hudi-agent#5 (review)
| // Append partition values to each row using JoinedRow, then convert to UnsafeRow | ||
| val joinedRow = new JoinedRow() | ||
| projectedIter.map(row => unsafeProjection(joinedRow(row, file.partitionValues))) | ||
| } |
Preserve close() through the row-path projections.
lanceIterator is closable, but asScala.map(...) returns a plain Scala Iterator, so this path loses the only explicit cleanup handle when TaskContext.get() is null. The new tests already show that: TestLanceColumnarBatch.readAndCollect() can close the columnar path, but the row path falls through as a non-closeable iterator on Lines 177-183. That leaks the Arrow allocator and file readers for any direct/driver-side caller of SparkLanceReaderBase.read().
♻️ Proposed fix
- val projectedIter = lanceIterator.asScala.map(projection.apply)
+ val projectedIter = new Iterator[UnsafeRow] with Closeable {
+ private val rows = lanceIterator.asScala
+
+ override def hasNext: Boolean = rows.hasNext
+
+ override def next(): UnsafeRow = projection(rows.next())
+
+ override def close(): Unit = lanceIterator.close()
+ }
// Handle partition columns
if (partitionSchema.length == 0) {
// No partition columns - return rows directly
- projectedIter
+ projectedIter.asInstanceOf[Iterator[InternalRow]]
} else {
// Create UnsafeProjection to convert JoinedRow to UnsafeRow
val fullSchema = (requiredSchema.fields ++ partitionSchema.fields).map(f =>
AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
// Append partition values to each row using JoinedRow, then convert to UnsafeRow
val joinedRow = new JoinedRow()
- projectedIter.map(row => unsafeProjection(joinedRow(row, file.partitionValues)))
+ new Iterator[InternalRow] with Closeable {
+ override def hasNext: Boolean = projectedIter.hasNext
+
+ override def next(): InternalRow =
+ unsafeProjection(joinedRow(projectedIter.next(), file.partitionValues))
+
+ override def close(): Unit = projectedIter.close()
+ }
   }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala`
around lines 341 - 356, The row-path currently returns a plain Scala Iterator
(projectedIter) created from lanceIterator.asScala, which loses
lanceIterator.close() and leaks resources; replace the returned projectedIter
with a wrapper that preserves closing: create a delegating
Closeable/AutoCloseable iterator that forwards hasNext/next to the mapped
projection (using unsafeProjection and JoinedRow with file.partitionValues) and
calls lanceIterator.close() when the wrapped iterator is exhausted or when
close() is invoked; ensure this wrapper is returned from the partitionSchema
branch so lanceIterator is always closed even when TaskContext.get() is null
(references: lanceIterator, projectedIter, unsafeProjection /
GenerateUnsafeProjection, JoinedRow, file.partitionValues,
SparkLanceReaderBase.read).
— CodeRabbit (original) (source:comment#3108852041)
Good catch. The row path leaked the lanceIterator handle when TaskContext.get() was null (driver/direct reads). Fixed in c429fde by wrapping the projected iterator in an Iterator[InternalRow] with Closeable, mirroring the readBatch() pattern at line 212. LanceRecordIterator.close() is already idempotent, so this coexists safely with the TaskContext completion listener.
| case _ => | ||
| fail("Vectorized path with null-padding must return ColumnarBatch") | ||
| } | ||
| } |
Close the batch iterators in these direct-reader tests.
Both tests open the vectorized iterator directly and never close it, even though the suite already has the correct try/finally pattern in readAndCollect(). On the driver there is no TaskContext cleanup, so these cases can leak Arrow allocators across the suite.
💡 Suggested change
val iter = reader.read(pf, requiredSchemaWithExtra, new StructType(), HOption.empty(),
Seq.empty[Filter], storageConf).asInstanceOf[Iterator[Any]]
- while (iter.hasNext) {
- iter.next() match {
- case batch: ColumnarBatch =>
- batchesSeen += 1
- batch.rowIterator().asScala.foreach { row =>
- nullPadded += row.isNullAt(2)
- }
- case _ =>
- fail("Vectorized path with null-padding must return ColumnarBatch")
+ try {
+ while (iter.hasNext) {
+ iter.next() match {
+ case batch: ColumnarBatch =>
+ batchesSeen += 1
+ batch.rowIterator().asScala.foreach { row =>
+ nullPadded += row.isNullAt(2)
+ }
+ case _ =>
+ fail("Vectorized path with null-padding must return ColumnarBatch")
+ }
}
+ } finally {
+ iter match {
+ case c: AutoCloseable => c.close()
+ case _ =>
+ }
 }
Apply the same pattern to the partition-vector test below as well.
Also applies to: 370-384
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceColumnarBatch.scala`
around lines 327 - 339, The direct vectorized read loop using reader.read
creates Arrow allocators that must be released — wrap the iterator consumption
in a try/finally and ensure any ColumnarBatch is explicitly closed after
processing (call batch.close() after iterating its rows) and close the iterator
itself if it implements AutoCloseable (or has a close method) in the finally
block; mirror the same try/finally + batch.close() changes for the second
direct-reader block (the one at lines ~370-384) so both tests follow the
readAndCollect() cleanup pattern and won't leak Arrow allocators.
— CodeRabbit (original) (source:comment#3108852049)
Fixed in c429fde. Both direct-reader tests now wrap iterator consumption in try/finally and apply the same case c: AutoCloseable => c.close() pattern as the existing readAndCollect() helper (lines 180-183), so driver-side tests no longer leak Arrow allocators.
hudi-agent left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for addressing the feedback! The row path now returns an Iterator[InternalRow] with Closeable with an idempotent close() so direct-reader callers get explicit cleanup, and both direct-reader tests wrap iteration in try/finally with an AutoCloseable close. Both of the prior hudi-agent findings (row-path close propagation in SparkLanceReaderBase.scala and closing batch iterators in TestLanceColumnarBatch.scala) are resolved by this round. No new issues flagged from this automated pass — a Hudi committer or PMC member can take it from here for a final review.
Previously `closed = true` was set before cleanup, so if any single close() threw (null vector, null allocator, batch iterator, or partition vectors), the remaining closes were skipped and every subsequent close() call became a no-op, stranding Arrow/Lance resources. Attempt every close inside a closeSafely wrapper that captures the first exception and attaches the rest as suppressed; throw once all closes have been attempted. Idempotency via `closed = true` is preserved at the start so repeated close() calls from TaskCompletionListener + CloseableIteratorListener are still safe. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replaces per-element while loops in populatePartitionVectors for FloatType and DoubleType with the WritableColumnVector bulk-fill overloads, matching the style already used for IntegerType/DateType (putInts) and LongType/TimestampType/TimestampNTZType (putLongs). No behavior change. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
readRows() in SparkLanceReaderBase returned a plain Scala Iterator, so lanceIterator.close() was unreachable whenever TaskContext.get() was null (driver / direct-reader callers). Wrap the projected stream in Iterator[InternalRow] with Closeable, matching readBatch()'s pattern and its idempotent closed flag. LanceRecordIterator.close() is itself idempotent, so calls from the TaskContext completion listener and from explicit close() coexist safely. Two direct-reader tests in TestLanceColumnarBatch consumed the iterator without the try/finally guard used in the readAndCollect() helper. Apply the same "case c: AutoCloseable => c.close()" pattern so driver-side tests do not leak Arrow allocators across the suite. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
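A rough Java analogue of the pattern in this commit message, assuming a hypothetical wrapper `CloseableRows` and a `Runnable` standing in for `lanceIterator.close()`; neither name comes from the PR, whose actual code is Scala.

```java
import java.io.Closeable;
import java.util.Iterator;

public class ClosingRows {
    /** Hypothetical wrapper: delegates iteration and releases its source once. */
    public static final class CloseableRows<T> implements Iterator<T>, Closeable {
        private final Iterator<T> delegate;
        private final Runnable releaseSource; // stand-in for lanceIterator.close()
        private boolean closed = false;

        public CloseableRows(Iterator<T> delegate, Runnable releaseSource) {
            this.delegate = delegate;
            this.releaseSource = releaseSource;
        }

        @Override
        public boolean hasNext() {
            return !closed && delegate.hasNext();
        }

        @Override
        public T next() {
            return delegate.next();
        }

        @Override
        public void close() {
            if (!closed) { // idempotent: safe to call from several owners
                closed = true;
                releaseSource.run();
            }
        }
    }

    /** Consumes an iterator and closes it in finally when it is AutoCloseable. */
    public static <T> int drain(Iterator<T> rows) throws Exception {
        int count = 0;
        try {
            while (rows.hasNext()) {
                rows.next();
                count++;
            }
        } finally {
            if (rows instanceof AutoCloseable) {
                ((AutoCloseable) rows).close();
            }
        }
        return count;
    }
}
```

The `drain` helper mirrors the try/finally guard the tests apply, so a caller without a task-completion listener still releases the underlying reader.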
The master rebase pulled in apache#18498 which bumped lance to 4.0.0 and lance-spark to 0.4.0. Two API moves were not flagged by git's textual auto-merge but break compilation against the new classpath:
* `LanceArrowColumnVector` moved from `org.apache.spark.sql.vectorized` to `org.lance.spark.vectorized`. Updated imports in LanceBatchIterator, SparkLanceReaderBase, HoodieFileGroupReaderBasedFileFormat, and the scaladoc references in TestLanceColumnarBatch.
* `HoodieSparkLanceReader.LANCE_DATA_ALLOCATOR_SIZE` was retired in favor of the configurable `HoodieStorageConfig.LANCE_READ_ALLOCATOR_SIZE_BYTES` config property. The vectorized read path's null-vector allocator now reads its size from that config (mirroring how `read()` already sizes the data allocator after the master changes).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
c429fde to
47fa843
Compare
hudi-agent
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for working on this! The PR adds a vectorized columnar batch reading path for COW Lance tables, with careful resource lifecycle handling and a row-path fallback for type changes. A couple of edge cases around exception handling and partition type coverage are worth double-checking; see the inline comments. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here. A couple of small naming nits below; otherwise the code is clean and well-structured.
|
|
||
| try { | ||
| arrowReader.close(); | ||
| } catch (IOException e) { |
🤖 Catch here is IOException while the lanceReader and allocator blocks below catch the broader Exception. If arrowReader.close() throws a RuntimeException (e.g., from Arrow allocator state — unclosed children, etc.), it propagates out of close() and we skip both lanceReader.close() and allocator.close(), leaking them. Was the narrower catch intentional, or worth widening to Exception for symmetry?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
Widened to Exception in 104a52f. HoodieIOException's constructor only accepts IOException, so the rethrow now splits — HoodieIOException for the genuine IOException case, HoodieException otherwise.
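The split rethrow mentioned in this reply might look roughly like the following. `SketchHoodieIOException` and `SketchHoodieException` are stand-ins defined here only to keep the example self-contained; they are not the real Hudi classes, and the only property borrowed from the reply is that the I/O variant accepts only an IOException cause.

```java
import java.io.IOException;

public class CloseRethrow {
    /** Stand-in for a general unchecked wrapper (hypothetical, not Hudi's class). */
    public static class SketchHoodieException extends RuntimeException {
        public SketchHoodieException(String msg, Throwable cause) {
            super(msg, cause);
        }
    }

    /** Stand-in whose constructor accepts only IOException, as the reply describes. */
    public static class SketchHoodieIOException extends RuntimeException {
        public SketchHoodieIOException(String msg, IOException cause) {
            super(msg, cause);
        }
    }

    /** Closes one resource, catching the broad Exception and splitting the rethrow. */
    public static void closeOrWrap(AutoCloseable resource, String what) {
        try {
            resource.close();
        } catch (IOException e) {
            // genuine I/O failure keeps the I/O-specific wrapper
            throw new SketchHoodieIOException("Failed to close " + what, e);
        } catch (Exception e) {
            // anything else, including RuntimeException, gets the general wrapper
            throw new SketchHoodieException("Failed to close " + what, e);
        }
    }
}
```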
|
|
||
| private ColumnVector[] columnVectors; | ||
| private ColumnarBatch currentBatch; | ||
| private boolean nextBatchLoaded = false; |
🤖 nit: could you rename nextBatchLoaded to hasCachedBatch? The current name describes the mechanism (a batch was loaded from the reader), but what readers of hasNext() really need to know is whether a batch is already available to return — if (hasCachedBatch) return true reads much more naturally than if (nextBatchLoaded) return true.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| /** Holds a pre-created all-null Arrow vector for a column missing from the file (schema evolution). */ | ||
| private case class NullColumnEntry(colIndex: Int, columnVector: LanceArrowColumnVector, arrowVector: FieldVector) | ||
|
|
||
| // Batch size for reading Lance files (number of rows per batch) |
🤖 nit: the columnVector field name is a bit generic alongside arrowVector — both are "vectors". Something like sparkVector (or lanceColumnVector) would make the Spark-facing wrapper vs. the raw Arrow backing vector distinction explicit without having to look at the types.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
Renamed to lanceColumnVector in 104a52f (more specific than sparkVector — flags it as the Lance-Arrow wrapper rather than just any Spark vector).
🤖 Line 556: For partition types not in the match (complex types like Struct/Array/Map, plus atomic types like CharType/VarcharType/interval types), this silently substitutes null. The row path preserves the value via JoinedRow, so this is a silent regression in batch mode for those types. Worth either throwing on unsupported types or filtering them in supportBatch (cf. Parquet's schema.forall(_.dataType.isInstanceOf[AtomicType]) gate)?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
Added in 104a52f. Gate sits in SparkLanceReaderBase.read rather than supportBatch because the file-format hook only sees the data schema, not partitionSchema. isPartitionTypeSupportedForBatch covers exactly the set the populator handles (Boolean/Byte/Short/Int/Date/Long/Timestamp/TimestampNTZ/Float/Double/String/Decimal/Binary); when any partition type is unsupported, batch mode falls back to the row path where JoinedRow preserves the value. The previous silent-null case _ in populatePartitionVectors is now throw IllegalStateException, so any future regression in the gate fails loud instead of nulling user data.
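As a simplified model of the gate described in this reply, the sketch below replaces Spark's DataType hierarchy with a hypothetical enum `PartType`. The class and method names are illustrative assumptions; only the list of supported types and the fail-loud behavior are taken from the reply.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

public class PartitionBatchGate {
    /** Hypothetical stand-in for Spark's DataType hierarchy. */
    public enum PartType {
        BOOLEAN, BYTE, SHORT, INT, DATE, LONG, TIMESTAMP, TIMESTAMP_NTZ,
        FLOAT, DOUBLE, STRING, DECIMAL, BINARY,
        STRUCT, ARRAY, MAP, CHAR, VARCHAR, INTERVAL
    }

    /** Types the batch-mode populator handles, per the list in the reply. */
    private static final Set<PartType> BATCH_SUPPORTED = EnumSet.of(
        PartType.BOOLEAN, PartType.BYTE, PartType.SHORT, PartType.INT, PartType.DATE,
        PartType.LONG, PartType.TIMESTAMP, PartType.TIMESTAMP_NTZ, PartType.FLOAT,
        PartType.DOUBLE, PartType.STRING, PartType.DECIMAL, PartType.BINARY);

    public static boolean isSupportedForBatch(PartType type) {
        return BATCH_SUPPORTED.contains(type);
    }

    /** Batch mode is chosen only if every partition column type is supported. */
    public static boolean useBatchPath(boolean vectorizedEnabled, boolean hasTypeChanges,
                                       List<PartType> partitionTypes) {
        return vectorizedEnabled && !hasTypeChanges
            && partitionTypes.stream().allMatch(PartitionBatchGate::isSupportedForBatch);
    }

    /** Populator: fails loudly if the gate ever lets an unsupported type through. */
    public static String populate(PartType type) {
        if (!isSupportedForBatch(type)) {
            throw new IllegalStateException("Unsupported partition type for batch mode: " + type);
        }
        return "filled:" + type; // placeholder for filling a constant column vector
    }
}
```

Keeping the gate and the populator on the same set means a type added to one but not the other surfaces as an exception instead of silently null partition values.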
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
hudi-agent
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the iterations on this — the resource lifecycle and close-ordering refinements over the prior rounds are looking solid. One concern worth confirming: the per-file row-mode fallback (type-change or unsupported partition types) may produce Iterator[InternalRow] while Spark expects ColumnarBatch from this reader, since supportBatch reports true for COW Lance. Please take a look at the inline comment, and this should be ready for a Hudi committer or PMC member to take it from here. One minor simplification suggestion below; otherwise the code reads cleanly with good doc comments and clear separation between row and batch paths.
| val hasTypeChanges = !implicitTypeChangeInfo.isEmpty | ||
| val partitionTypesBatchSupported = | ||
| partitionSchema.forall(f => isPartitionTypeSupportedForBatch(f.dataType)) | ||
| if (enableVectorizedReader && !hasTypeChanges && partitionTypesBatchSupported) { |
🤖 @yihua could you double-check this fallback's interaction with Spark's columnar codegen? After the PR, supportBatch returns true for COW Lance, so Spark wraps the scan to expect ColumnarBatch (ColumnarToRowExec calls batch.rowIterator() on each element). When hasTypeChanges=true (e.g., an old base file with FLOAT vs. a query with DOUBLE after schema evolution), this branch returns Iterator[InternalRow]. In a FileScanRDD partition that contains both a fallback file and a batch-mode file, the consumer would hit a ClassCastException on the row elements — the existing unit test (testTypeChangeFallsBackToRowPath) exercises SparkLanceReaderBase.read directly and wouldn't catch this. Same concern applies if partitionTypesBatchSupported=false for the query.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
Fixed in fe8f7e1. Moved the gate to plan-time: lanceBatchSupported in HoodieFileGroupReaderBasedFileFormat.supportBatch now also checks !internalSchemaOpt.isPresent, so when the table carries an internal schema (the only way hasTypeChanges can fire) the planner will not commit to ColumnarBatch output. The runtime row branch in SparkLanceReaderBase.read is preserved for direct-reader callers — testTypeChangeFallsBackToRowPath continues to exercise it on the unit path.
| }.getOrElse(Array.empty) | ||
|
|
||
| // Direct-indexed lookup so the per-batch hot loop is O(1) instead of scanning nullColumnVectors. | ||
| val nullColumnByIndex: Array[NullColumnEntry] = { |
🤖 nit: nullColumnVectors is built and then immediately re-bucketed into nullColumnByIndex — could you build the sparse nullColumnByIndex array directly in the nullAllocator.map { ... } block, and iterate it (filtering nulls) in close()? Would drop the duplicate array and the NullColumnEntry.colIndex field becomes unnecessary too.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
Done in fe8f7e1. nullColumnByIndex is now built directly inside the nullAllocator.map { ... } block as a sparse Array[NullColumnEntry] of size requiredSchema.length, populated only at indexes where the column is missing from the file. nullColumnVectors is gone; NullColumnEntry.colIndex is gone too. close() walks the sparse array and skips null slots; the per-batch hot loop is unchanged.
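The sparse, direct-indexed layout described in this reply can be illustrated with a small Java sketch. The `Entry` class is a stand-in for NullColumnEntry (the real entry holds Arrow vectors), and the method names are hypothetical.

```java
import java.util.Set;

public class SparseNullColumns {
    /** Stand-in for NullColumnEntry; the real entry holds Arrow vectors. */
    public static final class Entry {
        public final String columnName;

        public Entry(String columnName) {
            this.columnName = columnName;
        }
    }

    /** Builds an array sized to the required schema, filled only where the file lacks the column. */
    public static Entry[] build(String[] requiredColumns, Set<String> fileColumns) {
        Entry[] byIndex = new Entry[requiredColumns.length];
        for (int i = 0; i < requiredColumns.length; i++) {
            if (!fileColumns.contains(requiredColumns[i])) {
                byIndex[i] = new Entry(requiredColumns[i]);
            }
        }
        return byIndex;
    }

    /** Per-batch lookup: one array access instead of a linear find. */
    public static boolean isNullPadded(Entry[] byIndex, int columnIndex) {
        return byIndex[columnIndex] != null;
    }

    /** What close() would do: walk the array and skip null slots. */
    public static int closeAll(Entry[] byIndex) {
        int closedCount = 0;
        for (Entry entry : byIndex) {
            if (entry != null) {
                closedCount++; // the real code would close the vectors here
            }
        }
        return closedCount;
    }
}
```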
…s honored Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
hudi-agent
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for tackling the BLOB case! The new planning-time schemaContainsBlobColumn gate in supportBatch is a clean fix — it correctly prevents Spark from committing to ColumnarBatch output when the row-path-only BlobDescriptorTransform is needed, and the runtime descriptorBlobReadNeedsRowPath check provides nice defense-in-depth. One follow-up: the same iterator-type-mismatch logic applies to the hasTypeChanges runtime fallback as well — left a question inline about whether that branch needs a parallel planning-time gate. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here.
|
|
||
| /** | ||
| * Checks if the file format supports vectorized reading, please refer to SPARK-40918. | ||
| * |
🤖 [Line 160] Nice fix for the BLOB case — the comment here ("so the planner doesn't commit to ColumnarBatch output that the row-path reader cannot deliver") describes exactly the same risk that exists for the hasTypeChanges runtime fallback in SparkLanceReaderBase.read. When a COW Lance table has implicit schema evolution, supportBatch still returns true here (no BLOB), but at runtime any file with type changes returns Iterator[InternalRow] while Spark already expects ColumnarBatch. Worth extending this gate (e.g. also disable when internalSchemaOpt.isPresent) to close that path too?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
Done in fe8f7e1. lanceBatchSupported = !schemaContainsBlobColumn(schema) && !internalSchemaOpt.isPresent. The doc comment above is rewritten to call out both triggers (DESCRIPTOR blob mode and implicit type changes via internal-schema evolution) and explain that Spark’s ColumnarToRowExec would ClassCastException on the row-path iterator if either runtime fallback fired after the planner had committed to columnar output.
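The plan-time expression quoted in this reply reduces to a two-input boolean. A minimal Java rendering follows, assuming a hypothetical class `LanceBatchPlanGate` and simplified stand-in parameters (a precomputed blob flag and an untyped Optional) in place of the real schema objects.

```java
import java.util.Optional;

public class LanceBatchPlanGate {
    /**
     * Plan-time decision mirroring the expression quoted in the reply.
     * Both parameters are simplified stand-ins for the real inputs.
     */
    public static boolean lanceBatchSupported(boolean schemaContainsBlobColumn,
                                              Optional<?> internalSchemaOpt) {
        // Either trigger forces the row path, so the planner must not promise batches.
        return !schemaContainsBlobColumn && !internalSchemaOpt.isPresent();
    }
}
```

Because the decision is made before any file is opened, a runtime fallback to rows cannot contradict what the planner was told to expect.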
- Gate `supportBatch` off when `internalSchemaOpt.isPresent` so the
planner does not commit to ColumnarBatch output that the row-path
fallback (taken inside SparkLanceReaderBase.read on `hasTypeChanges`)
cannot deliver. Same shape as the existing BLOB-column gate; without
this, `ColumnarToRowExec` would ClassCastException on the row-path
iterator. Addresses review comments 3183992481 and 3186279309.
- Collapse `nullColumnVectors` and `nullColumnByIndex` into a single
sparse `Array[NullColumnEntry]` indexed by required-schema position,
built directly inside `nullAllocator.map { ... }`. Drop `colIndex`
from `NullColumnEntry` (direct indexing makes it redundant). The
per-batch hot loop is unchanged; close() walks the sparse array and
skips null slots. Addresses review comment 3183992489.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #18403 +/- ##
============================================
- Coverage 68.08% 68.06% -0.02%
- Complexity 28940 28983 +43
============================================
Files 2519 2520 +1
Lines 140646 140908 +262
Branches 17427 17494 +67
============================================
+ Hits 95757 95913 +156
- Misses 37030 37100 +70
- Partials 7859 7895 +36
hudi-agent
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This PR adds vectorized columnar batch reading for COW Lance tables, including a new LanceBatchIterator, plan-time/runtime gating between batch and row paths, and broad test coverage. One correctness question worth double-checking around the consistency between plan-time supportBatch and the runtime row-path fallback when implicit type changes are detected. Please take a look at the inline comment, and this should be ready for a Hudi committer or PMC member to take it from here.
| partitionSchema.forall(f => isPartitionTypeSupportedForBatch(f.dataType)) | ||
| val descriptorBlobReadNeedsRowPath = | ||
| blobMode == BlobReadMode.DESCRIPTOR && !blobFieldNames.isEmpty | ||
| if (enableVectorizedReader && !hasTypeChanges && partitionTypesBatchSupported |
🤖 Could this fall back to row mode after Spark already committed to columnar output? supportBatch only checks !internalSchemaOpt.isPresent, but hasTypeChanges can still be true at runtime when a file's schema differs in type from the required schema (e.g., older base files written before a widening reconciliation, or spark.read.schema(wider) on a table with narrower file types). In that case the planner expects ColumnarBatch, but readRows returns InternalRow → ClassCastException downstream. Worth confirming whether this can be reached via Hudi-managed Lance writes, and if so, gating batch mode more conservatively at plan time (or adding type-cast support to readBatch).
- AI-generated; verify before applying. React 👍/👎 to flag quality.
This is gated at plan time, not at runtime. HoodieFileGroupReaderBasedFileFormat.supportBatch disables batch mode for Lance whenever internalSchemaOpt.isPresent (HoodieFileGroupReaderBasedFileFormat.scala:188-189), and implicitTypeChangeInfo is itself derived from the same internalSchemaOpt. So when type changes exist, the reader is invoked with enableVectorizedReader=false and the planner already expects InternalRow — the row branch here is taken unconditionally.
The !hasTypeChanges check at line 174 is a defense-in-depth guard, not a reachable fallback under that contract. The block comment at HoodieFileGroupReaderBasedFileFormat.scala:179-187 documents the plan-time/runtime split explicitly.
Describe the issue this Pull Request addresses
Closes #17736
Summary and Changelog
Spark queries on COPY_ON_WRITE Lance tables now use columnar batch (vectorized) reading. Instead of decomposing Arrow batches into individual rows, Spark receives entire ColumnarBatch objects backed by zero-copy LanceArrowColumnVector wrappers. This eliminates per-row materialization overhead on the read path for COW base-file-only scans.
MOR tables and incremental queries are unaffected: they continue to use the row-based path, which is an existing Hudi constraint for all file formats (Parquet, ORC, Lance).
Limitations
supportVectorizedRead = !isIncremental && !isBootstrap && supportBatch).
Detailed changelog
LanceBatchIterator: NEW (hudi-client/hudi-spark-client)
- Iterator<ColumnarBatch> + Closeable that reads Arrow batches from a Lance file and wraps each batch's field vectors in LanceArrowColumnVector[] to produce a ColumnarBatch.
- VectorSchemaRoot).
- BufferAllocator, LanceFileReader, and ArrowReader; closes them in order on close().
- LanceRecordIterator.
SparkLanceReaderBase: MODIFIED (hudi-spark-datasource/hudi-spark-common)
- read() method now branches into readBatch() (columnar) and readRows() (row-based) based on enableVectorizedReader and whether implicit type changes exist.
- readBatch():
  - LanceBatchIterator for zero-copy batch iteration.
  - LanceArrowColumnVector. This satisfies Spark's vectorTypes() contract which expects all data columns to be LanceArrowColumnVector. A dedicated child allocator (nullAllocator) manages these vectors and is closed before the data allocator.
  - OnHeapColumnVector arrays are filled with constant partition values via populatePartitionVectors(), which handles all Spark primitive types, strings, decimals, and binary. Vectors are reused across batches; re-populated only when batch size changes.
  - implicitTypeChangeInfo is non-empty (e.g., file has FLOAT, query requires DOUBLE), falls back to readRows() which applies cast projections at the row level. Batch-level type casting is deferred to a follow-up.
- readRows(): the original row-based logic, extracted into its own method. Behavior unchanged.
- populatePartitionVectors(): new private helper, supports Boolean, Byte, Short, Int/Date, Long/Timestamp, Float, Double, String, Decimal (int/long/big), Binary. Unsupported types fall back to nulls.
HoodieFileGroupReaderBasedFileFormat: MODIFIED (hudi-spark-datasource/hudi-spark-common)
- supportBatch(): changed val lanceBatchSupported = false to val lanceBatchSupported = true. The existing guards supportVectorizedRead = !isIncremental && !isBootstrap && supportBatch and supportReturningBatch = !isMOR && supportVectorizedRead remain unchanged and apply to all formats.
- vectorTypes(): added a branch for LANCE format (when not in multi-format mode) returning LanceArrowColumnVector class name for all data columns and OnHeapColumnVector for partition columns. The existing Parquet/ORC logic is wrapped in the else branch, unchanged.
- readBaseFile call sites (case _ branches): introduced baseFileOnlyReader to select the correct reader for direct base-file reads (no log merging). Parquet's vectorized reader can return InternalRow when returningBatch=false (MOR), so it continues to use baseFileReader for performance. Lance's vectorized reader always returns ColumnarBatch, which causes a ClassCastException on MOR tables (where supportReturningBatch=false and Spark expects InternalRow), so it falls back to fileGroupBaseFileReader (non-vectorized). For COW tables, baseFileOnlyReader == baseFileReader since fileGroupBaseFileReader is not created separately.
TestLanceColumnarBatch: NEW (hudi-spark-datasource/hudi-spark/src/test)
SparkLanceReaderBase directly:
- testRowPathReturnsInternalRows: verifies enableVectorizedReader=false returns InternalRow, never ColumnarBatch
- testColumnarPathReturnsBatches: verifies enableVectorizedReader=true returns ColumnarBatch with correct data
- testColumnarPathNullPadsAbsentColumns: schema evolution: missing column is null-padded in batch mode
- testColumnarPathAppendsPartitionVectors: partition values appended as constant columns to each batch
- testTypeChangeFallsBackToRowPath: implicit type change (FLOAT→DOUBLE) forces row path with correct cast values
Spark DataFrame/SQL API:
- testCOWTableDataFrameRead: COW round-trip with vectorized reads active
- testCOWTableSchemaEvolutionNullPadding: two bulk_inserts with schema widening; old files null-padded
- testCOWTableSparkSqlQuery: SELECT ... WHERE predicate evaluation on columnar batches
Impact
Public API / user-facing changes
Performance impact
Risk Level
low
Documentation Update
none
Contributor's checklist