Skip to content

[OSS PR #18403] feat(lance): Implement columnar batch reading for Lance (COW only)#38

Open
yihua wants to merge 6 commits into
masterfrom
oss-18403
Open

[OSS PR #18403] feat(lance): Implement columnar batch reading for Lance (COW only)#38
yihua wants to merge 6 commits into
masterfrom
oss-18403

Conversation

@yihua
Copy link
Copy Markdown
Owner

@yihua yihua commented Apr 14, 2026

Mirror of apache#18403 for automated bot review.

Original author: @wombatu-kun
Base branch: master

Summary by CodeRabbit

  • New Features

    • Clustering expiration with automatic rollback of expired clustering operations.
    • Global record-limit push-down for source reads (scan limit support).
    • Vectorized columnar-batch reading for Lance format.
  • Improvements

    • Column pruning for incremental reads to reduce I/O.
    • Broader instant-time normalization (ISO, epoch, and other formats).
    • Table-scoped distributed metrics registry and enhanced log-compaction awareness.
  • Validation

    • Rejected nested VECTOR types inside arrays/maps/structs.

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Apr 14, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: a0585af9-0004-40a5-a963-939b59a5b536

📥 Commits

Reviewing files that changed from the base of the PR and between ff828d0 and a2bfd8e.

📒 Files selected for processing (1)
  • hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala
🚧 Files skipped from review as they are similar to previous changes (1)
  • hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala

📝 Walkthrough

Walkthrough

The PR adds heartbeat-based clustering instant expiration and rollback checks, centralizes wrapper-filesystem metrics registry creation, introduces distributed/table-scoped metric registries, adds log-compaction-aware compaction heuristics, implements Lance vectorized batch iteration, enforces VECTOR-schema nesting rules, adds incremental-read column pruning and instant-time normalization, refactors Flink CDC/RLI bootstrap and split/read plumbing (including record-limit pushdown), and includes many related tests.

Changes

Cohort / File(s) Summary
Base Hoodie Client
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
Removed no-op lifecycle hook protected void initWrapperFSMetrics() and its constructor invocation (unused metrics init removed).
Clustering Expiration
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java, hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java, hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java, hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java, hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieTableServiceClient.java
Added config flags and threshold for clustering expirations; added isClusteringInstantEligibleForRollback(...) using instant age + heartbeat; conditional heartbeat start/stop integration and tests for eligibility/rollback behavior.
Wrapper FS Metrics & Distributed Registry
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/DistributedRegistryUtil.java, hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java, hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java, hudi-io/src/main/java/org/apache/hudi/common/metrics/Registry.java, hudi-io/src/main/java/org/apache/hudi/common/metrics/LocalRegistry.java, hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metrics/TestDistributedRegistry.java, hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HoodieWrapperFileSystem.java
Moved wrapper-FS registry initialization into a shared util; Spark client uses that util; added table-scoped compound-key registries, driver/executor registry wiring, and getName() on registries; tests added.
Log-Compaction-Aware Compaction
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java, hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java, hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java
New CompactionUtils helpers to locate last log compaction and compute delta commits since latest completed log compaction; scheduler refactored to consider log-compaction-aware thresholds; tests added.
Lance Vectorized Reader
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceBatchIterator.java, hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceRecordIterator.java, hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala, hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceColumnarBatch.scala
Added Arrow->ColumnarBatch iterator with careful resource/close handling; adjusted record iterator lifecycle; introduced vectorized batch path in Spark Lance reader and related tests.
VECTOR Schema Nesting Validation
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java, hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala, hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java, hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestSchemaConverters.scala, hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestVectorDataSource.scala
Enforced that VECTOR types are allowed only as top-level record fields; array/map/record nesting now throws HoodieSchemaException; Spark conversion tracks nesting depth; tests added/updated.
Flink CDC Refactor & Reader Base
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcIterators.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcImageManager.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/AbstractSplitReaderFunction.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieCdcSplitReaderFunction.java
Extracted CDC iterators and image snapshot caching into reusable classes; added AbstractSplitReaderFunction consolidating config/schema helpers; CDC reader uses shared iterators/manager; large refactor and tests updated.
Flink RLI Bootstrap & Coordinator Changes
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/RLIBootstrapOperator.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/Correspondent.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java, hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
Removed coordination await payloads and operator-state coordination for RLI bootstrap; recommitInstant() now returns boolean and triggers global failover when recommit succeeds under RLI bootstrap; simplified bootstrap operator and removed related tests.
Flink Record Limit Push-down
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieScanContext.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieSource.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieSourceReader.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieSourceSplitReader.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/RecordLimiter.java, hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestRecordLimiter.java, hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieSourceSplitReader.java
Added limit to scan context and RecordLimiter to globally cap rows across splits; split reader drains remaining splits as finished when limit reached; constructors updated and tests added.
Flink Split Assignment & Bucket Assigner
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/assign/HoodieSplitAssigners.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/assign/HoodieSplitBucketAssigner.java, hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/assign/TestHoodieSplitBucketAssigner.java, hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitProvider.java
HoodieSplitBucketAssigner now accepts Flink Configuration, computes bucket/partition index functions and changes assignment logic to account for partition-derived bucket counts; tests updated for formula/invariants.
Spark Incremental Column Pruning
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV1.scala, hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV2.scala, hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/IncrementalRelationUtil.scala, hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala, hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV1.scala, hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala, hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalQueryColumnPruning.scala
Incremental relations implement PrunedScan and accept required columns; added utilities to compute pruned schema and post-read filtering; data-source options normalize incremental instants; tests added.
Instant-Time Parsing Normalization
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala, hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/HoodieTableChanges.scala, hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala, hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestInstantTimeValidation.scala
Broadened formatQueryInstant/added formatIncrementalInstant to accept ISO datetimes with 'T', epoch seconds/millis and sentinel/legacy forms; tightened validation and tests added.
Misc: Parquet Iterator & IO constants
hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java, hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestParquetReaderIterator.java, hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HoodieWrapperFileSystem.java
Made ParquetReaderIterator idempotent/close-on-exhaustion; test expectation updated; added REGISTRY_NAME/REGISTRY_META_NAME constants to HoodieWrapperFileSystem.
Clustering Updates Strategy Config
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java, hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
UPDATES_STRATEGY removed hard default and now infers allow/reject class from lock-config; added enable-expirations and expiration-threshold config properties; tests added.
Flink Tests & Utilities
many test files under hudi-flink-datasource/hudi-flink/src/test/java/... and utilities like MockStateSnapshotContext, TestRecordLimiter, TestAbstractSplitReaderFunction, test harness helpers
Extensive test additions/updates for RLI bootstrap behaviors, event buffers, record limiter, split readers, abstract reader base, and test harness assertions; obsolete bootstrap-await tests removed.
Other Tests & Utilities
assorted tests in Spark/Hadoop modules (instant validation, vector tests, COW/MOR incremental tests, compaction tests)
Added/updated many unit and integration tests around instant formatting, vector rules, Lance columnar behavior, compaction utilities, and incremental query coverage.

Sequence Diagram(s)

sequenceDiagram
    participant Config as Config<br/>(Clustering)
    participant TableSvc as TableServiceClient
    participant Heartbeat as HoodieHeartbeatClient
    participant Meta as MetaClient
    participant Rollback as RollbackHandler

    TableSvc->>Config: isExpirationOfClusteringEnabled()
    Config-->>TableSvc: true
    TableSvc->>Heartbeat: start(instantTime)
    TableSvc->>Meta: scheduleClustering(instantTime)
    Note over Meta,Heartbeat: time passes
    TableSvc->>Config: getClusteringExpirationThresholdMins()
    Config-->>TableSvc: threshold
    TableSvc->>Meta: loadInstant(requestedTime)
    TableSvc->>Heartbeat: isHeartbeatExpired(requestedTime)
    Heartbeat-->>TableSvc: true/false
    TableSvc->>Rollback: isClusteringInstantEligibleForRollback(meta, instant, config, heartbeat)
    Rollback-->>TableSvc: eligible / not eligible
    alt eligible
        TableSvc->>Meta: rollback(instant)
        Meta-->>TableSvc: instantRemoved
        TableSvc->>Heartbeat: stop(instantTime)
    end
Loading
sequenceDiagram
    participant Driver as SparkDriver
    participant Engine as HoodieSparkEngineContext
    participant DriverReg as Registry (driver)
    participant Executor as SparkExecutor
    participant ExecReg as Registry (executor)
    participant User as TaskFunction

    Driver->>Engine: getMetricRegistry(table, name)
    Engine->>DriverReg: create distributed registry & register on driver
    Engine-->>Executor: serialized registry info via job
    Executor->>ExecReg: getMetricRegistry(table, name) (cached)
    Executor->>User: setRegistries(registries) before user map
    User->>ExecReg: increment/add metrics
    ExecReg-->>DriverReg: aggregated metrics available to driver
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

Poem

🐰 I hopped through heartbeats, metrics in tow,
I nudged vector batches to brightly show,
I chased expired clusters where they slept,
I capped the records that rivers once swept,
Tests and iterators dance — what a row!

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch oss-18403

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 19

Note

Due to the large number of review comments, Critical, Major severity comments were prioritized as inline comments.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java (1)

424-516: ⚠️ Potential issue | 🟠 Major

The nested-VECTOR guard is still incomplete.

createArray() / createMap() only reject a direct VECTOR element or value, and validateNoVectorInNestedRecord() only descends through child RECORDs. That still allows shapes like ARRAY<RECORD<embedding VECTOR>> and MAP<STRING, RECORD<embedding VECTOR>>, which contradicts the new “inside STRUCT, ARRAY, or MAP” restriction. I’d move this to one recursive schema walk over RECORD / ARRAY / MAP / UNION and reuse it from every construction path (create*, Builder.build(), and setFields()).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java`
around lines 424 - 516, The current nested-VECTOR checks are incomplete;
implement a single recursive validator (e.g.,
validateNoVectorInNestedSchema(HoodieSchema schema, String context)) that walks
RECORD, ARRAY, MAP and UNION/union branches and throws HoodieSchemaException if
a VECTOR is found at any nested (non-top-level) position, then replace the logic
in validateNoVectorInNestedRecord, createArray, createMap and call this new
validator from Builder.build() and setFields() so all construction paths reuse
the same recursive check (reference symbols: validateNoVectorInNestedRecord,
createArray, createMap, Builder.build, setFields).
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala (1)

160-174: ⚠️ Potential issue | 🟠 Major

supportBatch() advertises columnar Lance support but reader still falls back to rows for schema-evolved files.

Setting lanceBatchSupported = true here enables batch mode in the planner, but SparkLanceReaderBase.read() still returns readRows() whenever implicitTypeChangeInfo is non-empty. For schema-evolved Lance files, Spark commits to the columnar scan path based on the supportBatch() contract, making a per-file downgrade to row mode break the batch invariant.

Either gate lanceBatchSupported on the absence of implicit type changes, or ensure readBatch() handles schema padding and casts without fallback.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala`
around lines 160 - 174, The planner is being told Lance supports columnar reads
even when per-file schema evolution exists, causing SparkLanceReaderBase.read()
to fall back to readRows() for files with implicitTypeChangeInfo and violating
the batch contract; fix by gating lanceBatchSupported on absence of implicit
type changes (i.e., only set lanceBatchSupported = true when there is no
implicitTypeChangeInfo for the split) or alternatively implement proper schema
padding/casts in SparkLanceReaderBase.readBatch() so it can handle
implicitTypeChangeInfo without falling back to readRows(); update the logic
around lanceBatchSupported, supportBatch, and/or
SparkLanceReaderBase.read()/readBatch() to enforce a consistent columnar
contract (referencing lanceBatchSupported, supportBatch, HoodieFileFormat.LANCE,
SparkLanceReaderBase.read(), implicitTypeChangeInfo, readRows(), and
readBatch()).
🟡 Minor comments (8)
hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestParquetReaderIterator.java-68-68 (1)

68-68: ⚠️ Potential issue | 🟡 Minor

Test assertion currently locks in duplicate-close behavior.

Line 68 should not require two close() invocations if cleanup is consistently routed through ParquetReaderIterator.close(). Once the production path is fixed, assert a single close to preserve the idempotency contract.

✅ Expected assertion after fixing iterator cleanup path
-    verify(reader, times(2)).close();
+    verify(reader, times(1)).close();
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestParquetReaderIterator.java`
at line 68, The test currently asserts verify(reader, times(2)).close() which
locks in duplicate-close behavior; update the test to assert a single close
(verify(reader, times(1)).close()) and ensure the production cleanup path
funnels resource closing through ParquetReaderIterator.close() (fix the iterator
cleanup in the code paths that create/own the reader so the iterator alone
performs close() once). Target the TestParquetReaderIterator test and the
ParquetReaderIterator.close() implementation and any factory/usage sites that
previously closed the reader directly so only the iterator closes it.
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieCdcSplitReaderFunction.java-168-219 (1)

168-219: ⚠️ Potential issue | 🟡 Minor

These “limit” tests never exercise the limit path.

All four methods still invoke the same constructor shape as the pre-existing tests and only assert non-null, so they don't protect the new limit plumbing at all. Please hit the overload that actually accepts a limit, or assert record counts through read()/the returned iterator instead of duplicating constructor smoke tests.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieCdcSplitReaderFunction.java`
around lines 168 - 219, Tests claim to validate "limit" behavior but all four
still call the same 6-arg constructor and only assert non-null, so they never
exercise the limit path; update these tests to call the 7-arg
HoodieCdcSplitReaderFunction constructor that accepts an explicit limit value
(use positive, zero and negative cases as needed) and/or invoke function.read()
and iterate the returned iterator to assert the actual number of records
returned honors the provided limit (reference HoodieCdcSplitReaderFunction
constructors, the read() method and any limit-wrapping iterator logic) so the
tests verify functional behavior instead of only constructor success.
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcImageManager.java-169-175 (1)

169-175: ⚠️ Potential issue | 🟡 Minor

Potential infinite loop if skipBytes returns 0.

DataInputStream.skipBytes() can return 0 when the stream reaches EOF or no bytes can be skipped, causing this loop to spin indefinitely.

Proposed fix to handle zero-skip case
 `@Override`
 public void skipBytesToRead(int numBytes) throws IOException {
   while (numBytes > 0) {
     int skipped = skipBytes(numBytes);
+    if (skipped <= 0) {
+      throw new IOException("Unable to skip " + numBytes + " remaining bytes");
+    }
     numBytes -= skipped;
   }
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcImageManager.java`
around lines 169 - 175, The skipBytesToRead method in CdcImageManager can loop
indefinitely if skipBytes(numBytes) returns 0; update skipBytesToRead to detect
a zero return and handle it (e.g., throw an EOFException or IOException with a
clear message) instead of looping forever. Specifically, in
CdcImageManager.skipBytesToRead(int numBytes) check after each call to
skipBytes(numBytes) if skipped == 0 and then throw an IOException/EOFException
indicating unable to skip further or end of stream; otherwise subtract skipped
from numBytes and continue until zero.
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcIterators.java-334-338 (1)

334-338: ⚠️ Potential issue | 🟡 Minor

Closing shared imageManager may cause issues for sibling iterators.

DataLogFileIterator.close() invokes imageManager.close(), but the imageManager is also closed by CdcFileSplitsIterator.close() (line 139). If multiple DataLogFileIterator instances share the same imageManager, one iterator closing could break others still in use.

Suggested approach: let owning iterator manage lifecycle
 `@Override`
 public void close() {
   logRecordIterator.close();
-  imageManager.close();
+  // imageManager lifecycle managed by CdcFileSplitsIterator
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcIterators.java`
around lines 334 - 338, DataLogFileIterator.close() currently closes the shared
imageManager which can break sibling iterators; change lifecycle so the owning
iterator manages imageManager: remove imageManager.close() from
DataLogFileIterator.close() (keep logRecordIterator.close()), and instead have
CdcFileSplitsIterator.close() be responsible for closing imageManager; if
ownership is unclear add an ownership flag (e.g., boolean ownsImageManager) to
DataLogFileIterator (set by its constructor) and only call imageManager.close()
when ownsImageManager is true, ensuring no other iterator closes a shared
imageManager prematurely.
hudi-io/src/main/java/org/apache/hudi/common/metrics/Registry.java-136-140 (1)

136-140: ⚠️ Potential issue | 🟡 Minor

Use the full registry name when deciding whether to prepend commonPrefix.

With a table-scoped entry like table::fs, getRegistryNameFromKey(key) returns only fs, so this branch prepends commonPrefix even though the actual registry name is already table.fs. The same registry can therefore emit different metric names depending on how it was inserted into REGISTRY_MAP.

Possible fix
-        final String registryName = getRegistryNameFromKey(key);
+        final String registryName = registry.getName();
         final String prefix = (prefixWithRegistryName && commonPrefix.isPresent() && !registryName.contains("."))
             ? commonPrefix.get() + "." : "";
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@hudi-io/src/main/java/org/apache/hudi/common/metrics/Registry.java` around
lines 136 - 140, The code uses getRegistryNameFromKey(key) (registryName) to
decide whether to prepend commonPrefix, but that returns the short name (e.g.,
"fs") for table-scoped entries and causes inconsistent metric names; change the
check to use the full registry identifier from REGISTRY_MAP's key (or a helper
that returns the full registry name) when evaluating whether the registry
already contains a dot so prefixWithRegistryName and commonPrefix logic
consistently applies; update the branch that computes prefix (the logic around
registryName, prefixWithRegistryName, commonPrefix) so it checks the full
registry key and then call registry.getAllCounts(prefix...).put into allMetrics
as before.
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java-279-285 (1)

279-285: ⚠️ Potential issue | 🟡 Minor

Validate hoodie.clustering.expiration.threshold.mins is non-negative.

hasInstantExpired() compares instant age against TimeUnit.MINUTES.toMillis(expirationMins). A negative value makes that guardrail pass immediately, so a fresh clustering instant can become rollback-eligible as soon as its heartbeat expires.

Suggested guard
 private void validate() {
+  long expirationThresholdMins = clusteringConfig.getLong(EXPIRATION_THRESHOLD_MINS);
+  ValidationUtils.checkArgument(expirationThresholdMins >= 0,
+      EXPIRATION_THRESHOLD_MINS.key() + " must be >= 0");
+
   boolean inlineCluster = clusteringConfig.getBoolean(HoodieClusteringConfig.INLINE_CLUSTERING);
   boolean inlineClusterSchedule = clusteringConfig.getBoolean(HoodieClusteringConfig.SCHEDULE_INLINE_CLUSTERING);
   ValidationUtils.checkArgument(!(inlineCluster && inlineClusterSchedule), String.format("Either of inline clustering (%s) or "
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java`
around lines 279 - 285, The EXPIRATION_THRESHOLD_MINS ConfigProperty can be set
negative which lets hasInstantExpired() treat fresh instants as expired; add an
explicit non-negative validation for EXPIRATION_THRESHOLD_MINS (e.g., during
config construction/validation in HoodieClusteringConfig or wherever the
property is read) that checks the configured long value is >= 0 and throws a
clear IllegalArgumentException/ConfigException if negative (include the property
name EXPIRATION_THRESHOLD_MINS in the message) so callers cannot proceed with a
negative expiration threshold.
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalQueryColumnPruning.scala-164-168 (1)

164-168: ⚠️ Potential issue | 🟡 Minor

The plan assertion can pass even when scan-time pruning is broken.

queryExecution.toString() will mention col1/col3 because of the Project node, so this stays green even if the underlying scan still reads every column. Please assert on the physical scan's ReadSchema (and ideally that unselected columns are absent there) so the test actually proves pruning.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalQueryColumnPruning.scala`
around lines 164 - 168, The current assertion inspects
queryExecution.toString(), which shows projection nodes and can mask scan-time
behavior; instead inspect the physical scan's read schema by using
prunedDF.queryExecution.executedPlan (or prunedDF.queryExecution.sparkPlan) and
assert that its string contains the physical scan's "ReadSchema" with the
expected selected columns (e.g., contains "ReadSchema" and "col1" or "col3") and
does NOT contain the unselected columns (e.g., "col2", "col4"); update the
assertion to check prunedDF.queryExecution.executedPlan.toString() (or locate
the FileScan/HadoopFsRelation node in executedPlan) and verify presence/absence
of the appropriate column names so the test actually proves column pruning.
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java-486-489 (1)

486-489: ⚠️ Potential issue | 🟡 Minor

Make assertGlobalFailure() eventually consistent.

Bootstrap handling runs on the coordinator executor, so isJobFailed() can still be false right after assertNextEvent() hands the event off. This one-shot assertion makes the new failover test flaky; poll here or add a helper that waits for the coordinator task to finish first.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java`
around lines 486 - 489, The current assertGlobalFailure method does a one-shot
check of pipeline.getCoordinatorContext().isJobFailed(), which is flaky because
coordinator tasks may not have set the failure yet; change assertGlobalFailure
in TestWriteBase to poll (with a short sleep) until either isJobFailed() becomes
the expected value or a reasonable timeout elapses (failing the assertion on
timeout), or alternatively call a helper that waits for the coordinator task to
finish before asserting; reference the method assertGlobalFailure and the call
to pipeline.getCoordinatorContext().isJobFailed() and ensure this new
polling/wait logic is used after assertNextEvent() in the failover tests to make
the check eventually consistent.
🧹 Nitpick comments (8)
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/assign/HoodieSplitBucketAssigner.java (1)

34-35: Consider making partitionIndexFunc final for immutability.

The partitionIndexFunc field is assigned only in the constructor and never modified. For consistency with numBucketsFunction and to prevent accidental reassignment, it should be declared final.

Suggested fix
 public class HoodieSplitBucketAssigner implements HoodieSplitAssigner {
   private final NumBucketsFunction numBucketsFunction;
-  private Functions.Function3<Integer, String, Integer, Integer> partitionIndexFunc;
+  private final Functions.Function3<Integer, String, Integer, Integer> partitionIndexFunc;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/assign/HoodieSplitBucketAssigner.java`
around lines 34 - 35, The field partitionIndexFunc is only assigned in the
HoodieSplitBucketAssigner constructor but left mutable; make it final to match
numBucketsFunction and enforce immutability by changing the declaration of
partitionIndexFunc to be final (i.e., declare "private final
Functions.Function3<Integer, String, Integer, Integer> partitionIndexFunc") and
keep the existing constructor assignment in the HoodieSplitBucketAssigner
constructor as-is; no other logic changes required.
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieSplitReaderFunction.java (1)

389-445: This block duplicates existing constructor coverage.

The null/valid constructor cases are already covered above, and this block still doesn't exercise the limit handoff mentioned in the header. Replacing it with a focused HoodieSourceSplitReader interaction test would give more signal for the new behavior.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieSplitReaderFunction.java`
around lines 389 - 445, The tests duplicate constructor coverage for
HoodieSplitReaderFunction; remove this redundant block and replace it with a
focused interaction test that exercises the "limit handoff" between
HoodieSourceSplitReader and HoodieSplitReaderFunction: create a
HoodieSourceSplitReader (or the existing reader factory) with a non-zero limit,
register a mock split or HoodieSourceSplit, invoke the method that hands off
splits to HoodieSplitReaderFunction (use the class/methods around
HoodieSourceSplitReader and HoodieSplitReaderFunction), and assert that the
handed-off split count respects the configured limit and that
HoodieSplitReaderFunction receives the expected number of splits; keep
null/constructor tests removed and ensure you reference HoodieSourceSplitReader,
HoodieSplitReaderFunction, and the handoff method names to locate where to add
the new interaction assertions.
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcIterators.java (2)

587-593: Same redundant setRowKind pattern.

As noted above, getImageRecord already applies the row kind, making line 591 redundant.

Proposed cleanup
 `@Override`
 protected RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord) {
   String recordKey = cdcRecord.get(1).toString();
   RowData row = imageManager.getImageRecord(recordKey, beforeImages, rowKind);
-  row.setRowKind(rowKind);
   return projection.project(row);
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcIterators.java`
around lines 587 - 593, The getBeforeImage method currently calls
imageManager.getImageRecord(recordKey, beforeImages, rowKind) which already
applies the RowKind, so remove the redundant row.setRowKind(rowKind) call in
getBeforeImage; simply fetch the row from getImageRecord and return
projection.project(row) (references: getBeforeImage, getImageRecord, setRowKind,
beforeImages, projection.project).

541-547: Redundant setRowKind call.

imageManager.getImageRecord(recordKey, afterImages, rowKind) already sets the row kind (see CdcImageManager line 120). The subsequent row.setRowKind(rowKind) on line 545 is redundant.

Proposed cleanup
 `@Override`
 protected RowData getAfterImage(RowKind rowKind, GenericRecord cdcRecord) {
   String recordKey = cdcRecord.get(1).toString();
   RowData row = imageManager.getImageRecord(recordKey, afterImages, rowKind);
-  row.setRowKind(rowKind);
   return projection.project(row);
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcIterators.java`
around lines 541 - 547, In getAfterImage (class CdcIterators) the call to
row.setRowKind(rowKind) is redundant because
imageManager.getImageRecord(recordKey, afterImages, rowKind) already sets the
RowKind (see CdcImageManager where it assigns the kind). Remove the extra
row.setRowKind(rowKind) line and keep returning projection.project(row)
unchanged so behavior and projection remain the same.
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java (1)

336-352: Throwing on first ineligible instant may be overly strict.

The current implementation throws HoodieException immediately upon encountering the first ineligible clustering instant (line 339). This prevents processing any remaining instants that might be eligible for rollback.

Consider whether the intended behavior is:

  1. Fail-fast (current): Stop immediately if any instant targeting requested partitions has an active heartbeat
  2. Best-effort: Skip ineligible instants, continue processing, and log/return skipped instants

If fail-fast is intentional (e.g., to prevent partial rollbacks), consider documenting this behavior in the method's Javadoc.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java`
around lines 336 - 352, The current fail-fast throw in HoodieClusteringJob when
encountering an ineligible instant (call to
BaseHoodieTableServiceClient.isClusteringInstantEligibleForRollback) should be
changed to a best-effort behavior: instead of throwing, log a warning
identifying the instant (use instant.requestedTime()) and the reason,
record/collect skipped instants if needed, then continue processing remaining
instants returned by getPendingClusteringInstantsForPartitions; keep the
existing metaClient.reloadActiveTimeline() and the containsInstant check and
client.rollback(instant.requestedTime()) for eligible instants. Update any
method Javadoc to reflect the new best-effort semantics or add a TODO config
flag if fail-fast must be optionally preserved.
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala (1)

2605-2617: Assert which _hoodie_partition_path value wins.

The test proves the duplicate-field merge no longer throws, but it never checks the value of the colliding column itself. If the read starts returning the wrong side of that collision, this still passes. Please assert the expected _hoodie_partition_path values explicitly so the precedence is locked in.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala`
around lines 2605 - 2617, The test currently verifies keys and "data" but not
which `_hoodie_partition_path` wins; update the assertions after the
incrementalDf.select(...) / results variable to explicitly assert the expected
`_hoodie_partition_path` for each row (e.g., add assertEquals(expected0,
results(0).getAs[String]("_hoodie_partition_path")), assertEquals(expected1,
results(1).getAs[String]("_hoodie_partition_path")), assertEquals(expected2,
results(2).getAs[String]("_hoodie_partition_path"))), using the correct expected
strings based on the test setup so the precedence for the colliding column is
locked in (locate this near the existing assertions in TestCOWDataSource).
hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java (1)

1049-1073: Add one positive nullable top-level VECTOR case if that shape is meant to stay valid.

The new nullable coverage only proves UNION(null, VECTOR) is rejected inside ARRAY/MAP. It does not protect the common optional-column shape where the nullable VECTOR is itself a top-level field, so an over-eager validator could break optional embeddings while these tests still pass.

Also applies to: 2857-2870

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java`
around lines 1049 - 1073, Add a new positive test that ensures a nullable
top-level VECTOR field is accepted: in TestHoodieSchema add a case (alongside
testVectorAsTopLevelRecordField) that constructs a HoodieSchema.Vector (e.g.,
HoodieSchema.createVector(128, HoodieSchema.Vector.VectorElementType.FLOAT)),
wraps it in a nullable union schema (UNION(NULL, VECTOR)) as a top-level record
field (use HoodieSchemaField.of("embedding", /* nullable vector */)), build the
record with HoodieSchema.createRecord, serialize/parse via
toString()/HoodieSchema.parse, and assert equality and vector properties using
assertVector; this ensures the validator allows top-level optional VECTOR fields
(also add the same positive test for the other location referenced around the
second range).
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java (1)

353-364: Drive the real reset path in the failover harness.

This helper still simulates recovery via subtaskFailed(...) plus manual bootstrap replay, while the production logic added in this PR lives in StreamWriteOperatorCoordinator.subtaskReset(...). Because the original BucketAssignFunction instance also stays alive here, these tests can pass without exercising the actual reset callback or restored bucket-assigner state. Recreate/reinitialize the bucket assigner and invoke the real reset flow in this harness.

Also applies to: 428-447

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java`
around lines 353 - 364, The test helper subTaskFails currently simulates
recovery by calling coordinator.subtaskFailed(...) and manually replaying
bootstrap but leaves the original BucketAssignFunction instance alive; update
subTaskFails (and the similar block at 428-447) to recreate/reinitialize the
bucket assigner and drive the actual reset path by invoking the real
StreamWriteOperatorCoordinator.subtaskReset flow instead of only subtaskFailed;
specifically, after calling coordinator.subtaskFailed(taskID, ...), dispose or
null out the existing BucketAssignFunction and bucketAssignFunctionContext,
reinstantiate them (so setupWriteFunction()/setupIndexWriteFunction use a fresh
assigner), and trigger the coordinator.reset/subtaskReset logic that the
production operator uses to restore assigner state.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In
`@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java`:
- Around line 739-742: Heartbeat is started unconditionally when a plan is
scheduled (option.isPresent() && config.isExpirationOfClusteringEnabled()) via
heartbeatClient.start(instantTime) but is only stopped on successful
completeClustering(), so failures or split scheduling/execution (cluster(),
schedule/... async paths) can leave the heartbeat running forever; fix by
ensuring heartbeatClient.stop(...) is called on all non-success code paths: wrap
the execution path (the call that leads to cluster()/completeClustering()) in
try/finally to stop the heartbeat on exception, and for split scheduling flows,
either defer heartbeatClient.start until execution begins or add a compensating
stop when scheduling returns without execution (e.g., in the scheduling branch
that only creates a plan), and ensure completeClustering() still stops the
heartbeat on success.

In
`@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java`:
- Around line 243-255: The method needLogCompact currently compares delta-commit
counts to config.getLogCompactionBlocksThreshold(), which is a log-block limit;
change it to compute the actual number of appended log blocks since the last
compaction/log compaction and compare that to getLogCompactionBlocksThreshold().
In needLogCompact, replace numDeltaCommitsSince with a computed
numLogBlocksSince (e.g., iterate relevant delta commits or file-slices since
latest compaction/log compaction using existing timeline/commit metadata and sum
the appended log blocks per commit/file-slice) and use that value for the
threshold check and log message; alternatively if you cannot access block counts
here, update the config usage to a commit-count threshold accessor
(rename/getCommitCountThreshold) and use that instead, but do not compare commit
counts to a block threshold. Ensure you reference needLogCompact,
getLatestDeltaCommitInfoSinceLogCompaction(), and
config.getLogCompactionBlocksThreshold() when making the change.

In
`@hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java`:
- Around line 149-154: The distributed registries are only set before the
mapToPair/flatMapToPair closures but not inside reducer/foreach closures, so
reducer and foreach user functions run without registry setup; update
HoodieSparkEngineContext to capture DISTRIBUTED_REGISTRY_MAP into a final
Map<String, Registry> registries and wrap the reduceFunc and any foreach/user
closures (e.g., the reduceByKey(reduceFunc::apply) invocation and foreach calls)
so they first call setRegistries(registries) inside the executor-side lambda
before delegating to reduceFunc/foreachFunc; apply the same pattern for the
other affected blocks (the ones around lines 161-168 and 291-297) and for
flatMapToPair-based flows to ensure all executor-side user functions call
setRegistries(registries) before executing.
- Around line 92-96: DISTRIBUTED_REGISTRY_MAP is static and caches
DistributedRegistry entries across Spark contexts causing accumulators to remain
bound to the wrong/defunct context; update getMetricRegistry() to key registries
by Spark context identity (e.g., include the Spark application id /
SparkContext.applicationId or JavaSparkContext instance id in the cache key such
as tableName + "." + registryName + "." + sparkAppId) so computeIfAbsent
re-registers per application, or alternatively make DISTRIBUTED_REGISTRY_MAP
non-static (instance-scoped) on HoodieSparkEngineContext so each context has its
own registry map; locate usages of DISTRIBUTED_REGISTRY_MAP,
getMetricRegistry(), and DistributedRegistry and apply one of these fixes to
ensure accumulators are registered against the current JavaSparkContext.

In
`@hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java`:
- Around line 81-82: The static METRICS_REGISTRY_DATA/META in
HoodieWrapperFileSystem causes per-table registries to be overwritten when
multiple SparkRDDWriteClient instances run in the same JVM; update the registry
storage to be table-isolated by keying registries by a unique table identifier
(e.g., tableName/tablePath/tableUUID) and change
DistributedRegistryUtil.createWrapperFileSystemRegistries to register under that
key instead of the single static pair, then modify HoodieWrapperFileSystem to
look up/insert registries in a ConcurrentHashMap keyed by that table identifier
(and expose a remove/cleanup method to unregister on client close); ensure
SparkRDDWriteClient constructs/passes the same table identifier when calling
createWrapperFileSystemRegistries so each client gets its own registry
namespace.

In
`@hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieScanContext.java`:
- Line 63: The field limit in HoodieScanContext should use Lombok's
`@Builder.Default` and be initialized to -1L so omitted builder calls don't
default to 0; update the declaration of the field limit (private final long
limit) to add `@Builder.Default` and set it to -1L (the sentinel for "no limit")
so existing HoodieScanContext.builder() call sites keep the intended behavior.

In
`@hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieSourceReader.java`:
- Around line 46-52: The constructor currently instantiates a new RecordLimiter
per HoodieSourceReader (see HoodieSourceReader -> HoodieSourceSplitReader call
using Option.of(new RecordLimiter(scanContext.getLimit()))), which allows each
parallel reader to enforce the limit independently; instead, create a single
shared RecordLimiter when a non-NO_LIMIT is configured (e.g., in the enumerator
or reader factory) and pass that same RecordLimiter instance (wrapped in Option)
into all HoodieSourceReader and HoodieSourceSplitReader instances via updated
constructor parameters, replacing the per-reader new RecordLimiter(...) usage;
update HoodieSourceReader and HoodieSourceSplitReader signatures to accept
Optional/Option<RecordLimiter> and modify tests to include a multi-reader
scenario to validate global limiting.

In
`@hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/RecordLimiter.java`:
- Around line 61-68: The current nextRecordFromSplit/wrap logic treats NO_LIMIT
(-1) as exhausted and splits the check and increment, allowing races; change
nextRecordFromSplit (and the wrap path) to first handle NO_LIMIT: if limit ==
NO_LIMIT then bypass the reservation and just call
records.nextRecordFromSplit(); otherwise atomically reserve a slot before
returning a record by using totalReadCount in an atomic
compare-and-set/get-and-update loop (or AtomicLong#updateAndGet) so you only
call records.nextRecordFromSplit when you successfully increment totalReadCount
< limit (reserve one slot), and return null when limit is reached; update
references to totalReadCount, limit, NO_LIMIT, nextRecordFromSplit and wrap so
both honor NO_LIMIT and perform an atomic reserve-before-read to enforce strict
limit semantics.

In
`@hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcIterators.java`:
- Line 313: mergeRowWithLog(historyRecord, record) can return an empty Option so
calling .get() unconditionally risks a NoSuchElementException; update the
CdcIterators code around the line that assigns HoodieRecord<RowData> merged =
mergeRowWithLog(historyRecord, record).get() to first check the Option result
(e.g., isPresent()/isEmpty() or use orElse/ifPresent pattern) and handle the
empty case explicitly (skip processing this record, log a debug/warn, or throw a
clearer exception). Ensure you reference the Option returned by
mergeRowWithLog(...) and only call .get() after confirming it contains a value,
so downstream code that uses merged is safe.
- Around line 161-173: The hasNext() method in CdcIterators currently advances
the underlying iterator (nested.next()) and stores it in currentRecord, which
breaks the iterator contract when hasNext() is called multiple times; change
hasNext() to be idempotent by implementing a lookahead cache (e.g., a private
field like nextRecordPresent/nextRecord) that only calls nested.next() the first
time hasNext() discovers a next element, sets its RowKind to INSERT there, and
returns true; then have next() return and clear that cached nextRecord (or if
cache empty, call nested.next() normally), ensuring currentRecord is only
consumed by next() and not overwritten by repeated hasNext() calls, and handle
end-of-iteration correctly.

In
`@hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java`:
- Around line 58-61: The EOF path currently calls close() when next==null but
later the broad catch in next() directly calls
FileIOUtils.closeQuietly(parquetReader) and may catch/re-wrap the locally-thrown
end-of-data HoodieException, causing a double/ungarded close and bypassing the
closed flag; modify next() so that any cleanup always routes through the class
close() method (not directly FileIOUtils.closeQuietly), and update the exception
handling to rethrow the original end-of-data HoodieException without wrapping it
(or let it propagate) while only catching non-EOF exceptions to perform close()
and wrap as needed; refer to the next() method, close(), parquetReader,
FileIOUtils.closeQuietly(...) and HoodieException to locate and change the
logic.

In `@hudi-io/src/main/java/org/apache/hudi/common/metrics/Registry.java`:
- Around line 152-155: setRegistries is inserting registries under makeKey("",
registry.getName()) which yields keys like "::tbl.fs" and mismatches
getRegistryOfClass's "tbl::fs" keys; change the key construction to use the
registry's scope/table instead of the empty string (e.g.,
makeKey(registry.getScope() or registry.getTableName(), registry.getName())) so
REGISTRY_MAP.putIfAbsent uses the same key format as getRegistryOfClass and
avoids creating duplicate table-scoped registries.

In
`@hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV1.scala`:
- Around line 308-310: The current code prunes to requiredColumns via
IncrementalRelationUtil.filterRequiredColumnsFromDF before applying incremental
filters (filters), which drops columns referenced by PUSH_DOWN_INCR_FILTERS;
change IncrementalRelationV1.scala so the filters are applied to scanDf before
performing the final projection (i.e., call
filters.foldLeft(scanDf)((e,f)=>e.filter(f)).rdd first), or alternatively expand
requiredColumns to include any column names referenced by filters prior to
calling IncrementalRelationUtil.filterRequiredColumnsFromDF; update the logic
around scanDf, requiredColumns, filters and metaClient accordingly and add a
regression test for select(...) combined with PUSH_DOWN_INCR_FILTERS to verify
behavior.

In
`@hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV2.scala`:
- Around line 292-293: The code prunes columns via
IncrementalRelationUtil.filterRequiredColumnsFromDF(scanDf, requiredColumns,
metaClient) before applying option-level incremental filters, which can drop
columns referenced by PUSH_DOWN_INCR_FILTERS and break incremental reads; fix by
either (A) applying the option filters first (move
filters.foldLeft(scanDf)((e,f)=>e.filter(f)) before calling
filterRequiredColumnsFromDF) or (B) ensure requiredColumns is augmented with any
columns referenced by the incremental option filters (read the
PUSH_DOWN_INCR_FILTERS expression from the relation options and add its
referenced column names to requiredColumns) so filterRequiredColumnsFromDF
called on scanDf will not drop needed filter-only columns (references:
IncrementalRelationV2, filterRequiredColumnsFromDF, filters.foldLeft(...),
PUSH_DOWN_INCR_FILTERS, requiredColumns, metaClient).

In
`@hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/IncrementalRelationUtil.scala`:
- Around line 38-73: The code appends fields to prunedSchema in three separate
passes (using requiredColumns, partitionColumns from
metaClient.getTableConfig.getPartitionFields, and orderingFields from
getOrderingFields) but only checks against requiredColumns, allowing duplicate
field names to be added; fix it by maintaining a mutable Set[String] (e.g.,
addedNames) initialized with requiredColumns, update it whenever you add a field
to prunedSchema, and in each place before calling prunedSchema.add(field.get)
check addedNames.contains(field.name) to skip duplicates (apply to the
requiredColumns loop, the partitionColumns loop, and the orderingFields loop,
including the HoodieTableType.MERGE_ON_READ branch).

In
`@hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala`:
- Around line 250-262: Add an idempotent guard to the close path installed by
readBatch(): modify the close() implementation of the mapped iterator returned
by readBatch() (the method that currently calls
nullColumnVectors.foreach(_.columnVector.close()),
nullAllocator.foreach(_.close()), batchIterator.close(),
partitionVectors.foreach(_.close())) to be no-op on subsequent calls by using a
private AtomicBoolean/volatile boolean closed flag checked-and-set (or
synchronized check) at the top of close(); ensure the task-completion listener
that calls mappedIterator.close() uses the same guarded close so
nullAllocator.close() and partitionVectors.foreach(_.close()) cannot run twice.

In
`@hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala`:
- Around line 279-299: The yyyy-MM-dd parsing branch in formatQueryInstant (the
else-if for instantLength == 10 && !isAllDigits(queryInstant)) must enforce
strict date validation and handle parse errors: obtain the SimpleDateFormat from
defaultDateFormat.get(), call fmt.setLenient(false) before parsing, then wrap
fmt.parse(queryInstant) in a try/catch catching java.text.ParseException and
throw an IllegalArgumentException with a clear message (include the original
queryInstant and SUPPORTED_FORMATS_MSG) to match the method's error handling;
keep the rest of the branch using TimelineUtils.formatDate(parsedDate).

In
`@hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceColumnarBatch.scala`:
- Around line 395-425: Add an assertion in each integration test (e.g.,
testCOWTableDataFrameRead, testCOWTableSchemaEvolutionNullPadding,
testCOWTableSparkSqlQuery) that inspects the executed physical plan to ensure
the vectorized scan was used: use
actual.queryExecution.executedPlan.collectFirst to find a plan node whose class
simple name indicates a Lance columnar/batch scan (or pattern-match to
SparkLanceReaderBase) and assert the node exists, and if you can access the
reader instance (SparkLanceReaderBase) assert its batchCount > 0 to prove
columnar batches were produced.
- Around line 157-174: The iterator returned by reader.read(...) (the one cast
to Iterator[Any]) must be closed to release Arrow allocators and file readers;
wrap the creation and consumption of that iterator in a try/finally and call
iter.close() in the finally block (and still handle ColumnarBatch in the match
as before), i.e. replace the direct while(iter.hasNext)... loop with a try {
while(iter.hasNext) { ... } } finally { if (iter != null) iter.close() } around
the reader.read(...) result.

---

Outside diff comments:
In `@hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java`:
- Around line 424-516: The current nested-VECTOR checks are incomplete;
implement a single recursive validator (e.g.,
validateNoVectorInNestedSchema(HoodieSchema schema, String context)) that walks
RECORD, ARRAY, MAP and UNION/union branches and throws HoodieSchemaException if
a VECTOR is found at any nested (non-top-level) position, then replace the logic
in validateNoVectorInNestedRecord, createArray, createMap and call this new
validator from Builder.build() and setFields() so all construction paths reuse
the same recursive check (reference symbols: validateNoVectorInNestedRecord,
createArray, createMap, Builder.build, setFields).

In
`@hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala`:
- Around line 160-174: The planner is being told Lance supports columnar reads
even when per-file schema evolution exists, causing SparkLanceReaderBase.read()
to fall back to readRows() for files with implicitTypeChangeInfo and violating
the batch contract; fix by gating lanceBatchSupported on absence of implicit
type changes (i.e., only set lanceBatchSupported = true when there is no
implicitTypeChangeInfo for the split) or alternatively implement proper schema
padding/casts in SparkLanceReaderBase.readBatch() so it can handle
implicitTypeChangeInfo without falling back to readRows(); update the logic
around lanceBatchSupported, supportBatch, and/or
SparkLanceReaderBase.read()/readBatch() to enforce a consistent columnar
contract (referencing lanceBatchSupported, supportBatch, HoodieFileFormat.LANCE,
SparkLanceReaderBase.read(), implicitTypeChangeInfo, readRows(), and
readBatch()).

---

Minor comments:
In
`@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java`:
- Around line 279-285: The EXPIRATION_THRESHOLD_MINS ConfigProperty can be set
negative which lets hasInstantExpired() treat fresh instants as expired; add an
explicit non-negative validation for EXPIRATION_THRESHOLD_MINS (e.g., during
config construction/validation in HoodieClusteringConfig or wherever the
property is read) that checks the configured long value is >= 0 and throws a
clear IllegalArgumentException/ConfigException if negative (include the property
name EXPIRATION_THRESHOLD_MINS in the message) so callers cannot proceed with a
negative expiration threshold.

In
`@hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcImageManager.java`:
- Around line 169-175: The skipBytesToRead method in CdcImageManager can loop
indefinitely if skipBytes(numBytes) returns 0; update skipBytesToRead to detect
a zero return and handle it (e.g., throw an EOFException or IOException with a
clear message) instead of looping forever. Specifically, in
CdcImageManager.skipBytesToRead(int numBytes) check after each call to
skipBytes(numBytes) if skipped == 0 and then throw an IOException/EOFException
indicating unable to skip further or end of stream; otherwise subtract skipped
from numBytes and continue until zero.

In
`@hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcIterators.java`:
- Around line 334-338: DataLogFileIterator.close() currently closes the shared
imageManager which can break sibling iterators; change lifecycle so the owning
iterator manages imageManager: remove imageManager.close() from
DataLogFileIterator.close() (keep logRecordIterator.close()), and instead have
CdcFileSplitsIterator.close() be responsible for closing imageManager; if
ownership is unclear add an ownership flag (e.g., boolean ownsImageManager) to
DataLogFileIterator (set by its constructor) and only call imageManager.close()
when ownsImageManager is true, ensuring no other iterator closes a shared
imageManager prematurely.

In
`@hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java`:
- Around line 486-489: The current assertGlobalFailure method does a one-shot
check of pipeline.getCoordinatorContext().isJobFailed(), which is flaky because
coordinator tasks may not have set the failure yet; change assertGlobalFailure
in TestWriteBase to poll (with a short sleep) until either isJobFailed() becomes
the expected value or a reasonable timeout elapses (failing the assertion on
timeout), or alternatively call a helper that waits for the coordinator task to
finish before asserting; reference the method assertGlobalFailure and the call
to pipeline.getCoordinatorContext().isJobFailed() and ensure this new
polling/wait logic is used after assertNextEvent() in the failover tests to make
the check eventually consistent.

In
`@hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieCdcSplitReaderFunction.java`:
- Around line 168-219: Tests claim to validate "limit" behavior but all four
still call the same 6-arg constructor and only assert non-null, so they never
exercise the limit path; update these tests to call the 7-arg
HoodieCdcSplitReaderFunction constructor that accepts an explicit limit value
(use positive, zero and negative cases as needed) and/or invoke function.read()
and iterate the returned iterator to assert the actual number of records
returned honors the provided limit (reference HoodieCdcSplitReaderFunction
constructors, the read() method and any limit-wrapping iterator logic) so the
tests verify functional behavior instead of only constructor success.

In
`@hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestParquetReaderIterator.java`:
- Line 68: The test currently asserts verify(reader, times(2)).close() which
locks in duplicate-close behavior; update the test to assert a single close
(verify(reader, times(1)).close()) and ensure the production cleanup path
funnels resource closing through ParquetReaderIterator.close() (fix the iterator
cleanup in the code paths that create/own the reader so the iterator alone
performs close() once). Target the TestParquetReaderIterator test and the
ParquetReaderIterator.close() implementation and any factory/usage sites that
previously closed the reader directly so only the iterator closes it.

In `@hudi-io/src/main/java/org/apache/hudi/common/metrics/Registry.java`:
- Around line 136-140: The code uses getRegistryNameFromKey(key) (registryName)
to decide whether to prepend commonPrefix, but that returns the short name
(e.g., "fs") for table-scoped entries and causes inconsistent metric names;
change the check to use the full registry identifier from REGISTRY_MAP's key (or
a helper that returns the full registry name) when evaluating whether the
registry already contains a dot so prefixWithRegistryName and commonPrefix logic
consistently applies; update the branch that computes prefix (the logic around
registryName, prefixWithRegistryName, commonPrefix) so it checks the full
registry key and then call registry.getAllCounts(prefix...).put into allMetrics
as before.

In
`@hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalQueryColumnPruning.scala`:
- Around line 164-168: The current assertion inspects queryExecution.toString(),
which shows projection nodes and can mask scan-time behavior; instead inspect
the physical scan's read schema by using prunedDF.queryExecution.executedPlan
(or prunedDF.queryExecution.sparkPlan) and assert that its string contains the
physical scan's "ReadSchema" with the expected selected columns (e.g., contains
"ReadSchema" and "col1" or "col3") and does NOT contain the unselected columns
(e.g., "col2", "col4"); update the assertion to check
prunedDF.queryExecution.executedPlan.toString() (or locate the
FileScan/HadoopFsRelation node in executedPlan) and verify presence/absence of
the appropriate column names so the test actually proves column pruning.

---

Nitpick comments:
In
`@hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java`:
- Around line 1049-1073: Add a new positive test that ensures a nullable
top-level VECTOR field is accepted: in TestHoodieSchema add a case (alongside
testVectorAsTopLevelRecordField) that constructs a HoodieSchema.Vector (e.g.,
HoodieSchema.createVector(128, HoodieSchema.Vector.VectorElementType.FLOAT)),
wraps it in a nullable union schema (UNION(NULL, VECTOR)) as a top-level record
field (use HoodieSchemaField.of("embedding", /* nullable vector */)), build the
record with HoodieSchema.createRecord, serialize/parse via
toString()/HoodieSchema.parse, and assert equality and vector properties using
assertVector; this ensures the validator allows top-level optional VECTOR fields
(also add the same positive test for the other location referenced around the
second range).

In
`@hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/assign/HoodieSplitBucketAssigner.java`:
- Around line 34-35: The field partitionIndexFunc is only assigned in the
HoodieSplitBucketAssigner constructor but left mutable; make it final to match
numBucketsFunction and enforce immutability by changing the declaration of
partitionIndexFunc to be final (i.e., declare "private final
Functions.Function3<Integer, String, Integer, Integer> partitionIndexFunc") and
keep the existing constructor assignment in the HoodieSplitBucketAssigner
constructor as-is; no other logic changes required.

In
`@hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcIterators.java`:
- Around line 587-593: The getBeforeImage method currently calls
imageManager.getImageRecord(recordKey, beforeImages, rowKind) which already
applies the RowKind, so remove the redundant row.setRowKind(rowKind) call in
getBeforeImage; simply fetch the row from getImageRecord and return
projection.project(row) (references: getBeforeImage, getImageRecord, setRowKind,
beforeImages, projection.project).
- Around line 541-547: In getAfterImage (class CdcIterators) the call to
row.setRowKind(rowKind) is redundant because
imageManager.getImageRecord(recordKey, afterImages, rowKind) already sets the
RowKind (see CdcImageManager where it assigns the kind). Remove the extra
row.setRowKind(rowKind) line and keep returning projection.project(row)
unchanged so behavior and projection remain the same.

In
`@hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java`:
- Around line 353-364: The test helper subTaskFails currently simulates recovery
by calling coordinator.subtaskFailed(...) and manually replaying bootstrap but
leaves the original BucketAssignFunction instance alive; update subTaskFails
(and the similar block at 428-447) to recreate/reinitialize the bucket assigner
and drive the actual reset path by invoking the real
StreamWriteOperatorCoordinator.subtaskReset flow instead of only subtaskFailed;
specifically, after calling coordinator.subtaskFailed(taskID, ...), dispose or
null out the existing BucketAssignFunction and bucketAssignFunctionContext,
reinstantiate them (so setupWriteFunction()/setupIndexWriteFunction use a fresh
assigner), and trigger the coordinator.reset/subtaskReset logic that the
production operator uses to restore assigner state.

In
`@hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieSplitReaderFunction.java`:
- Around line 389-445: The tests duplicate constructor coverage for
HoodieSplitReaderFunction; remove this redundant block and replace it with a
focused interaction test that exercises the "limit handoff" between
HoodieSourceSplitReader and HoodieSplitReaderFunction: create a
HoodieSourceSplitReader (or the existing reader factory) with a non-zero limit,
register a mock split or HoodieSourceSplit, invoke the method that hands off
splits to HoodieSplitReaderFunction (use the class/methods around
HoodieSourceSplitReader and HoodieSplitReaderFunction), and assert that the
handed-off split count respects the configured limit and that
HoodieSplitReaderFunction receives the expected number of splits; keep
null/constructor tests removed and ensure you reference HoodieSourceSplitReader,
HoodieSplitReaderFunction, and the handoff method names to locate where to add
the new interaction assertions.

In
`@hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala`:
- Around line 2605-2617: The test currently verifies keys and "data" but not
which `_hoodie_partition_path` wins; update the assertions after the
incrementalDf.select(...) / results variable to explicitly assert the expected
`_hoodie_partition_path` for each row (e.g., add assertEquals(expected0,
results(0).getAs[String]("_hoodie_partition_path")), assertEquals(expected1,
results(1).getAs[String]("_hoodie_partition_path")), assertEquals(expected2,
results(2).getAs[String]("_hoodie_partition_path"))), using the correct expected
strings based on the test setup so the precedence for the colliding column is
locked in (locate this near the existing assertions in TestCOWDataSource).

In
`@hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java`:
- Around line 336-352: The current fail-fast throw in HoodieClusteringJob when
encountering an ineligible instant (call to
BaseHoodieTableServiceClient.isClusteringInstantEligibleForRollback) should be
changed to a best-effort behavior: instead of throwing, log a warning
identifying the instant (use instant.requestedTime()) and the reason,
record/collect skipped instants if needed, then continue processing remaining
instants returned by getPendingClusteringInstantsForPartitions; keep the
existing metaClient.reloadActiveTimeline() and the containsInstant check and
client.rollback(instant.requestedTime()) for eligible instants. Update any
method Javadoc to reflect the new best-effort semantics or add a TODO config
flag if fail-fast must be optionally preserved.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

Comment on lines +739 to +742
if (option.isPresent() && config.isExpirationOfClusteringEnabled()) {
heartbeatClient.start(instantTime);
log.info("Started heartbeat for clustering instant {}", instantTime);
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

This heartbeat can outlive failed or split clustering flows.

The heartbeat starts as soon as a plan is scheduled, but in this file it is only stopped after successful completeClustering(). If cluster() throws, or scheduling/execution are split (schedule.inline, async executor, table-service manager), the scheduling client can keep heartbeating indefinitely and the stale replacecommit never becomes rollback-eligible.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java`
around lines 739 - 742, Heartbeat is started unconditionally when a plan is
scheduled (option.isPresent() && config.isExpirationOfClusteringEnabled()) via
heartbeatClient.start(instantTime) but is only stopped on successful
completeClustering(), so failures or split scheduling/execution (cluster(),
schedule/... async paths) can leave the heartbeat running forever; fix by
ensuring heartbeatClient.stop(...) is called on all non-success code paths: wrap
the execution path (the call that leads to cluster()/completeClustering()) in
try/finally to stop the heartbeat on exception, and for split scheduling flows,
either defer heartbeatClient.start until execution begins or add a compensating
stop when scheduling returns without execution (e.g., in the scheduling branch
that only creates a plan), and ensure completeClustering() still stops the
heartbeat on success.

Comment on lines +243 to +255
private boolean needLogCompact(Pair<Integer, String> latestDeltaCommitInfoSinceCompact) {
Option<Pair<Integer, String>> latestDeltaCommitInfoSinceLogCompactOption = getLatestDeltaCommitInfoSinceLogCompaction();
int numDeltaCommitsSinceLatestCompaction = latestDeltaCommitInfoSinceCompact.getLeft();
int numDeltaCommitsSinceLatestLogCompaction = latestDeltaCommitInfoSinceLogCompactOption.isPresent()
? latestDeltaCommitInfoSinceLogCompactOption.get().getLeft()
: 0;

int numDeltaCommitsSince = Math.min(numDeltaCommitsSinceLatestCompaction, numDeltaCommitsSinceLatestLogCompaction);
boolean shouldLogCompact = numDeltaCommitsSince >= config.getLogCompactionBlocksThreshold();
if (shouldLogCompact) {
log.info("There have been {} delta commits since last compaction or log compaction, triggering log compaction.", numDeltaCommitsSince);
}
return shouldLogCompact;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don’t compare a block-threshold config to delta-commit counts.

hoodie.log.compaction.blocks.threshold is a log-block limit, but Lines 245-251 compare it to a table-level delta-commit count. One delta commit can append multiple log blocks or none, so this can both delay and prematurely trigger log compaction. Use actual log-block counts here, or rename this to a commit-count threshold.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java`
around lines 243 - 255, The method needLogCompact currently compares
delta-commit counts to config.getLogCompactionBlocksThreshold(), which is a
log-block limit; change it to compute the actual number of appended log blocks
since the last compaction/log compaction and compare that to
getLogCompactionBlocksThreshold(). In needLogCompact, replace
numDeltaCommitsSince with a computed numLogBlocksSince (e.g., iterate relevant
delta commits or file-slices since latest compaction/log compaction using
existing timeline/commit metadata and sum the appended log blocks per
commit/file-slice) and use that value for the threshold check and log message;
alternatively if you cannot access block counts here, update the config usage to
a commit-count threshold accessor (rename/getCommitCountThreshold) and use that
instead, but do not compare commit counts to a block threshold. Ensure you
reference needLogCompact, getLatestDeltaCommitInfoSinceLogCompaction(), and
config.getLogCompactionBlocksThreshold() when making the change.

Comment on lines +92 to +96
/**
* Map of all distributed registries created via getMetricRegistry().
* This map is passed to Spark executors to make the registries available there.
*/
private static final Map<String, Registry> DISTRIBUTED_REGISTRY_MAP = new ConcurrentHashMap<>();
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

fd -t f 'HoodieSparkEngineContext.java' --type f

Repository: yihua/hudi

Length of output: 265


🏁 Script executed:

cat -n hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java | head -100

Repository: yihua/hudi

Length of output: 4891


🏁 Script executed:

sed -n '280,295p' hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java

Repository: yihua/hudi

Length of output: 775


🏁 Script executed:

sed -n '98,110p' hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java

Repository: yihua/hudi

Length of output: 493


🏁 Script executed:

rg -n "class DistributedRegistry" --type java

Repository: yihua/hudi

Length of output: 379


🏁 Script executed:

cat -n hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java

Repository: yihua/hudi

Length of output: 3936


🏁 Script executed:

rg -n "DISTRIBUTED_REGISTRY_MAP" hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java

Repository: yihua/hudi

Length of output: 609


🏁 Script executed:

rg -n "isRegistered|close|cleanup|destroy" hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java

Repository: yihua/hudi

Length of output: 83


🏁 Script executed:

rg -n "getMetricRegistry" hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/common/TestHoodieSparkEngineContext.java

Repository: yihua/hudi

Length of output: 36


🏁 Script executed:

sed -n '135,200p' hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java

Repository: yihua/hudi

Length of output: 3122


🏁 Script executed:

rg -n "isRegistered" hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java -A 3

Repository: yihua/hudi

Length of output: 133


🏁 Script executed:

cat -n hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java | sed -n '287,295p'

Repository: yihua/hudi

Length of output: 388


🏁 Script executed:

rg -n "new HoodieSparkEngineContext\|getMetricRegistry" hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/common/TestHoodieSparkEngineContext.java -A 2

Repository: yihua/hudi

Length of output: 36


🏁 Script executed:

find . -name "*.java" -path "*/test/*" -exec grep -l "DISTRIBUTED_REGISTRY_MAP\|DistributedRegistry" {} \;

Repository: yihua/hudi

Length of output: 152


🏁 Script executed:

cat -n hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metrics/TestDistributedRegistry.java

Repository: yihua/hudi

Length of output: 10028


🏁 Script executed:

sed -n '290,310p' hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java

Repository: yihua/hudi

Length of output: 974


Avoid process-wide registry caching here.

DISTRIBUTED_REGISTRY_MAP is static and its key does not include the Spark application or context identity. When getMetricRegistry() is called with the same tableName.registryName across different JavaSparkContext instances, computeIfAbsent returns the cached DistributedRegistry that was registered with the first context, without re-registering it with the current context. This leaves the accumulator bound to a defunct or unrelated Spark context.

Include the Spark application ID (or context identity) in the cache key, or make the registry map instance-scoped rather than static.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java`
around lines 92 - 96, DISTRIBUTED_REGISTRY_MAP is static and caches
DistributedRegistry entries across Spark contexts causing accumulators to remain
bound to the wrong/defunct context; update getMetricRegistry() to key registries
by Spark context identity (e.g., include the Spark application id /
SparkContext.applicationId or JavaSparkContext instance id in the cache key such
as tableName + "." + registryName + "." + sparkAppId) so computeIfAbsent
re-registers per application, or alternatively make DISTRIBUTED_REGISTRY_MAP
non-static (instance-scoped) on HoodieSparkEngineContext so each context has its
own registry map; locate usages of DISTRIBUTED_REGISTRY_MAP,
getMetricRegistry(), and DistributedRegistry and apply one of these fixes to
ensure accumulators are registered against the current JavaSparkContext.

Comment on lines +149 to 154
final Map<String, Registry> registries = DISTRIBUTED_REGISTRY_MAP;
return javaSparkContext.parallelize(data, parallelism).mapToPair(input -> {
setRegistries(registries);
Pair<K, V> pair = mapToPairFunc.call(input);
return new Tuple2<>(pair.getLeft(), pair.getRight());
}).reduceByKey(reduceFunc::apply).map(Tuple2::_2).collect();
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

The reduce-side user function still runs without registry setup.

setRegistries(...) is called before mapToPairFunc / flatMapToPairFunc, but the subsequent reduceByKey(reduceFunc::apply) executes reduceFunc on executors without that setup. Any reducer that touches Registry will still miss the distributed registries; the same gap remains in reduceByKey(...) and foreach(...).

Also applies to: 161-168, 291-297

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java`
around lines 149 - 154, The distributed registries are only set before the
mapToPair/flatMapToPair closures but not inside reducer/foreach closures, so
reducer and foreach user functions run without registry setup; update
HoodieSparkEngineContext to capture DISTRIBUTED_REGISTRY_MAP into a final
Map<String, Registry> registries and wrap the reduceFunc and any foreach/user
closures (e.g., the reduceByKey(reduceFunc::apply) invocation and foreach calls)
so they first call setRegistries(registries) inside the executor-side lambda
before delegating to reduceFunc/foreachFunc; apply the same pattern for the
other affected blocks (the ones around lines 161-168 and 291-297) and for
flatMapToPair-based flows to ensure all executor-side user functions call
setRegistries(registries) before executing.

Comment on lines +81 to 82
DistributedRegistryUtil.createWrapperFileSystemRegistries(context, writeConfig);
this.tableServiceClient = new SparkRDDTableServiceClient<T>(context, writeConfig, getTimelineServer());
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

This still is not table-isolated in a shared JVM.

createWrapperFileSystemRegistries(...) installs per-table registries, but HoodieWrapperFileSystem still stores them in one static METRICS_REGISTRY_DATA/META pair. Creating a second client overwrites the first client’s wrapper-FS registries, so metrics can be attributed to the wrong table under multi-tenant or concurrent use.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java`
around lines 81 - 82, The static METRICS_REGISTRY_DATA/META in
HoodieWrapperFileSystem causes per-table registries to be overwritten when
multiple SparkRDDWriteClient instances run in the same JVM; update the registry
storage to be table-isolated by keying registries by a unique table identifier
(e.g., tableName/tablePath/tableUUID) and change
DistributedRegistryUtil.createWrapperFileSystemRegistries to register under that
key instead of the single static pair, then modify HoodieWrapperFileSystem to
look up/insert registries in a ConcurrentHashMap keyed by that table identifier
(and expose a remove/cleanup method to unregister on client close); ensure
SparkRDDWriteClient constructs/passes the same table identifier when calling
createWrapperFileSystemRegistries so each client gets its own registry
namespace.

Comment on lines +38 to +73
// Add all the required columns as part of pruned schema
requiredColumns.foreach(col => {
val field = usedSchema.find(_.name == col)
if (field.isDefined) {
prunedSchema = prunedSchema.add(field.get)
}
})

// All the partition fields are required columns while querying the data.
val tableConfig = metaClient.getTableConfig
val partitionColumns = tableConfig.getPartitionFields
if (partitionColumns.isPresent) {
partitionColumns.get().foreach(col => {
if (!requiredColumns.contains(col)) {
val field = usedSchema.find(_.name == col)
if (field.isDefined) {
prunedSchema = prunedSchema.add(field.get)
}
}
})
}

// The precombine/ordering field is required for merge logic in MOR tables
if (tableConfig.getTableType == HoodieTableType.MERGE_ON_READ) {
val orderingFields = tableConfig.getOrderingFields
if (!orderingFields.isEmpty) {
orderingFields.forEach(col => {
if (!requiredColumns.contains(col)) {
val field = usedSchema.find(_.name == col)
if (field.isDefined) {
prunedSchema = prunedSchema.add(field.get)
}
}
})
}
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Prevent duplicate fields in the pruned scan schema.

These three passes append fields independently, but only guard against names already present in requiredColumns. If an MOR ordering field is also a partition field, StructType.add will append the same name twice and the incremental scan can later fail with duplicate/ambiguous column analysis. Track already-added names across the whole method, not just against requiredColumns.

Possible fix
   def getPrunedSchema(requiredColumns: Array[String],
                       usedSchema: StructType,
                       metaClient: HoodieTableMetaClient) = {
     var prunedSchema = StructType(Seq())
+    val added = scala.collection.mutable.HashSet[String]()
+
+    def addIfPresent(name: String): Unit = {
+      usedSchema.find(_.name == name).foreach { field =>
+        if (added.add(field.name)) {
+          prunedSchema = prunedSchema.add(field)
+        }
+      }
+    }
 
     // _hoodie_commit_time is a required field. using which query filters are applied.
     if (!requiredColumns.contains(HoodieRecord.COMMIT_TIME_METADATA_FIELD)) {
-      prunedSchema = prunedSchema.add(usedSchema(HoodieRecord.COMMIT_TIME_METADATA_FIELD))
+      addIfPresent(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
     }
 
     // Add all the required columns as part of pruned schema
     requiredColumns.foreach(col => {
-      val field = usedSchema.find(_.name == col)
-      if (field.isDefined) {
-        prunedSchema = prunedSchema.add(field.get)
-      }
+      addIfPresent(col)
     })
 
     // All the partition fields are required columns while querying the data.
     val tableConfig = metaClient.getTableConfig
     val partitionColumns = tableConfig.getPartitionFields
     if (partitionColumns.isPresent) {
       partitionColumns.get().foreach(col => {
         if (!requiredColumns.contains(col)) {
-          val field = usedSchema.find(_.name == col)
-          if (field.isDefined) {
-            prunedSchema = prunedSchema.add(field.get)
-          }
+          addIfPresent(col)
         }
       })
     }
 
     // The precombine/ordering field is required for merge logic in MOR tables
     if (tableConfig.getTableType == HoodieTableType.MERGE_ON_READ) {
       val orderingFields = tableConfig.getOrderingFields
       if (!orderingFields.isEmpty) {
         orderingFields.forEach(col => {
           if (!requiredColumns.contains(col)) {
-            val field = usedSchema.find(_.name == col)
-            if (field.isDefined) {
-              prunedSchema = prunedSchema.add(field.get)
-            }
+            addIfPresent(col)
           }
         })
       }
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/IncrementalRelationUtil.scala`
around lines 38 - 73, The code appends fields to prunedSchema in three separate
passes (using requiredColumns, partitionColumns from
metaClient.getTableConfig.getPartitionFields, and orderingFields from
getOrderingFields) but only checks against requiredColumns, allowing duplicate
field names to be added; fix it by maintaining a mutable Set[String] (e.g.,
addedNames) initialized with requiredColumns, update it whenever you add a field
to prunedSchema, and in each place before calling prunedSchema.add(field.get)
check addedNames.contains(field.name) to skip duplicates (apply to the
requiredColumns loop, the partitionColumns loop, and the orderingFields loop,
including the HoodieTableType.MERGE_ON_READ branch).

Comment on lines 279 to +299
def formatQueryInstant(queryInstant: String): String = {
val instantLength = queryInstant.length
if (instantLength == 19 || instantLength == 23) {
// Handle "yyyy-MM-dd HH:mm:ss[.SSS]" format
if (instantLength >= 19 && instantLength <= 23 && queryInstant.contains("T")) {
HoodieInstantTimeGenerator.getInstantForDateString(queryInstant.replace('T', ' '))
} else if (instantLength == 19 || instantLength == 23) {
HoodieInstantTimeGenerator.getInstantForDateString(queryInstant)
} else if (instantLength == HoodieInstantTimeGenerator.SECS_INSTANT_ID_LENGTH
|| instantLength == HoodieInstantTimeGenerator.MILLIS_INSTANT_ID_LENGTH) {
// Handle already serialized "yyyyMMddHHmmss[SSS]" format
|| instantLength == HoodieInstantTimeGenerator.MILLIS_INSTANT_ID_LENGTH) {
validateInstant(queryInstant)
queryInstant
} else if (instantLength == 10) { // for yyyy-MM-dd
} else if (instantLength == 10 && !isAllDigits(queryInstant)) {
TimelineUtils.formatDate(defaultDateFormat.get().parse(queryInstant))
} else if (instantLength == 10 && isAllDigits(queryInstant)) {
TimelineUtils.formatDate(new java.util.Date(queryInstant.toLong * 1000L))
} else if (instantLength == 13 && isAllDigits(queryInstant)) {
TimelineUtils.formatDate(new java.util.Date(queryInstant.toLong))
} else {
throw new IllegalArgumentException(
s"Unsupported query instant time format: $queryInstant. $SUPPORTED_FORMATS_MSG")
}
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, let's locate and examine the file in question
find . -name "HoodieSqlCommonUtils.scala" -type f

Repository: yihua/hudi

Length of output: 163


🏁 Script executed:

# Once we find it, let's look at the defaultDateFormat initialization and the formatQueryInstant method
cd . && find . -name "HoodieSqlCommonUtils.scala" -type f -exec head -300 {} \; | tail -100

Repository: yihua/hudi

Length of output: 4414


🏁 Script executed:

# Let's search more specifically for the defaultDateFormat and the full method
rg -A 20 "private val defaultDateFormat" --type scala

Repository: yihua/hudi

Length of output: 3379


🏁 Script executed:

# Let's see the complete formatQueryInstant method and surrounding context
rg -B 5 -A 25 "def formatQueryInstant" --type scala

Repository: yihua/hudi

Length of output: 5022


🏁 Script executed:

# Check if HoodieInstantTimeGenerator has exception handling
rg "HoodieInstantTimeGenerator" -A 5 --type scala

Repository: yihua/hudi

Length of output: 17943


🏁 Script executed:

# Check how TimelineUtils.formatDate is implemented
rg "def formatDate" --type scala -A 10

Repository: yihua/hudi

Length of output: 36


🏁 Script executed:

# Let's search for ParseException handling in this file
rg "ParseException" --type scala

Repository: yihua/hudi

Length of output: 50366


🏁 Script executed:

# Let's check the entire file structure around defaultDateFormat and formatQueryInstant to understand the context better
cat hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala | grep -n "defaultDateFormat\|formatQueryInstant\|ParseException" | head -20

Repository: yihua/hudi

Length of output: 364


Make yyyy-MM-dd parsing strict and add exception handling.

The defaultDateFormat uses a lenient SimpleDateFormat by default, so an input like 2025-13-40 will silently roll forward to a different date instead of failing. This undermines validation and can point incremental/time-travel reads at the wrong instant. Additionally, this branch lacks exception handling—if ParseException is thrown, it propagates unwrapped, inconsistent with the rest of the method.

Set fmt.setLenient(false) and wrap the parse call in try-catch to convert ParseException to IllegalArgumentException for consistency.

Possible fix
   private val defaultDateFormat =
   ThreadLocal.withInitial(new java.util.function.Supplier[SimpleDateFormat] {
-    override def get() = new SimpleDateFormat("yyyy-MM-dd")
+    override def get() = {
+      val fmt = new SimpleDateFormat("yyyy-MM-dd")
+      fmt.setLenient(false)
+      fmt
+    }
   })
@@
     } else if (instantLength == 10 && !isAllDigits(queryInstant)) {
-      TimelineUtils.formatDate(defaultDateFormat.get().parse(queryInstant))
+      try {
+        TimelineUtils.formatDate(defaultDateFormat.get().parse(queryInstant))
+      } catch {
+        case _: java.text.ParseException =>
+          throw new IllegalArgumentException(
+            s"Unsupported query instant time format: $queryInstant. $SUPPORTED_FORMATS_MSG")
+      }
     } else if (instantLength == 10 && isAllDigits(queryInstant)) {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala`
around lines 279 - 299, The yyyy-MM-dd parsing branch in formatQueryInstant (the
else-if for instantLength == 10 && !isAllDigits(queryInstant)) must enforce
strict date validation and handle parse errors: obtain the SimpleDateFormat from
defaultDateFormat.get(), call fmt.setLenient(false) before parsing, then wrap
fmt.parse(queryInstant) in a try/catch catching java.text.ParseException and
throw an IllegalArgumentException with a clear message (include the original
queryInstant and SUPPORTED_FORMATS_MSG) to match the method's error handling;
keep the rest of the branch using TimelineUtils.formatDate(parsedDate).

@greptile-apps
Copy link
Copy Markdown

greptile-apps Bot commented Apr 14, 2026

Greptile Summary

This PR implements vectorized columnar batch reading for Lance format (COW, base-file-only path) in Spark, alongside several supporting improvements: column pruning for incremental reads (IncrementalRelationV1/V2PrunedScan), broader instant-time normalization (ISO T-separator, epoch seconds/millis), a global row-count limiter for Flink sources, clustering expiration with heartbeat-based rollback, log-compaction awareness in the compaction scheduler, and a table-scoped distributed metrics registry.

Key changes and findings:

  • Lance columnar batch path (SparkLanceReaderBase, LanceBatchIterator, HoodieFileGroupReaderBasedFileFormat): The implementation is solid. Previously flagged issues — TimestampNTZType unhandled in populatePartitionVectors and O(n) linear scan for null-column lookup — are both resolved in this revision. Resource lifecycle ordering (null allocator → data allocator via batchIterator.close()) is correct and close() is idempotent.

  • Incremental column pruning (IncrementalRelationV1/V2, IncrementalRelationUtil): Both relations upgrade from TableScan to PrunedScan. The pruned schema correctly ensures _hoodie_commit_time is always available for filtering and is dropped from the result when not requested.

  • Distributed metrics registry (Registry, HoodieSparkEngineContext, DistributedRegistry): The key-format mismatch between setRegistries (plants under \"::tableName.registryName\") and getRegistryOfClass (looks up under \"tableName::registryName\") remains unresolved from a prior review thread, meaning executor-side metrics still do not accumulate in the driver's DistributedRegistry accumulator.

  • Clustering expiration (BaseHoodieTableServiceClient, HoodieClusteringJob): hasInstantExpired applies a timezone conversion that is immediately reversed (no-op). rollbackFailedClusteringForPartitions still throws on the first ineligible instant rather than skipping it (previously flagged). needLogCompact sentinel value (0) issue previously flagged remains.

  • VECTOR validation (HoodieSchema, HoodieSparkSchemaConverters): Correctly rejects VECTOR types nested inside RECORD, ARRAY, or MAP at both the Avro schema and Spark schema levels.

Confidence Score: 3/5

The core Lance columnar batch feature is well-implemented, but three previously flagged issues (registry key mismatch, rollback-throws-instead-of-skips, needLogCompact sentinel) remain unresolved, and a new timezone no-op is introduced.

Six issues were flagged in prior review threads. Three are now resolved in this revision (TimestampNTZType partition vector, O(n) null-column scan, RecordLimiter guard in callers). Three remain open: the Registry key-format mismatch (executor metrics silently lost), rollbackFailedClusteringForPartitions throwing on the first ineligible instant (blocks subsequent rollbacks), and the needLogCompact sentinel 0 default. A new P2 style issue (timezone no-op in hasInstantExpired) is introduced. Score reflects meaningful convergence on the Lance feature itself but persistent unresolved issues in the clustering-expiration and metrics subsystems.

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java (hasInstantExpired timezone no-op, rollback-throws), hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java (rollback-throws), hudi-io/src/main/java/org/apache/hudi/common/metrics/Registry.java (key mismatch, executor metrics lost), hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java (needLogCompact sentinel)

Important Files Changed

Filename Overview
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceBatchIterator.java New columnar batch iterator for Lance files; lifecycle management (allocator → arrowReader → lanceReader) is well-ordered and close() is idempotent with suppressed-exception chaining.
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala Adds columnar batch read path (readBatch) alongside existing row-based path (readRows); null-column lookup is now O(1) via direct array, and TimestampNTZType is handled in populatePartitionVectors.
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java Adds clustering expiration / heartbeat-based rollback; hasInstantExpired converts Instant→ZonedDateTime→Instant making the timezone a no-op; rollbackFailedClusteringForPartitions still throws on first non-eligible instant.
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java Adds rollbackFailedClusteringForPartitions; throws HoodieException on first ineligible instant rather than skipping, blocking all subsequent rollbacks (previously flagged, still unresolved).
hudi-io/src/main/java/org/apache/hudi/common/metrics/Registry.java Adds table-scoped registry keys and setRegistries; key format used by setRegistries (::tableName.registryName) still mismatches getRegistryOfClass lookup key (tableName::registryName), so executor metrics never reach the driver DistributedRegistry.
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java Introduces needLogCompact logic; when getLatestDeltaCommitInfoSinceLogCompaction returns empty (pending log compaction in flight), the 0 sentinel causes Math.min to always return 0, making shouldLogCompact always false (previously flagged).
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV1.scala Upgrades from TableScan to PrunedScan to enable column pruning for incremental reads; contains a duplicate/misplaced log message after filterRequiredColumnsFromDF.
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/IncrementalRelationUtil.scala New utility for pruned schema construction and column filtering; correctly preserves _hoodie_commit_time for filter application and strips it afterwards.
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/RecordLimiter.java New row-count limiter; wrap() still has a latent bug when called with NO_LIMIT (0L >= -1L == true), but callers guard this with Option.empty().
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java Adds distributed metric registry management; DISTRIBUTED_REGISTRY_MAP is static (shared across all tables in the JVM) and executor-side setRegistries key format still mismatches driver-side lookup.

Sequence Diagram

sequenceDiagram
    participant Spark as Spark Executor
    participant HFGR as HoodieFileGroupReaderBasedFileFormat
    participant SLRB as SparkLanceReaderBase
    participant LBI as LanceBatchIterator
    participant LFR as LanceFileReader
    participant AR as ArrowReader

    Spark->>HFGR: buildReaderWithPartitionValues(file, requiredSchema)
    HFGR->>SLRB: readFile(file, requiredSchema, partitionSchema)
    SLRB->>LFR: open(filePath, allocator)
    LFR-->>SLRB: lanceReader
    SLRB->>AR: lanceReader.readAll(columnNames, null, batchSize=512)
    AR-->>SLRB: arrowReader
    alt enableVectorizedReader and no type changes
        SLRB->>LBI: new LanceBatchIterator(allocator, lanceReader, arrowReader)
        SLRB-->>Spark: Iterator[ColumnarBatch] cast as Iterator[InternalRow]
        loop each batch
            Spark->>LBI: hasNext()
            LBI->>AR: loadNextBatch()
            AR-->>LBI: VectorSchemaRoot reused
            LBI-->>Spark: true
            Spark->>LBI: next()
            LBI-->>Spark: ColumnarBatch
            Note over Spark,LBI: mappedIterator reorders columns, substitutes null Arrow vectors for missing columns, appends constant partition vectors
        end
        Spark->>LBI: close() via TaskContext listener
        LBI->>AR: close()
        LBI->>LFR: close()
        LBI->>LBI: allocator.close()
    else row-based path MOR or type changes
        SLRB-->>Spark: Iterator[InternalRow] via LanceRecordIterator
    end
Loading

Reviews (3): Last reviewed commit: "fix(lance): address PR #18403 review rou..." | Re-trigger Greptile

Comment on lines +55 to +62
@Override
public String nextSplit() {
return records.nextSplit();
}

@Override
public T nextRecordFromSplit() {
if (totalReadCount.get() >= limit) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 wrap() is broken when limit == NO_LIMIT

When limit = NO_LIMIT = -1L, the condition totalReadCount.get() >= limit evaluates to 0L >= -1L = true on the very first call, so nextRecordFromSplit() immediately returns null — emitting zero records from every wrapped split.

The current call-site in HoodieSourceReader guards against this with an explicit Option.empty() path, so the bug is dormant today. But nothing prevents a future caller from constructing new RecordLimiter(NO_LIMIT) and calling wrap(), silently dropping all data.

Suggested change
@Override
public String nextSplit() {
return records.nextSplit();
}
@Override
public T nextRecordFromSplit() {
if (totalReadCount.get() >= limit) {
@Override
public T nextRecordFromSplit() {
if (limit != NO_LIMIT && totalReadCount.get() >= limit) {
return null;
}
T record = records.nextRecordFromSplit();
if (record != null) {
totalReadCount.incrementAndGet();
}
return record;
}

Comment on lines 147 to +153
}

/**
* Set all registries if they are not already registered.
*/
static void setRegistries(Collection<Registry> registries) {
for (Registry registry : registries) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 setRegistries key format mismatches getRegistryOfClass

setRegistries stores each registry under makeKey("", registry.getName()), which expands to "::tableName.registryName" (a double-colon key with an empty table component).

But getRegistryOfClass(tableName, registryName, clazz) always looks up with makeKey(tableName, registryName) = "tableName::registryName".

These two key formats are never equal when tableName is non-empty. When executor-side code subsequently calls getRegistryOfClass("tableName", "registryName", ...) to retrieve the distributed registry planted by setRegistries, the lookup misses and a brand-new LocalRegistry is created instead. Executor-side metrics are then never accumulated back to the driver's DistributedRegistry accumulator.

The root cause is that DistributedRegistry.getName() returns the full prefixed name ("tableName.registryName") while setRegistries naively treats that as the registryName component. The fix should ensure both the storage key (in setRegistries) and the lookup key (in getRegistryOfClass) use the same format.

Comment on lines +317 to +328
return metaClient.getActiveTimeline().filterInflightsAndRequested()
.getInstantsAsStream()
.filter(instant -> matchingInstantTimes.contains(instant.requestedTime()))
.collect(Collectors.toList());
}

/**
* Rolls back pending clustering instants that target any of the given partitions,
* are eligible for rollback (config enabled, old enough, and a clustering instant),
* and whose heartbeat has expired (indicating the clustering job is no longer alive).
*
* @param client the write client to use for rollback operations
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 rollbackFailedClusteringForPartitions throws on first non-eligible instant instead of skipping

If any pending clustering instant targeting the requested partitions is not yet eligible for rollback (heartbeat still alive or instant too recent), the method throws a HoodieException and skips all remaining instants. A live clustering job holding one instant blocks all writes to those partitions.

Consider logging a warning and continuing rather than throwing, so only genuinely eligible instants are rolled back.

Comment on lines 235 to 258
return compactable;
}

/**
* Determines whether log compaction should be scheduled based on the number of delta commits
* since the last compaction and the last log compaction, compared against the
* {@code hoodie.log.compaction.blocks.threshold} config.
*/
private boolean needLogCompact(Pair<Integer, String> latestDeltaCommitInfoSinceCompact) {
Option<Pair<Integer, String>> latestDeltaCommitInfoSinceLogCompactOption = getLatestDeltaCommitInfoSinceLogCompaction();
int numDeltaCommitsSinceLatestCompaction = latestDeltaCommitInfoSinceCompact.getLeft();
int numDeltaCommitsSinceLatestLogCompaction = latestDeltaCommitInfoSinceLogCompactOption.isPresent()
? latestDeltaCommitInfoSinceLogCompactOption.get().getLeft()
: 0;

int numDeltaCommitsSince = Math.min(numDeltaCommitsSinceLatestCompaction, numDeltaCommitsSinceLatestLogCompaction);
boolean shouldLogCompact = numDeltaCommitsSince >= config.getLogCompactionBlocksThreshold();
if (shouldLogCompact) {
log.info("There have been {} delta commits since last compaction or log compaction, triggering log compaction.", numDeltaCommitsSince);
}
return shouldLogCompact;
}

private Long parsedToSeconds(String time) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 needLogCompact uses 0 as default when no prior log compaction exists, making Math.min always return 0

When getLatestDeltaCommitInfoSinceLogCompaction() returns Option.empty(), numDeltaCommitsSinceLatestLogCompaction defaults to 0. Math.min(numDeltaCommitsSinceLatestCompaction, 0) is always 0, so shouldLogCompact is always false.

Using Integer.MAX_VALUE as the sentinel is semantically correct — it means "log compaction has never occurred, so count all delta commits as eligible":

Suggested change
return compactable;
}
/**
* Determines whether log compaction should be scheduled based on the number of delta commits
* since the last compaction and the last log compaction, compared against the
* {@code hoodie.log.compaction.blocks.threshold} config.
*/
private boolean needLogCompact(Pair<Integer, String> latestDeltaCommitInfoSinceCompact) {
Option<Pair<Integer, String>> latestDeltaCommitInfoSinceLogCompactOption = getLatestDeltaCommitInfoSinceLogCompaction();
int numDeltaCommitsSinceLatestCompaction = latestDeltaCommitInfoSinceCompact.getLeft();
int numDeltaCommitsSinceLatestLogCompaction = latestDeltaCommitInfoSinceLogCompactOption.isPresent()
? latestDeltaCommitInfoSinceLogCompactOption.get().getLeft()
: 0;
int numDeltaCommitsSince = Math.min(numDeltaCommitsSinceLatestCompaction, numDeltaCommitsSinceLatestLogCompaction);
boolean shouldLogCompact = numDeltaCommitsSince >= config.getLogCompactionBlocksThreshold();
if (shouldLogCompact) {
log.info("There have been {} delta commits since last compaction or log compaction, triggering log compaction.", numDeltaCommitsSince);
}
return shouldLogCompact;
}
private Long parsedToSeconds(String time) {
int numDeltaCommitsSinceLatestLogCompaction = latestDeltaCommitInfoSinceLogCompactOption.isPresent()
? latestDeltaCommitInfoSinceLogCompactOption.get().getLeft()
: Integer.MAX_VALUE;

- Idempotent close() in SparkLanceReaderBase.readBatch iterator: guard
  nullAllocator and partitionVectors against double-free when both the
  TaskContext listener and the outer CloseableIteratorListener invoke close().
- Close iterator in TestLanceColumnarBatch.runReadAndCollect: wrap the consume
  loop in try/finally and close via AutoCloseable; needed on the driver where
  TaskContext.get() is null and the completion-listener path never runs.
- Assert vectorized scan path in 3 integration tests via a new helper that
  inspects SQLMetrics on a columnar FileSourceScan/BatchScan, descending into
  AdaptiveSparkPlanExec and QueryStageExec to reach the scan under AQE.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In
`@hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala`:
- Around line 252-260: The close() implementation currently sets the closed flag
before performing cleanup so exceptions from nullColumnVectors.foreach,
nullAllocator.foreach, batchIterator.close(), or partitionVectors.foreach can
prevent remaining resources from being closed; change the logic in
SparkLanceReaderBase.close() to attempt all close operations first (wrapping
each close call or each group in try/catch to log/ignore exceptions) and only
set closed = true after all cleanup has been attempted (or use a try {
cleanup... } finally { closed = true }) so that one failing close cannot
short-circuit the rest.
- Around line 368-405: The pattern match that fills partition vectors in
SparkLanceReaderBase.scala currently handles TimestampType but not
TimestampNTZType, causing timestamp_ntz partitions to be filled with nulls;
update the match in the method that uses partitionValues, vector and numRows to
include TimestampNTZType (treat it the same as TimestampType) — e.g., add
TimestampNTZType alongside TimestampType or a separate case that reads
partitionValues.getLong(i) and calls vector.putLongs(0, numRows, v) so NTZ
partitions are populated correctly.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 37ae27e6-19e5-4508-adc5-19e59f55a5ef

📥 Commits

Reviewing files that changed from the base of the PR and between 8cf5e2f and ff828d0.

📒 Files selected for processing (2)
  • hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala
  • hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceColumnarBatch.scala
🚧 Files skipped from review as they are similar to previous changes (1)
  • hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceColumnarBatch.scala

Comment on lines +252 to +260
override def close(): Unit = {
// Idempotent: TaskContext listener and the outer CloseableIteratorListener may both call close().
if (!closed) {
closed = true
// Close null Arrow vectors and their allocator before batchIterator (which closes the data allocator)
nullColumnVectors.foreach(_.columnVector.close())
nullAllocator.foreach(_.close())
batchIterator.close()
partitionVectors.foreach(_.close())
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Attempt every close before making the iterator permanently closed.

Line 255 marks the iterator closed before cleanup runs. If Line 257 or Line 258 throws, batchIterator.close() and partitionVectors.close() are skipped, and every later close() becomes a no-op. This can strand Arrow/Lance resources after an otherwise successful read.

♻️ Proposed fix
       override def close(): Unit = {
         // Idempotent: TaskContext listener and the outer CloseableIteratorListener may both call close().
         if (!closed) {
           closed = true
-          // Close null Arrow vectors and their allocator before batchIterator (which closes the data allocator)
-          nullColumnVectors.foreach(_.columnVector.close())
-          nullAllocator.foreach(_.close())
-          batchIterator.close()
-          partitionVectors.foreach(_.close())
+          var closeError: Exception = null
+
+          def closeSafely(f: => Unit): Unit = {
+            try {
+              f
+            } catch {
+              case e: Exception =>
+                if (closeError == null) {
+                  closeError = e
+                } else {
+                  closeError.addSuppressed(e)
+                }
+            }
+          }
+
+          // Close null Arrow vectors and their allocator before batchIterator (which closes the data allocator)
+          nullColumnVectors.foreach(v => closeSafely(v.columnVector.close()))
+          nullAllocator.foreach(a => closeSafely(a.close()))
+          closeSafely(batchIterator.close())
+          partitionVectors.foreach(v => closeSafely(v.close()))
+
+          if (closeError != null) {
+            throw closeError
+          }
         }
       }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
override def close(): Unit = {
// Idempotent: TaskContext listener and the outer CloseableIteratorListener may both call close().
if (!closed) {
closed = true
// Close null Arrow vectors and their allocator before batchIterator (which closes the data allocator)
nullColumnVectors.foreach(_.columnVector.close())
nullAllocator.foreach(_.close())
batchIterator.close()
partitionVectors.foreach(_.close())
override def close(): Unit = {
// Idempotent: TaskContext listener and the outer CloseableIteratorListener may both call close().
if (!closed) {
closed = true
var closeError: Exception = null
def closeSafely(f: => Unit): Unit = {
try {
f
} catch {
case e: Exception =>
if (closeError == null) {
closeError = e
} else {
closeError.addSuppressed(e)
}
}
}
// Close null Arrow vectors and their allocator before batchIterator (which closes the data allocator)
nullColumnVectors.foreach(v => closeSafely(v.columnVector.close()))
nullAllocator.foreach(a => closeSafely(a.close()))
closeSafely(batchIterator.close())
partitionVectors.foreach(v => closeSafely(v.close()))
if (closeError != null) {
throw closeError
}
}
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala`
around lines 252 - 260, The close() implementation currently sets the closed
flag before performing cleanup so exceptions from nullColumnVectors.foreach,
nullAllocator.foreach, batchIterator.close(), or partitionVectors.foreach can
prevent remaining resources from being closed; change the logic in
SparkLanceReaderBase.close() to attempt all close operations first (wrapping
each close call or each group in try/catch to log/ignore exceptions) and only
set closed = true after all cleanup has been attempted (or use a try {
cleanup... } finally { closed = true }) so that one failing close cannot
short-circuit the rest.

- Handle TimestampNTZType in SparkLanceReaderBase.populatePartitionVectors:
  TIMESTAMP_NTZ is internally a Long (microseconds since epoch) just like
  TimestampType, but was falling through to the wildcard case and producing
  all-null partition values for any TIMESTAMP_NTZ-partitioned table on the
  vectorized path.
- Replace per-batch O(n) linear scan
  `nullColumnVectors.find(_.colIndex == i)` with a direct-indexed
  `nullColumnByIndex: Array[NullColumnEntry]` lookup, removing m*k array
  scans for tables with m missing columns and k batches.
- Add TestLanceColumnarBatch#testColumnarPathPartitionVectorTimestampNtz
  regression test that fails before the populatePartitionVectors fix because
  every row would carry a null partition value.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant