[OSS PR #18065] feat(schema): Add support to write shredded variants for HoodieRecordType.AVRO#43

Open
yihua wants to merge 2 commits into master from oss-18065

Owner

@yihua yihua commented Apr 14, 2026

Mirror of apache#18065 for automated bot review.

Original author: @voonhous
Base branch: master

Summary by CodeRabbit

Release Notes

  • New Features

    • Added clustering expiration with heartbeat-based eligibility tracking for automatic rollback of stale clustering operations.
    • Introduced vector search table-valued functions (hudi_vector_search, hudi_vector_search_batch) for semantic similarity queries in Spark 4.x.
    • Added Parquet variant column shredding support for improved query performance on complex data types (Spark 4.x).
    • Implemented record limit push-down for Flink source reads to optimize LIMIT clause execution.
    • Added column pruning optimization for incremental reads to reduce data transfer.
    • Enabled optional catalog-based partition discovery to improve metadata handling.
  • Bug Fixes

    • Improved schema validation to prevent invalid nested vector configurations.

voonhous added 2 commits April 7, 2026 18:21
- Added support to write shredded types for HoodieRecordType.AVRO
- Added functional tests for testing newly added configs

coderabbitai Bot commented Apr 14, 2026

📝 Walkthrough

This pull request implements clustering instant expiration with heartbeat awareness, introduces Parquet variant column shredding support with schema transformation, refactors distributed metrics registry setup, adds comprehensive vector search capabilities via table-valued functions, consolidates CDC reader implementations in Flink, and enhances schema/timeline handling with incremental query column pruning and log-compaction-aware delta commit selection.

Changes

Cohort / File(s) Summary
Clustering Expiration & Heartbeat
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java, hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java, hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java, hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
Added clustering instant expiration logic with heartbeat management: new ENABLE_EXPIRATIONS and EXPIRATION_THRESHOLD_MINS config properties, helper method isClusteringInstantEligibleForRollback, heartbeat start/stop during clustering commit finalization, and partition-aware rollback utilities in HoodieClusteringJob.
Variant Shredding Framework
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java, hudi-hadoop-common/src/main/java/org/apache/hudi/avro/VariantShreddingProvider.java, hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java, hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
Introduced variant shredding interface, schema validation (blocking nested VECTOR fields), shredded field/object/variant schema factories, configuration properties, and Avro write support for transforming variants at write time with reflective provider loading.
Parquet Variant Shredding
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java, hudi-spark-datasource/hudi-spark4-common/src/main/java/org/apache/hudi/variant/Spark4VariantShreddingProvider.java, hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
Implemented Spark 4.x variant shredding provider converting unshredded Avro variants to shredded Parquet schema with typed_value handling; added schema detection, generation, and writer creation in SparkAdapter; Spark 3.x throws UnsupportedOperationException.
Distributed Registry & Metrics
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java, hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java, hudi-io/src/main/java/org/apache/hudi/common/metrics/Registry.java, hudi-io/src/main/java/org/apache/hudi/common/metrics/LocalRegistry.java, hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/DistributedRegistryUtil.java
Refactored registry creation/retrieval with compound keying, lazy initialization in HoodieSparkEngineContext, setRegistries() for executor propagation, and consolidated DistributedRegistryUtil to replace inline wrapper filesystem metrics setup.
Flink RLI Bootstrap Simplification
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/RLIBootstrapOperator.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/Correspondent.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java
Removed operator-state checkpoint tracking, awaitPendingInstantsCommitted coordination flow, and index-bootstrap guard; simplified initializeState() to directly initialize metadata without checkpoint synchronization.
Flink CDC Refactoring
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcIterators.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcImageManager.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieCdcSplitReaderFunction.java
Consolidated CDC iterator implementations into centralized CdcIterators and CdcImageManager classes; refactored CdcInputFormat and HoodieCdcSplitReaderFunction to delegate to shared implementations, reducing code duplication.
Flink Record Limiting
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/RecordLimiter.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieSourceSplitReader.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieScanContext.java
Added RecordLimiter utility for enforcing global row-count limits across Flink source splits; integrated into HoodieSourceSplitReader via HoodieScanContext and HoodieSourceReader constructor changes.
Flink Source Reader Refactoring
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/AbstractSplitReaderFunction.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieSplitReaderFunction.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
Introduced AbstractSplitReaderFunction base class with lazy-initialized HoodieWriteConfig and Hadoop config helpers; refactored existing readers to extend it; updated coordinator to synchronously execute pending instants on RLI-with-bootstrap restart.
Flink Write Operations
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithDisruptorBufferSort.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/assign/HoodieSplitBucketAssigner.java
Added waitUntilDrained() to DisruptorMessageQueue for synchronous queue flushing; updated write operator UID handling to remove WRITE_OPERATOR_UID config dependency; modified bucket assignment to use configuration-driven partition indexing instead of simple modulo parallelism.
Vector Search
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/HoodieVectorSearchTableValuedFunction.scala, hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieVectorSearchPlanBuilder.scala, hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/analysis/VectorDistanceUtils.scala
Implemented hudi_vector_search and hudi_vector_search_batch table-valued functions with support for cosine/L2/dot-product metrics; brute-force search algorithm with distance UDF factories and batch query cross-join logic.
Incremental Query Improvements
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV1.scala, hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV2.scala, hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/IncrementalRelationUtil.scala
Added column pruning support by switching from TableScan to PrunedScan interface; introduced IncrementalRelationUtil for schema pruning and column filtering; added instant format normalization for incremental queries via formatIncrementalInstant.
Metadata & Partition Listing
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java, hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/metadata/CatalogBackedTableMetadata.scala, hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
Added catalog-based partition listing option via FILE_INDEX_PARTITION_LISTING_VIA_CATALOG; new CatalogBackedTableMetadata class for Spark catalog integration; partition predicate expression passing through table metadata API.
Log Compaction & Delta Commits
hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java, hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java, hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
Added log-compaction-aware delta commit tracking with getLastLogCompaction() and getDeltaCommitsSinceLatestCompletedLogCompaction(); new getAllLogFilesWithMaxCommit() for filtering log files by completion time; updated scheduling logic with needLogCompact() helper.
Timeline Utilities
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java, hudi-common/src/main/java/org/apache/hudi/common/table/timeline/BaseHoodieTimeline.java
Added findInstantsModifiedBeforeOrEqualsByCompletionTime() method for before-or-equals completion-time filtering to complement existing after-filtering.
Schema Utilities
hudi-common/src/main/java/org/apache/hudi/util/PartitionPathFilterUtil.java, hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java, hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
Added reusable relativePathPrefixPredicate() for partition filtering; exposed databaseName field and changed getPartitionPathWithPathPrefixUsingFilterExpression visibility in FileSystemBackedTableMetadata; added overload accepting partition predicate expressions.
Queue & Disruptor Utilities
hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java, hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java
Added waitUntilDrained() to block until queue emptied or error occurs; made ParquetReaderIterator idempotent with closed-state tracking.
Base Client Cleanup
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java, hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
Removed no-op initWrapperFSMetrics() method and its invocation; consolidated metrics initialization into DistributedRegistryUtil.createWrapperFileSystemRegistries() call in constructor.
Spark & Flink Configuration
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala, hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala, hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
Added catalog partition listing and variant shredding read config options; incremental instant time normalization; removed Flink WRITE_OPERATOR_UID config; propagated variant read settings to Spark SQLConf.
Test Coverage
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieTableServiceClient.java, hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestHoodieVectorSearchFunction.scala, hudi-flink-datasource/hudi-flink/src/test/...(multiple), hudi-hadoop-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroWriteSupportVariantShredding.java
Comprehensive test additions for clustering expiration, variant shredding, vector search functionality, distributed registry, record limiting, CDC reader refactoring, schema validation, and incremental query column pruning across multiple modules.
Maven Dependencies
pom.xml, hudi-spark-datasource/hudi-spark-common/pom.xml, hudi-spark-datasource/hudi-spark/pom.xml
Added spark-mllib-local dependency management and module-level dependencies for vector distance computation support.

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes


🐰 From the Burrow

Hopping through clusters with heartbeats in sync,
Variants now shredded in a Parquet ink,
Vectors search swiftly through data so vast,
CDC refactored and registry recast,
Flink bounds its records with limits in place,
A grand optimization of distributed space! 🚀


@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 1

Note

Due to the large number of review comments, Critical severity comments were prioritized as inline comments.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java (1)

189-191: ⚠️ Potential issue | 🟠 Major

Propagate registries in foreach too.

All the other executor-side helpers install the cached registries before invoking user code, but foreach still calls consumer.accept directly. Any metrics updated from a foreach callback will miss the distributed registries.

Suggested fix
 @Override
 public <I> void foreach(List<I> data, SerializableConsumer<I> consumer, int parallelism) {
-  javaSparkContext.parallelize(data, parallelism).foreach(consumer::accept);
+  final Map<String, Registry> registries = DISTRIBUTED_REGISTRY_MAP;
+  javaSparkContext.parallelize(data, parallelism).foreach(item -> {
+    setRegistries(registries);
+    consumer.accept(item);
+  });
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java`
around lines 189 - 191, In HoodieSparkEngineContext.foreach, wrap the provided
SerializableConsumer<I> so the executor installs the cached registries before
invoking user code: replace the direct javaSparkContext.parallelize(data,
parallelism).foreach(consumer::accept) call with a lambda that first calls
setRegistries(...) with the cached registry map (as the other executor-side
helpers do) and then invokes consumer.accept(...), ensuring registries are
installed on each executor before user callbacks run.
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java (1)

603-637: ⚠️ Potential issue | 🟠 Major

Always stop the clustering heartbeat in finally.

With the new expiration flow, a failure anywhere before Line 635 leaves the heartbeat alive. That makes the failed clustering instant look active and can prevent the expiration-based rollback path from ever picking it up. Compaction already does this cleanup in finally; clustering should mirror that.

Proposed fix
     } catch (Exception e) {
       throw new HoodieClusteringException("unable to transition clustering inflight to complete: " + clusteringCommitTime, e);
     } finally {
       this.txnManager.endStateChange(Option.of(clusteringInstant));
       releaseResources(clusteringCommitTime);
+      if (config.isExpirationOfClusteringEnabled()) {
+        heartbeatClient.stop(clusteringCommitTime);
+      }
     }
     WriteMarkersFactory.get(config.getMarkersType(), table, clusteringCommitTime)
         .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
     if (clusteringTimer != null) {
       long durationInMs = metrics.getDurationInMs(clusteringTimer.stop());
@@
-    if (config.isExpirationOfClusteringEnabled()) {
-      heartbeatClient.stop(clusteringCommitTime);
-    }
     log.info("Clustering successfully on commit {} for table {}", clusteringCommitTime, table.getConfig().getBasePath());
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java`
around lines 603 - 637, The clustering heartbeat
(heartbeatClient.stop(clusteringCommitTime)) must be moved into the finally
block so it always runs even if exceptions occur: update
BaseHoodieTableServiceClient around the try/catch/finally that currently ends
the txn and releases resources to also call
heartbeatClient.stop(clusteringCommitTime) inside the finally (guarded by
config.isExpirationOfClusteringEnabled()), removing the existing post-try
call; still stop the timer/metrics and delete markers after the finally as
appropriate, so heartbeatClient.stop is guaranteed to run for every clustering
instant (clusteringCommitTime) and mirrors compaction's cleanup behavior.
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java (1)

484-515: ⚠️ Potential issue | 🟠 Major

Recurse through container types when enforcing the nested VECTOR ban.

validateNoVectorInNestedRecord() only walks into nested RECORD fields. A schema like record -> array<record{embedding: VECTOR}> or record -> map<record{embedding: VECTOR}> still passes because ARRAY/MAP/UNION branches are never visited, even though the new exception text says those nested placements are unsupported. This leaves invalid schemas accepted here and failing later in writer/reader code.
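The container-aware walk described above can be sketched as follows. This is a minimal illustration under stated assumptions: `NestedVectorCheck`, `Kind`, and `Node` are hypothetical stand-ins, not Hudi's `HoodieSchema` API, and the sketch assumes that a union (for example a nullable wrapper) does not count as a nesting level while RECORD, ARRAY, and MAP do.

```java
import java.util.List;

// Hypothetical stand-in for a schema tree; this is NOT Hudi's HoodieSchema API.
final class NestedVectorCheck {
  enum Kind { RECORD, ARRAY, MAP, UNION, VECTOR, PRIMITIVE }

  // children holds record fields, the array element, the map value, or union branches.
  static final class Node {
    final Kind kind;
    final List<Node> children;

    Node(Kind kind, Node... children) {
      this.kind = kind;
      this.children = List.of(children);
    }
  }

  // Returns true if a VECTOR appears anywhere below the top-level fields.
  static boolean hasNestedVector(Node topLevelRecord) {
    for (Node field : topLevelRecord.children) {
      if (walk(field, false)) {
        return true;
      }
    }
    return false;
  }

  private static boolean walk(Node node, boolean nested) {
    switch (node.kind) {
      case VECTOR:
        // A VECTOR is only a violation when it sits below a container or record.
        return nested;
      case UNION:
        // Assumption: unions (such as nullable wrappers) do not add a nesting level.
        return anyChild(node, nested);
      case RECORD:
      case ARRAY:
      case MAP:
        // Everything below a record, array element, or map value is nested.
        return anyChild(node, true);
      default:
        return false;
    }
  }

  private static boolean anyChild(Node node, boolean nested) {
    for (Node child : node.children) {
      if (walk(child, nested)) {
        return true;
      }
    }
    return false;
  }
}
```

Under this model, a top-level `embedding: VECTOR` field (nullable or not) passes, while `array<record{embedding: VECTOR}>` and `map<record{embedding: VECTOR}>` are flagged.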

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java`
around lines 484 - 515, validateNoVectorInNestedRecord currently only recurses
into RECORD types so VECTORs nested inside ARRAY/MAP/UNION branches are missed;
update this method to, after obtaining nonNull via
field.schema().getNonNullType(), check nonNull.getType() and (a) if RECORD call
validateNoVectorInNestedRecord(nonNull.getFields(), true), (b) if ARRAY recurse
into its element type (e.g., validateNoVectorInNestedRecord over a synthetic
HoodieSchemaField built from nonNull.getElementType() or call a helper that
accepts HoodieSchema), (c) if MAP recurse into its value type via
nonNull.getValueType() (Avro map keys are always strings, so only the value
needs checking), and (d) if UNION iterate nonNull.getTypes() and recurse into
each branch; keep the existing VECTOR check when nested is true and use the same
exception. Reuse the HoodieSchema/HoodieSchemaField accessors
(schema().getNonNullType(), getElementType(), getValueType(), getTypes(),
getFields()) or add a small helper that walks a HoodieSchema to avoid
duplicating logic.
🟠 Major comments (23)
hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java-58-63 (1)

58-63: ⚠️ Potential issue | 🟠 Major

EOF auto-close currently leads to duplicate underlying close via next() error path.

After EOF, hasNext() closes the reader here, but a later next() still hits a broad catch and calls FileIOUtils.closeQuietly(parquetReader) again. That bypasses the new closed gate and reintroduces double-close behavior on the wrapped reader.

Proposed fix
@@
   public T next() {
     try {
@@
-    } catch (Exception e) {
-      FileIOUtils.closeQuietly(parquetReader);
-      throw new HoodieException("unable to read next record from parquet file ", e);
+    } catch (HoodieException e) {
+      // Preserve iterator contract errors like end-of-data
+      throw e;
+    } catch (Exception e) {
+      close();
+      throw new HoodieException("unable to read next record from parquet file ", e);
     }
   }
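The idea behind this fix is that every cleanup path goes through one guarded `close()`. The sketch below shows that pattern in isolation; `GuardedCloseDemo`, `CountingResource`, and `GuardedIterator` are hypothetical names for illustration and do not reproduce the actual `ParquetReaderIterator`.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of an idempotent close gate; not the actual ParquetReaderIterator.
final class GuardedCloseDemo {

  // Counts how many times the underlying resource is really closed.
  static final class CountingResource implements Closeable {
    final AtomicInteger closeCalls = new AtomicInteger();

    @Override
    public void close() {
      closeCalls.incrementAndGet();
    }
  }

  static final class GuardedIterator implements Closeable {
    private final Closeable delegate;
    private boolean closed;

    GuardedIterator(Closeable delegate) {
      this.delegate = delegate;
    }

    // EOF handling, read failures, and explicit close all funnel through here.
    @Override
    public void close() {
      if (closed) {
        return;
      }
      closed = true; // set the flag first so re-entrant calls become no-ops
      try {
        delegate.close();
      } catch (IOException e) {
        // Swallowed on purpose, mirroring a "close quietly" helper.
      }
    }

    // Simulates the error path of next(): clean up via close(), then rethrow.
    void failRead() {
      close();
      throw new IllegalStateException("unable to read next record");
    }
  }
}
```

With this shape, an EOF auto-close followed by a failing read and a final explicit `close()` still closes the delegate exactly once.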
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java`
around lines 58 - 63, After hasNext() auto-closes the reader at EOF, a later
next() can still reach its catch block, which calls
FileIOUtils.closeQuietly(parquetReader) directly and bypasses the closed gate,
closing the wrapped reader twice. Fix by routing the error-path cleanup in
next() through close() so the closed flag is honored, and make sure close() sets
closed = true before invoking FileIOUtils.closeQuietly(parquetReader). Use the
existing symbols hasNext(), next(), close(), closed, parquetReader and
FileIOUtils.closeQuietly to locate and make the change.
hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java-127-137 (1)

127-137: ⚠️ Potential issue | 🟠 Major

Bound this drain wait to avoid hanging shutdown/checkpoints.

Line 127 introduces an unbounded poll loop. If the consumer thread stalls without throwing, waitUntilDrained() can block forever; endInput() is especially exposed because it may never get an external interrupt. Please add a deadline/timeout and mark the queue as failed when it expires.

Suggested fix
   public void waitUntilDrained() {
+    final long deadlineNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(TIMEOUT_WAITING_SECS);
     while (!isEmpty()) {
       if (throwable.get() != null) {
         return;
       }
       if (Thread.currentThread().isInterrupted()) {
         markAsFailed(new HoodieException("Interrupted while waiting for disruptor queue to drain"));
         return;
       }
+      if (System.nanoTime() >= deadlineNanos) {
+        markAsFailed(new HoodieException("Timed out while waiting for disruptor queue to drain"));
+        return;
+      }
       LockSupport.parkNanos(100_000);
     }
   }
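The deadline pattern in the suggested fix can be tried in isolation with the sketch below. `BoundedWaitDemo` and `awaitCondition` are hypothetical names, and the condition is passed in as a `BooleanSupplier` instead of the queue's `isEmpty()`; the subtraction-based comparison is the form the `System.nanoTime()` documentation recommends for handling numeric overflow.

```java
import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier;

// Hypothetical helper showing a bounded poll loop; not part of DisruptorMessageQueue.
final class BoundedWaitDemo {

  // Returns true if the condition became true, false on timeout or interrupt.
  static boolean awaitCondition(BooleanSupplier done, long timeoutNanos) {
    final long deadline = System.nanoTime() + timeoutNanos;
    while (!done.getAsBoolean()) {
      if (Thread.currentThread().isInterrupted()) {
        return false;
      }
      // Compare by subtraction so the check stays correct if nanoTime wraps around.
      if (System.nanoTime() - deadline >= 0) {
        return false;
      }
      LockSupport.parkNanos(100_000);
    }
    return true;
  }
}
```

A caller would treat a `false` result as a failure, which corresponds to the `markAsFailed(...)` call in the suggested fix.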
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java`
around lines 127 - 137, The waitUntilDrained method currently loops
indefinitely; add a bounded deadline using System.nanoTime() (e.g., compute
deadline = now + DRAIN_TIMEOUT_NANOS) and check inside the loop whether
System.nanoTime() has passed the deadline; if it has, call markAsFailed(new
HoodieException("Timed out waiting for disruptor queue to drain")) (or a similar
message) and return; keep the existing checks for throwable.get() and
Thread.currentThread().isInterrupted(); define the timeout as a clearly named
constant such as DRAIN_TIMEOUT_NANOS (or reuse an existing timeout constant) so
waitUntilDrained cannot hang during endInput() or shutdown.
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/IncrementalRelationUtil.scala-28-74 (1)

28-74: ⚠️ Potential issue | 🟠 Major

Keep filter-only columns available until incremental filters are applied.

These helpers only preserve requiredColumns, but IncrementalRelationV1/IncrementalRelationV2 still apply hoodie.datasource.read.incr.filters afterward. A query that projects _row_key but filters on rider/partition will now prune that column first and then fail with an unresolved-column AnalysisException. Please retain columns referenced by incremental filters, or delay filterRequiredColumnsFromDF(...) until after those filters run.

Also applies to: 77-109

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/IncrementalRelationUtil.scala`
around lines 28 - 74, getPrunedSchema currently only adds requiredColumns (plus
commit_time, partitions, ordering) which can prune columns used by incremental
filters (hoodie.datasource.read.incr.filters) and cause unresolved-column errors
in IncrementalRelationV1/IncrementalRelationV2; update getPrunedSchema to also
include any column names referenced by the incremental filter expression(s)
(parse or read the config key hoodie.datasource.read.incr.filters from
metaClient/table config or accept the filter set passed in) so those filter-only
columns are retained, or alternatively move the callsite that invokes
filterRequiredColumnsFromDF(...) in IncrementalRelationV1/IncrementalRelationV2
to after incremental filters are applied; ensure references to
HoodieRecord.COMMIT_TIME_METADATA_FIELD, getPrunedSchema,
filterRequiredColumnsFromDF, and IncrementalRelationV1/IncrementalRelationV2 are
preserved so the change is easy to locate.
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalQueryColumnPruning.scala-126-140 (1)

126-140: ⚠️ Potential issue | 🟠 Major

Assertion at lines 165-168 cannot reliably verify scan-level column pruning—use executedPlan pattern matching instead.

The current plan.contains() check is too weak; it passes with plain projection (reading all columns then filtering in Spark) without proving columns are actually pruned at the file scan level. Instead, pattern-match on executedPlan to extract FileSourceScanExec/RowDataSourceScanExec and validate their .requiredSchema property directly, matching the approach used in TestNestedSchemaPruningOptimization.scala.

Also applies to: 164-168

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalQueryColumnPruning.scala`
around lines 126 - 140, The test currently asserts pruning by comparing
prunedDF.schema field counts, which can be satisfied by a post-scan projection
and does not prove scan-level pruning; change the verification to inspect
prunedDF.queryExecution.executedPlan, pattern-match for FileSourceScanExec or
RowDataSourceScanExec nodes (as done in
TestNestedSchemaPruningOptimization.scala), and assert the scan node's
requiredSchema (or requiredAttributes) contains only the selected columns
("col1","col3") and excludes others; update references to prunedDF and the
executedPlan pattern matching to validate FileSource/RowDataSource scan
requiredSchema rather than relying on prunedDF.schema length.
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/RecordLimiter.java-39-69 (1)

39-69: ⚠️ Potential issue | 🟠 Major

Honor NO_LIMIT consistently inside wrap().

isLimitReached() treats -1 as unlimited, but nextRecordFromSplit() checks totalReadCount >= limit directly. new RecordLimiter(NO_LIMIT) therefore stops on the very first read (0 >= -1), and any other negative value is silently accepted as "already reached".

Proposed fix
 public class RecordLimiter {
   public static final long NO_LIMIT = -1L;
@@
   public RecordLimiter(long limit) {
+    if (limit < 0 && limit != NO_LIMIT) {
+      throw new IllegalArgumentException("limit must be >= 0 or NO_LIMIT");
+    }
     this.limit = limit;
   }
@@
   public <T> RecordsWithSplitIds<T> wrap(RecordsWithSplitIds<T> records) {
+    if (limit == NO_LIMIT) {
+      return records;
+    }
     return new RecordsWithSplitIds<T>() {
@@
       public T nextRecordFromSplit() {
-        if (totalReadCount.get() >= limit) {
+        if (isLimitReached()) {
           return null;
         }
         T record = records.nextRecordFromSplit();
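The arithmetic behind this finding is easy to reproduce outside Flink. The sketch below is a reduced, hypothetical model of the limit check (`LimitCheckDemo` is not the real `RecordLimiter`); it contrasts the direct comparison with the `NO_LIMIT`-aware check and includes the constructor validation from the proposed fix.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical reduction of the limit check; not the real RecordLimiter.
final class LimitCheckDemo {
  static final long NO_LIMIT = -1L;

  private final long limit;
  private final AtomicLong totalReadCount = new AtomicLong();

  LimitCheckDemo(long limit) {
    if (limit < 0 && limit != NO_LIMIT) {
      throw new IllegalArgumentException("limit must be >= 0 or NO_LIMIT");
    }
    this.limit = limit;
  }

  // Direct comparison: with limit == -1 this is true before any record is read (0 >= -1).
  boolean naiveLimitReached() {
    return totalReadCount.get() >= limit;
  }

  // NO_LIMIT-aware check: unlimited readers never report the limit as reached.
  boolean isLimitReached() {
    return limit != NO_LIMIT && totalReadCount.get() >= limit;
  }

  // Call once per non-null record handed to the caller.
  void recordRead() {
    totalReadCount.incrementAndGet();
  }
}
```

For a limiter built with `NO_LIMIT`, `naiveLimitReached()` is already true at a read count of zero, while `isLimitReached()` stays false however many records are read.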
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/RecordLimiter.java`
around lines 39 - 69, The wrap() implementation incorrectly compares
totalReadCount.get() >= limit directly and thus treats NO_LIMIT (e.g., -1) as
already reached; update nextRecordFromSplit() to honor NO_LIMIT by either
calling isLimitReached() or using the same check (limit != NO_LIMIT &&
totalReadCount.get() >= limit) before returning null, and keep incrementing
totalReadCount only when a non-null record is returned (use the existing
totalReadCount.incrementAndGet() logic on record != null). Ensure the change
references RecordLimiter.wrap, nextRecordFromSplit, totalReadCount, and NO_LIMIT
so behavior matches isLimitReached().
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcImageManager.java-169-175 (1)

169-175: ⚠️ Potential issue | 🟠 Major

Guard skipBytesToRead() against infinite loops on corrupted data.

DataInputStream.skipBytes() can return 0 before the requested byte count is consumed. On truncated or corrupted payloads, this loop never makes progress and hangs deserialization instead of failing fast.

Proposed fix
+import java.io.EOFException;
@@
     @Override
     public void skipBytesToRead(int numBytes) throws IOException {
       while (numBytes > 0) {
         int skipped = skipBytes(numBytes);
+        if (skipped <= 0) {
+          throw new EOFException("Unable to skip remaining " + numBytes + " bytes");
+        }
         numBytes -= skipped;
       }
     }
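The stall is reproducible with plain `java.io` classes: once the underlying stream is exhausted, `DataInputStream.skipBytes` returns 0 without throwing. The sketch below applies the same guard as the proposed fix to a standalone helper; `SkipFullyDemo` and `skipFully` are hypothetical names used only for this illustration.

```java
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;

// Hypothetical standalone version of the guarded skip loop; not the CdcImageManager code.
final class SkipFullyDemo {

  // Skips exactly numBytes or fails fast when the stream stops making progress.
  static void skipFully(DataInputStream in, int numBytes) throws IOException {
    while (numBytes > 0) {
      int skipped = in.skipBytes(numBytes);
      if (skipped <= 0) {
        // skipBytes returns 0 at end of stream, so without this check the loop never exits.
        throw new EOFException("Unable to skip remaining " + numBytes + " bytes");
      }
      numBytes -= skipped;
    }
  }
}
```

Wrapping a 4-byte `ByteArrayInputStream`, skipping 3 bytes succeeds, and a further request to skip 8 bytes raises `EOFException` instead of spinning.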
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcImageManager.java`
around lines 169 - 175, skipBytesToRead currently loops until numBytes is 0 but
relies on DataInputStream.skipBytes(numBytes) which can return 0 on
truncated/corrupted streams and cause an infinite loop; update
CdcImageManager.skipBytesToRead to detect when skipBytes returns 0 (or no
progress) and throw an IOException/EOFException immediately (or after a small
retry) with a clear message so deserialization fails fast; keep using
skipBytes(numBytes) and decrement numBytes by skipped when >0, but if skipped ==
0, throw new IOException("Failed to skip required bytes in skipBytesToRead:
stream ended or corrupted") to avoid hanging.
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcIterators.java-310-323 (1)

310-323: ⚠️ Potential issue | 🟠 Major

Unchecked Option.get() may throw if merge returns empty.

mergeRowWithLog() returns Option.ofNullable(...) which could be empty if constructHoodieRecord returns null. Calling .get() directly on line 313 would throw NoSuchElementException.

🛡️ Proposed fix to handle empty merge result
           } else {
             HoodieOperation operation = HoodieOperation.fromValue(existed.getRowKind().toByteValue());
             HoodieRecord<RowData> historyRecord = new HoodieFlinkRecord(record.getKey(), operation, existed);
-            HoodieRecord<RowData> merged = mergeRowWithLog(historyRecord, record).get();
-            if (merged.getData() != existed) {
+            Option<HoodieRecord<RowData>> mergedOpt = mergeRowWithLog(historyRecord, record);
+            if (mergedOpt.isPresent() && mergedOpt.get().getData() != existed) {
+              HoodieRecord<RowData> merged = mergedOpt.get();
               existed.setRowKind(RowKind.UPDATE_BEFORE);
               currentImage = existed;
               RowData mergedRow = merged.getData();
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcIterators.java`
around lines 310 - 323, The call to mergeRowWithLog(...) may return an empty
Option and calling .get() on it can throw; update the block in CdcIterators
where mergeRowWithLog(historyRecord, record).get() is used to first check the
Option (e.g., Option.isPresent()/isEmpty()) or use Option.orElse/ifPresent
pattern to handle the empty case safely, and only proceed with
existed.setRowKind..., imageManager.updateImageRecord(...), and return true when
a merged result exists; if the Option is empty, treat it as a no-merge situation
(skip the update path and continue) so that
constructHoodieRecord()/mergeRowWithLog() returning null does not cause a
NoSuchElementException.
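
The guard can be illustrated in isolation. This sketch uses `java.util.Optional` as a stand-in for Hudi's `Option`, and `MergeGuardSketch` / `changedMergeResult` are hypothetical names. It only shows the "empty or unchanged means no update" decision, not the CDC image bookkeeping.

```java
import java.util.Optional;
import java.util.function.BinaryOperator;

// Hypothetical stand-in: java.util.Optional plays the role of Hudi's Option here.
final class MergeGuardSketch {
  private MergeGuardSketch() {
  }

  /**
   * Returns the merged value only when the merger produced a non-null result
   * that is a different object than the existing one (identity comparison,
   * mirroring the "merged.getData() != existed" check in the diff above).
   */
  static <T> Optional<T> changedMergeResult(T existing, T incoming, BinaryOperator<T> merger) {
    return Optional.ofNullable(merger.apply(existing, incoming))
        .filter(merged -> merged != existing);
  }
}
```

A caller would take the update path only when the returned value is present, and treat an empty result as "no merge".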
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcIterators.java-541-547 (1)

541-547: ⚠️ Potential issue | 🟠 Major

Potential NPE if image record is not found.

imageManager.getImageRecord() may return null if the record key is not found in the image map. Calling row.setRowKind(rowKind) on a null reference will throw NullPointerException. The same issue exists in RecordKeyImageIterator.getBeforeImage() at line 590.

🛡️ Proposed fix to validate record existence
   @Override
   protected RowData getAfterImage(RowKind rowKind, GenericRecord cdcRecord) {
     String recordKey = cdcRecord.get(1).toString();
     RowData row = imageManager.getImageRecord(recordKey, afterImages, rowKind);
+    ValidationUtils.checkState(row != null,
+        "After-image record not found for key: " + recordKey);
     row.setRowKind(rowKind);
     return projection.project(row);
   }

Apply similar fix in RecordKeyImageIterator.getBeforeImage().

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcIterators.java`
around lines 541 - 547, getAfterImage currently calls
imageManager.getImageRecord(recordKey, afterImages, rowKind) and immediately
uses the result, which can be null; update getAfterImage (and mirror the change
in RecordKeyImageIterator.getBeforeImage) to null-check the RowData returned
from imageManager.getImageRecord, and if null either return null (or an
appropriate empty/absent RowData) before calling row.setRowKind(rowKind) and
projection.project(row), or throw a clear exception; ensure you reference the
same variables and methods (recordKey, afterImages, imageManager.getImageRecord,
RowData, setRowKind, projection.project) so the protection is applied in both
methods.
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/analysis/VectorDistanceUtils.scala-64-77 (1)

64-77: ⚠️ Potential issue | 🟠 Major

Handle null elements before unboxing vector arrays.

These UDFs assume every array element is non-null. If a corpus/query row contains an array like [1.0, null, 2.0], Spark will fail during deserialization/unboxing before you can surface a meaningful HoodieAnalysisException. Please either reject nullable-element embeddings up front or switch these UDFs to boxed element types and validate nulls explicitly.

Also applies to: 99-126

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/analysis/VectorDistanceUtils.scala`
around lines 64 - 77, The UDFs in VectorDistanceUtils currently unbox primitive
Seq elements (Float/Double/Byte) which will throw on nulls; update the UDFs (the
FloatType/DoubleType/ByteType cases and the similar blocks at 99-126) to accept
boxed types (Seq[java.lang.Float], Seq[java.lang.Double], Seq[java.lang.Byte])
and explicitly validate for null elements in both corpus and query vectors
before unboxing; on any null element throw a HoodieAnalysisException with a
clear message (or alternatively reject nullable-element embeddings up front),
then map the validated boxed values to primitive doubles and call
requireSameLength and distFn(new DenseVector(...), queryDv, queryNorm) as
before.
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieVectorSearchPlanBuilder.scala-108-112 (1)

108-112: ⚠️ Potential issue | 🟠 Major

Fail fast on internal column-name collisions.

The planner reserves _hudi_distance, _hudi_query_index, _hudi_query_emb, and _hudi_rank, but it never checks whether the corpus/query schemas already contain them. That can silently overwrite or drop user columns—for example, a corpus column named _hudi_query_emb is removed by .drop(QUERY_EMB_ALIAS). Please validate these reserved names up front or generate guaranteed-unique internal aliases.

Also applies to: 272-276, 301-324

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieVectorSearchPlanBuilder.scala`
around lines 108 - 112, The builder reserves internal column names
(DISTANCE_COL, QUERY_ID_COL, QUERY_EMB_ALIAS, RANK_COL, QUERY_COL_PREFIX in
HoodieVectorSearchPlanBuilder) but never checks for collisions with the
corpus/query schemas so user columns can be overwritten; update the builder to
either (a) validate up front by scanning the input corpus and query schemas for
these reserved names and fail fast with a clear exception referencing the
offending name(s), or (b) generate guaranteed-unique internal aliases (e.g.,
append a stable unique suffix/UUID to QUERY_EMB_ALIAS, DISTANCE_COL, etc.) and
use those generated symbols everywhere the code currently references the
constants (including the places that call .drop(QUERY_EMB_ALIAS) and the code
paths currently around the sections flagged for change), ensuring all references
are wired to the validated/generated names so no user column is lost.
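
Option (a), the up-front validation, could look roughly like the sketch below. The reserved names are taken from the finding above. The class name, method names, and the case-insensitive comparison are assumptions, and the real planner would throw `HoodieAnalysisException` rather than `IllegalArgumentException`.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

// Hypothetical validation helper; not part of HoodieVectorSearchPlanBuilder.
final class ReservedColumnCheckSketch {
  static final List<String> RESERVED = Collections.unmodifiableList(Arrays.asList(
      "_hudi_distance", "_hudi_query_index", "_hudi_query_emb", "_hudi_rank"));

  private ReservedColumnCheckSketch() {
  }

  /** Returns the reserved names that also appear in the schema (compared case-insensitively, by assumption). */
  static Set<String> collisions(Collection<String> schemaColumns) {
    Set<String> lowered = new LinkedHashSet<>();
    for (String column : schemaColumns) {
      lowered.add(column.toLowerCase(Locale.ROOT));
    }
    Set<String> hits = new LinkedHashSet<>(RESERVED);
    hits.retainAll(lowered);
    return hits;
  }

  /** Fails fast, naming the offending columns and which input (corpus or query) they came from. */
  static void requireNoCollisions(String side, Collection<String> schemaColumns) {
    Set<String> hits = collisions(schemaColumns);
    if (!hits.isEmpty()) {
      throw new IllegalArgumentException(
          side + " schema uses reserved internal column name(s): " + hits);
    }
  }
}
```

Running the check once for the corpus schema and once for the query schema before any alias is added would keep a later `.drop(...)` from removing a user column.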
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/HoodieVectorSearchTableValuedFunction.scala-90-102 (1)

90-102: ⚠️ Potential issue | 🟠 Major

Add foldable check before calling expr.eval() in parseK.

At line 91, expr.eval() is invoked without first checking if the expression is foldable. Unevaluable or unresolved expressions will throw Spark internal exceptions (QueryExecutionError) instead of the expected HoodieAnalysisException, causing poor user-facing error messages.

Suggested fix
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Literal}
 import org.apache.spark.sql.hudi.command.exception.HoodieAnalysisException
 import org.apache.spark.sql.types.StringType
+import scala.util.control.NonFatal

@@
   private[logical] def parseK(funcName: String, expr: Expression): Int = {
+    if (!expr.foldable) {
+      throw new HoodieAnalysisException(
+        s"Function '$funcName': argument 'k' must be a positive integer literal, got: ${expr.sql}")
+    }
-    val rawValue = expr.eval()
+    val rawValue = try {
+      expr.eval()
+    } catch {
+      case NonFatal(_) =>
+        throw new HoodieAnalysisException(
+          s"Function '$funcName': argument 'k' must be a positive integer literal, got: ${expr.sql}")
+    }
     if (rawValue == null) {
       throw new HoodieAnalysisException(
         s"Function '$funcName': k must be a positive integer, got null")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/HoodieVectorSearchTableValuedFunction.scala`
around lines 90 - 102, In parseK, avoid calling expr.eval() on non-foldable
expressions; first check expr.foldable and if false throw a
HoodieAnalysisException indicating the function (funcName) requires a constant
positive integer for k, then proceed to call expr.eval() only when foldable;
keep the existing null and NumberFormatException checks and messages (function
parseK, variable expr and funcName) so unevaluable/unresolved expressions
produce a HoodieAnalysisException instead of Spark internal errors.
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java-167-173 (1)

167-173: ⚠️ Potential issue | 🟠 Major

Count only completed delta commits for log-compaction thresholds.

This path counts the raw delta timeline, while getLatestDeltaCommitInfoSinceCompaction() counts completed instants only. That makes needLogCompact() fire early when an inflight delta commit exists after the last completed log compaction.

💡 Proposed fix
     if (deltaCommitsInfo.isPresent()) {
       return Option.of(Pair.of(
-          deltaCommitsInfo.get().getLeft().countInstants(),
+          deltaCommitsInfo.get().getLeft().filterCompletedInstants().countInstants(),
           deltaCommitsInfo.get().getRight().requestedTime()));
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java`
around lines 167 - 173, The code currently counts all instants on the delta
timeline (deltaCommitsInfo.get().getLeft().countInstants()), which can include
inflight instants and makes needLogCompact() trigger prematurely; change the
count to consider only completed delta commits like
getLatestDeltaCommitInfoSinceCompaction() does. Update the use of
deltaCommitsInfo.get().getLeft() to filter or query only completed instants
(e.g., call the timeline method that returns only completed instants or filter
by HoodieInstant::isCompleted) and then call countInstants() on that
completed-only timeline before returning; keep the rest of the pairing with
deltaCommitsInfo.get().getRight() the same.
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala-68-75 (1)

68-75: ⚠️ Potential issue | 🟠 Major

Mutating shared Spark SQLConf during relation instantiation can cause last-writer-wins config conflicts.

This initializer block runs each time a relation is instantiated, modifying the session-wide sessionState.conf. When multiple relations with different option values are created in the same query, later instances will overwrite earlier config settings for all reads using that config key.

The approach is necessary because Spark's ParquetToSparkSchemaConverter reads spark.sql.variant.allowReadingShredded from SQLConf during schema resolution, but consider whether the config mutation scope can be narrowed (e.g., per-relation context, thread-local storage, or schema-resolution time injection) to avoid cross-relation contamination.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala`
around lines 68 - 75, The code in HoodieHadoopFsRelationFactory mutates the
session-wide sqlConf (sqlContext.sparkSession.sessionState.conf) on every
relation instantiation, causing last-writer-wins conflicts; instead of calling
sqlConf.setConfString("spark.sql.variant.allowReadingShredded", ...), avoid
mutating sessionState.conf and apply the setting only in a scoped context used
for schema resolution (e.g., when invoking ParquetToSparkSchemaConverter).
Concretely: remove the unconditional sqlConf.setConfString call in the
HoodieHadoopFsRelationFactory init, create a local/temporary SQLConf (or clone
the existing SQLConf) and set the hoodie.parquet.variant.allow.reading.shredded
value on that local conf, then use a scoped execution/path that accepts or
references that local conf during ParquetToSparkSchemaConverter schema
resolution (or use a ThreadLocal or an explicit parameter to the converter) so
global sessionState.conf is never modified by HoodieHadoopFsRelationFactory or
HoodieSparkUtils.gteqSpark4_0 checks.
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/DistributedRegistryUtil.java-39-54 (1)

39-54: ⚠️ Potential issue | 🟠 Major

Reset the wrapper FS registries when metrics are disabled.

HoodieWrapperFileSystem keeps these registries in static state, but this helper only touches them on the isMetricsOn() path. A later client in the same JVM with metrics disabled will inherit the previous table's registries and keep publishing metrics unexpectedly unless you add an explicit reset/clear path here.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/DistributedRegistryUtil.java`
around lines 39 - 54, The helper createWrapperFileSystemRegistries currently
only sets HoodieWrapperFileSystem registries when config.isMetricsOn() is true,
leaving static registries intact for later clients; add an explicit reset path
for the disabled case: when config.isMetricsOn() is false call
HoodieWrapperFileSystem.setMetricsRegistry(null, null) (or invoke a clear/reset
method on HoodieWrapperFileSystem if available) so the static registries are
cleared and no prior table's metrics are published by later clients.
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala-402-424 (1)

402-424: ⚠️ Potential issue | 🟠 Major

Don't classify every binary {value, metadata} struct as a variant.

looksLikeVariantStruct now matches any struct that happens to contain binary value and metadata fields, even if it's an ordinary user-defined struct with extra fields. Those rows will take the variant conversion branch and can be rewritten incorrectly during schema evolution. Please gate this on the exact shredded-variant shape or an explicit schema marker rather than the loose structural check.

Also applies to: 430-441

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala`
around lines 402 - 424, The current loose structural check in
looksLikeVariantStruct causes any struct with binary "value" and "metadata"
fields to be treated as a variant and misrouted by the case branches around the
variant conversion (the case handling newStructType/prevStructType that call
sparkAdapter.convertVariantToStruct and convertStructToVariant). Tighten the
predicate: change looksLikeVariantStruct (and any uses in those case branches)
to require the exact shredded-variant shape (exact field names/count and
expected types) or an explicit schema marker field/flag (e.g., a dedicated
"__shredded_variant" marker) and reject structs with extra fields; then update
the two case branches that call
sparkAdapter.convertVariantToStruct/convertStructToVariant to use the stricter
check so only true shredded-variant rows hit the conversion code (apply same fix
to the other similar branch).
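
One way to picture the stricter predicate: the sketch below models a struct as an ordered map from field name to type name, which is a simplification of Spark's `StructType`. It assumes the only legal field names are `value`, `metadata`, and an optional `typed_value`. That allowed set is an assumption for illustration, not the confirmed shredded-variant layout used by the PR.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model: a struct is a map of field name -> type name.
final class VariantShapeSketch {
  // Assumed set of field names a variant-shaped struct may contain.
  private static final Set<String> ALLOWED =
      new HashSet<>(Arrays.asList("value", "metadata", "typed_value"));

  private VariantShapeSketch() {
  }

  /** True only if both binary fields are present and no field outside the allowed set exists. */
  static boolean isStrictVariantShape(Map<String, String> fields) {
    if (!"binary".equals(fields.get("value")) || !"binary".equals(fields.get("metadata"))) {
      return false;
    }
    for (String name : fields.keySet()) {
      if (!ALLOWED.contains(name)) {
        return false; // extra user field: treat as an ordinary struct
      }
    }
    return true;
  }
}
```

A struct such as `{value: binary, metadata: binary, owner: string}` is rejected here, whereas the loose check described above would accept it.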
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java-92-97 (1)

92-97: ⚠️ Potential issue | 🟠 Major

Scope the distributed-registry cache per Spark application.

DISTRIBUTED_REGISTRY_MAP is JVM-global and keyed only by tableName.registryName. If the same table is used from a second JavaSparkContext in the same process, computeIfAbsent reuses the old entry and skips register(javaSparkContext), so the registry stays bound to the first app. It also leaves stale registries around indefinitely.

Also applies to: 281-288

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java`
around lines 92 - 97, DISTRIBUTED_REGISTRY_MAP is currently a JVM-global keyed
only by tableName.registryName leading to cross-application reuse; change the
cache to be scoped per Spark application (e.g., make the top-level map keyed by
Spark application id and then map registry keys to Registry instances) and use
the JavaSparkContext's application id (javaSparkContext.sc().applicationId() or
similar) as part of the key/first-level map so computeIfAbsent will
create/register a fresh Registry for each Spark app and call
register(javaSparkContext) for that app; also ensure you remove/cleanup entries
when the SparkContext stops (or use weak/expiring entries) so stale registries
are not retained.
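
A rough sketch of the two-level cache suggested above, with the application id as the outer key. `AppScopedRegistryCache`, `getOrCreate`, and `evictApp` are hypothetical names, and the factory callback stands in for "create the registry and call `register(javaSparkContext)`".

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical cache; R stands in for the distributed registry type.
final class AppScopedRegistryCache<R> {
  // Outer key: Spark application id. Inner key: "tableName.registryName".
  private final ConcurrentMap<String, ConcurrentMap<String, R>> byApp = new ConcurrentHashMap<>();

  /** Returns the registry for this app and key, invoking the factory once per (app, key) pair. */
  R getOrCreate(String appId, String tableName, String registryName, Function<String, R> factory) {
    String key = tableName + "." + registryName;
    return byApp.computeIfAbsent(appId, id -> new ConcurrentHashMap<>())
        .computeIfAbsent(key, factory);
  }

  /** Drops every registry of one application, e.g. from a hook that runs when its context stops. */
  void evictApp(String appId) {
    byApp.remove(appId);
  }
}
```

With this layout a second application that touches the same table gets its own registry, and evicting an application removes its entries instead of leaving them in the JVM indefinitely.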
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java-269-285 (1)

269-285: ⚠️ Potential issue | 🟠 Major

Reject non-positive clustering expiration thresholds.

Once expirations are enabled, hoodie.clustering.expiration.threshold.mins <= 0 makes fresh clustering instants immediately rollback-eligible. Please validate this as a positive value in Builder.validate().

Suggested validation
     private void validate() {
+      long expirationThresholdMins = clusteringConfig.getLong(EXPIRATION_THRESHOLD_MINS);
+      ValidationUtils.checkArgument(expirationThresholdMins > 0,
+          EXPIRATION_THRESHOLD_MINS.key() + " must be > 0");
+
       boolean inlineCluster = clusteringConfig.getBoolean(HoodieClusteringConfig.INLINE_CLUSTERING);
       boolean inlineClusterSchedule = clusteringConfig.getBoolean(HoodieClusteringConfig.SCHEDULE_INLINE_CLUSTERING);
       ValidationUtils.checkArgument(!(inlineCluster && inlineClusterSchedule), String.format("Either of inline clustering (%s) or "
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java`
around lines 269 - 285, In Builder.validate(), add a check that when
ENABLE_EXPIRATIONS is true (use the ConfigProperty ENABLE_EXPIRATIONS) then
EXPIRATION_THRESHOLD_MINS must be > 0 (use the ConfigProperty
EXPIRATION_THRESHOLD_MINS); if the threshold is <= 0, throw an
IllegalArgumentException with a clear message indicating the threshold must be
positive to avoid immediate expiration of new clustering instants. Ensure you
reference these config properties by name in the validation logic so the check
runs during builder validation.
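
Note that the suggested diff validates the threshold unconditionally, while the prompt above gates the check on expirations being enabled. The sketch below follows the gated wording. It takes plain values instead of reading `HoodieClusteringConfig`, and the class and method names are hypothetical.

```java
// Hypothetical standalone check; the real one would live in Builder.validate().
final class ExpirationThresholdCheckSketch {
  private ExpirationThresholdCheckSketch() {
  }

  /** Rejects non-positive thresholds, but only when clustering expirations are enabled. */
  static void validate(boolean expirationsEnabled, long thresholdMins) {
    if (expirationsEnabled && thresholdMins <= 0) {
      throw new IllegalArgumentException(
          "hoodie.clustering.expiration.threshold.mins must be > 0 when clustering "
              + "expirations are enabled, got " + thresholdMins);
    }
  }
}
```

Gating the check keeps existing configurations valid when the feature is off, at the cost of letting a bad threshold go unnoticed until expirations are switched on.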
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java-336-350 (1)

336-350: ⚠️ Potential issue | 🟠 Major

Skip healthy clustering instants instead of throwing here.

This helper is documented as rolling back failed/expired clustering instants, but a live or just-started instant currently aborts the caller with HoodieException. That makes a cleanup path fail on healthy clustering activity for the same partitions.

Suggested change
     for (HoodieInstant instant : getPendingClusteringInstantsForPartitions(metaClient, partitions)) {
       if (!BaseHoodieTableServiceClient.isClusteringInstantEligibleForRollback(
           metaClient, instant, client.getConfig(), client.getHeartbeatClient())) {
-        throw new HoodieException("Clustering instant " + instant.requestedTime()
-            + " targeting requested partitions is not eligible for rollback "
-            + "(heartbeat still active or instant too recent)");
+        LOG.info("Skipping clustering instant {} because it is not rollback-eligible",
+            instant.requestedTime());
+        continue;
       }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java`
around lines 336 - 350, The code currently throws a HoodieException when
BaseHoodieTableServiceClient.isClusteringInstantEligibleForRollback(...) returns
false, aborting cleanup for healthy/active clustering instants; change this to
skip those instants instead by logging a message (e.g., LOG.info or LOG.debug)
that the instant is not eligible for rollback and continue the loop so only
expired/failed instants reach client.rollback(...). Update the block around
getPendingClusteringInstantsForPartitions(...), replacing the throw with a
non-throwing log and continue, keeping the existing
metaClient.reloadActiveTimeline() and the subsequent containsInstant check/
client.rollback(...) logic intact.
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala-446-511 (1)

446-511: ⚠️ Potential issue | 🟠 Major

Provide default implementations for new SparkAdapter methods to maintain backward compatibility.

Adding abstract methods to a public trait breaks compatibility for any out-of-tree SparkAdapter implementations compiled against older Hudi versions. Instead, provide default implementations here (false / UnsupportedOperationException for Spark 3 behavior) and let BaseSpark4Adapter override them with actual functionality. This allows existing implementations to continue working without recompilation.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala`
around lines 446 - 511, The new abstract methods in the SparkAdapter trait
(isVariantShreddingStruct, generateVariantWriteShreddingSchema,
createShreddedVariantWriter, convertVariantToStruct, convertStructToVariant)
must get default implementations to preserve backward compatibility: implement
isVariantShreddingStruct to return false, and implement the other four to throw
UnsupportedOperationException (with a clear message indicating Spark 3
behavior), so existing out-of-tree SparkAdapter implementations won't break;
then ensure BaseSpark4Adapter overrides these methods with the Spark 4
implementations.
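
The compatibility argument can be shown with a Java interface standing in for the Scala trait. Only two of the five methods are modeled, their signatures are simplified to `String`/`Object`, and the Spark 4 behavior shown is a placeholder, so everything except the method names is an assumption.

```java
// Java stand-in for the SparkAdapter trait; signatures are simplified and hypothetical.
interface AdapterSketch {
  /** Default: older Spark lines never treat a struct as a shredded variant. */
  default boolean isVariantShreddingStruct(String structDescription) {
    return false;
  }

  /** Default: conversion is unavailable unless an adapter overrides it. */
  default Object convertVariantToStruct(Object variant) {
    throw new UnsupportedOperationException("Variant conversion requires Spark 4.x");
  }
}

// An out-of-tree adapter written before the new methods existed still compiles.
final class LegacyAdapterSketch implements AdapterSketch {
}

// A Spark 4 style adapter overrides the defaults (placeholder logic only).
final class Spark4AdapterSketch implements AdapterSketch {
  @Override
  public boolean isVariantShreddingStruct(String structDescription) {
    return structDescription.contains("typed_value");
  }

  @Override
  public Object convertVariantToStruct(Object variant) {
    return "struct(" + variant + ")";
  }
}
```

`LegacyAdapterSketch` declares none of the new methods and still satisfies the interface, which is the property the finding asks for.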
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala-218-220 (1)

218-220: ⚠️ Potential issue | 🟠 Major

Wrap SQL config changes in withSQLConf() to restore settings after each test.

These spark.sql("set ...") calls mutate the shared Spark session without restoration, causing subsequent tests to inherit settings and become order-dependent. The parent class provides withSQLConf() for exactly this purpose—it saves current values, applies new ones, and restores them in a finally block. Replace all 13 occurrences throughout this file with:

withSQLConf(
  "hoodie.parquet.variant.write.shredding.enabled" -> "true",
  "hoodie.parquet.variant.allow.reading.shredded" -> "true",
  "hoodie.parquet.variant.force.shredding.schema.for.test" -> "a int, b string"
) {
  // test logic here
}

This ensures configs are scoped to each test and don't leak to others.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala`
around lines 218 - 220, Replace the direct spark.sql("set ...") calls (e.g.,
spark.sql("set hoodie.parquet.variant.write.shredding.enabled = true"),
spark.sql("set hoodie.parquet.variant.allow.reading.shredded = true"),
spark.sql("set hoodie.parquet.variant.force.shredding.schema.for.test = a int, b
string")) with a scoped withSQLConf block that restores settings after the test;
use withSQLConf("hoodie.parquet.variant.write.shredding.enabled" -> "true",
"hoodie.parquet.variant.allow.reading.shredded" -> "true",
"hoodie.parquet.variant.force.shredding.schema.for.test" -> "a int, b string") {
/* move the test logic that currently follows the spark.sql calls into this
block */ } to ensure configs do not leak across tests (replace all 13
occurrences).
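
For readers unfamiliar with the pattern, the save/apply/restore behavior attributed to `withSQLConf()` above can be sketched against a plain map. This is not Spark's implementation: `ScopedConfSketch` and `withConf` are hypothetical, and a `Map<String, String>` stands in for the session configuration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of scoped config overrides; a Map stands in for SQLConf.
final class ScopedConfSketch {
  private ScopedConfSketch() {
  }

  /** Applies overrides, runs the body, then restores the previous values even if the body throws. */
  static void withConf(Map<String, String> conf, Map<String, String> overrides, Runnable body) {
    Map<String, String> saved = new HashMap<>();
    for (String key : overrides.keySet()) {
      saved.put(key, conf.get(key)); // null marks "was not set before"
    }
    conf.putAll(overrides);
    try {
      body.run();
    } finally {
      for (Map.Entry<String, String> entry : saved.entrySet()) {
        if (entry.getValue() == null) {
          conf.remove(entry.getKey());
        } else {
          conf.put(entry.getKey(), entry.getValue());
        }
      }
    }
  }
}
```

Because the restore runs in `finally`, a failing assertion inside the body does not leak settings into the next test.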
hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java-133-157 (1)

133-157: ⚠️ Potential issue | 🟠 Major

Nested variant fields are still handled only at the top level.

All of the new paths walk schema.getFields() / effectiveSchema.getFields() on the root record and write() only rewrites fields found by top-level index. A variant nested inside a record/array/map never gets shredded or unshredded here, so enabling write shredding on nested columns will still emit the original shape, while disabling shredding leaves nested typed_value columns behind.

Also applies to: 195-216, 228-269, 324-352, 460-491

hudi-io/src/main/java/org/apache/hudi/common/metrics/Registry.java-96-110 (1)

96-110: ⚠️ Potential issue | 🟠 Major

setRegistries drops the table component from the new compound key.

Lookups now happen through makeKey(tableName, registryName), but propagated registries are inserted as makeKey("", registry.getName()). That means table-scoped registries can collide across tables, and executor-side lookups with a non-empty tableName will miss the propagated instance and create a second registry instead.

Also applies to: 152-155

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@hudi-io/src/main/java/org/apache/hudi/common/metrics/Registry.java` around
lines 96 - 110, Propagated registries are being stored under makeKey("",
registry.getName()) which drops the table component causing cross-table
collisions and missed lookups by getRegistryOfClass; update the code in
setRegistries (the places that currently call makeKey("", registry.getName()))
to preserve the original table scope when inserting into REGISTRY_MAP — e.g.,
build the key from the registry's actual table scope (or reuse the registry's
original compound key) instead of using an empty tableName; ensure the same fix
is applied to both propagation sites referenced (the second occurrence around
the setRegistries logic) so lookups via getRegistryOfClass(tableName,
registryName, clazz) find the propagated instance.
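
A small model of the keying problem, assuming each propagated registry can report its own table name. The `Entry` type, the `::` separator, and the instance-level map are all hypothetical; only `makeKey` and `setRegistries` echo names from the finding.

```java
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of table-scoped registry lookup; not the actual Registry class.
final class TableScopedRegistriesSketch {
  /** Minimal stand-in for a registry that remembers which table it belongs to. */
  static final class Entry {
    final String tableName;
    final String name;

    Entry(String tableName, String name) {
      this.tableName = tableName;
      this.name = name;
    }
  }

  private final Map<String, Entry> registryMap = new ConcurrentHashMap<>();

  /** Compound key; the separator is an arbitrary choice for this sketch. */
  static String makeKey(String tableName, String registryName) {
    return tableName + "::" + registryName;
  }

  /** Inserts propagated registries under their own table scope instead of an empty table name. */
  void setRegistries(Collection<Entry> propagated) {
    for (Entry entry : propagated) {
      registryMap.putIfAbsent(makeKey(entry.tableName, entry.name), entry);
    }
  }

  /** Looks up with the same compound key that insertion used; returns null when absent. */
  Entry get(String tableName, String registryName) {
    return registryMap.get(makeKey(tableName, registryName));
  }
}
```

Because insertion and lookup build the key the same way, two tables with a registry of the same name no longer collide, and an executor-side lookup finds the propagated instance.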
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java-2607-2663 (1)

2607-2663: ⚠️ Potential issue | 🟠 Major

getPlainTypedValueSchema() only unwraps one level of shredded object fields.

For a nested object field, this strips the outer {value, typed_value} wrapper but leaves any deeper wrappers intact. So a shape like outer -> { value, typed_value: record{ inner -> { value, typed_value: int } } } comes back as outer: record{ inner -> { value, typed_value: int } }, not the fully plain schema the method advertises. That will break recursive nested-object shredding.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java`
around lines 2607 - 2663, getPlainTypedValueSchema() only unwraps one level of
the {value, typed_value} wrapper causing nested shredded records to remain
wrapped; update it to recursively unwrap typed_value fields instead of taking
innerTypedValue as-is. Implement or call a helper (e.g.,
unwrapTypedValueSchema(HoodieSchema) or make getPlainTypedValueSchemaRecursive)
that, given a HoodieSchema, returns the plain (non-wrapped) schema by checking
nullability, descending into VARIANT_TYPED_VALUE_FIELD, and repeating until the
inner type is not the {value, typed_value} record; replace the current
extraction where innerTypedValue is used (the loop over fields in
getPlainTypedValueSchema and the call to HoodieSchemaField.of /
HoodieSchema.createRecord) to use the recursive unwrapped schema so nested
objects are fully flattened.
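
The recursive unwrap can be sketched on a toy schema model. `PlainSchemaSketch.Node` is hypothetical and far simpler than `HoodieSchema`: it ignores nullability and unions, and it treats any record whose fields are limited to `value` and `typed_value` (with `typed_value` present) as a wrapper, which is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy schema model for illustration only; not HoodieSchema.
final class PlainSchemaSketch {
  static final class Node {
    final String primitive;          // non-null for leaf types such as "int"
    final Map<String, Node> fields;  // non-null for records

    private Node(String primitive, Map<String, Node> fields) {
      this.primitive = primitive;
      this.fields = fields;
    }

    static Node leaf(String type) {
      return new Node(type, null);
    }

    static Node recordOf(String name, Node child) {
      Map<String, Node> fields = new LinkedHashMap<>();
      fields.put(name, child);
      return new Node(null, fields);
    }

    /** Builds a {value, typed_value} wrapper around the given typed schema. */
    static Node wrapper(Node typed) {
      Map<String, Node> fields = new LinkedHashMap<>();
      fields.put("value", leaf("binary"));
      fields.put("typed_value", typed);
      return new Node(null, fields);
    }

    boolean isWrapper() {
      if (fields == null || !fields.containsKey("typed_value")) {
        return false;
      }
      for (String name : fields.keySet()) {
        if (!name.equals("value") && !name.equals("typed_value")) {
          return false;
        }
      }
      return true;
    }

    @Override
    public String toString() {
      if (fields == null) {
        return primitive;
      }
      StringBuilder sb = new StringBuilder("record{");
      String sep = "";
      for (Map.Entry<String, Node> e : fields.entrySet()) {
        sb.append(sep).append(e.getKey()).append(": ").append(e.getValue());
        sep = ", ";
      }
      return sb.append('}').toString();
    }
  }

  private PlainSchemaSketch() {
  }

  /** Strips wrappers at every depth, not only the outermost one. */
  static Node toPlain(Node node) {
    if (node.fields == null) {
      return node;
    }
    if (node.isWrapper()) {
      return toPlain(node.fields.get("typed_value"));
    }
    Map<String, Node> plain = new LinkedHashMap<>();
    for (Map.Entry<String, Node> e : node.fields.entrySet()) {
      plain.put(e.getKey(), toPlain(e.getValue()));
    }
    return new Node(null, plain);
  }
}
```

For the shape quoted in the finding, `toPlain` yields `record{outer: record{inner: int}}`, with both wrapper levels removed.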

ℹ️ Review info

📥 Commits

Reviewing files that changed from the base of the PR and between 35e2bbf and 4d1e2b8.

📒 Files selected for processing (121)
  • hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
  • hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
  • hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
  • hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
  • hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
  • hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/DistributedRegistryUtil.java
  • hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieTableServiceClient.java
  • hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
  • hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
  • hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
  • hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
  • hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
  • hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java
  • hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
  • hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala
  • hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetReadSupport.scala
  • hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
  • hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/common/table/log/TestLogReaderUtils.java
  • hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metrics/TestDistributedRegistry.java
  • hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
  • hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
  • hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
  • hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
  • hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
  • hudi-common/src/main/java/org/apache/hudi/common/table/timeline/BaseHoodieTimeline.java
  • hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
  • hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
  • hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java
  • hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
  • hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
  • hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
  • hudi-common/src/main/java/org/apache/hudi/util/PartitionPathFilterUtil.java
  • hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java
  • hudi-common/src/test/java/org/apache/hudi/util/TestPartitionPathFilterUtil.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithDisruptorBufferSort.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/RLIBootstrapOperator.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/Correspondent.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieScanContext.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieSource.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieSourceReader.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieSourceSplitReader.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/RecordLimiter.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/AbstractSplitReaderFunction.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieCdcSplitReaderFunction.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieSplitReaderFunction.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/assign/HoodieSplitAssigners.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/assign/HoodieSplitBucketAssigner.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcImageManager.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcIterators.java
  • hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
  • hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
  • hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
  • hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/TestAppendWriteFunctionWithBufferSort.java
  • hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockStateSnapshotContext.java
  • hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
  • hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestEventBuffers.java
  • hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
  • hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieSourceSplitReader.java
  • hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestRecordLimiter.java
  • hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestAbstractSplitReaderFunction.java
  • hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieCdcSplitReaderFunction.java
  • hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieSplitReaderFunction.java
  • hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitProvider.java
  • hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/assign/TestHoodieSplitBucketAssigner.java
  • hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
  • hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
  • hudi-hadoop-common/src/main/java/org/apache/hudi/avro/VariantShreddingProvider.java
  • hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java
  • hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HoodieWrapperFileSystem.java
  • hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileWriterFactory.java
  • hudi-hadoop-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroWriteSupportVariantShredding.java
  • hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
  • hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java
  • hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestParquetReaderIterator.java
  • hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/hadoop/TestHoodieAvroFileWriterFactoryVariantShredding.java
  • hudi-io/src/main/java/org/apache/hudi/common/metrics/LocalRegistry.java
  • hudi-io/src/main/java/org/apache/hudi/common/metrics/Registry.java
  • hudi-spark-datasource/hudi-spark-common/pom.xml
  • hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
  • hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
  • hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
  • hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV1.scala
  • hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV2.scala
  • hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
  • hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/metadata/CatalogBackedTableMetadata.scala
  • hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/IncrementalRelationUtil.scala
  • hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/HoodieTableChanges.scala
  • hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/HoodieVectorSearchTableValuedFunction.scala
  • hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
  • hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSparkBaseAnalysis.scala
  • hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieVectorSearchPlanBuilder.scala
  • hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/analysis/TableValuedFunctions.scala
  • hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/analysis/VectorDistanceUtils.scala
  • hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV1.scala
  • hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala
  • hudi-spark-datasource/hudi-spark/pom.xml
  • hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
  • hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestHoodieVectorSearchFunction.scala
  • hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalQueryColumnPruning.scala
  • hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
  • hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestVectorDataSource.scala
  • hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestSchemaConverters.scala
  • hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCatalogBackedTableMetadata.scala
  • hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestInstantTimeValidation.scala
  • hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala
  • hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
  • hudi-spark-datasource/hudi-spark4-common/src/main/java/org/apache/hudi/variant/Spark4VariantShreddingProvider.java
  • hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
  • hudi-spark-datasource/hudi-spark4-common/src/test/java/org/apache/hudi/variant/TestSpark4VariantShreddingProvider.java
  • hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
  • hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40ParquetReader.scala
  • hudi-spark-datasource/hudi-spark4.0.x/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowParquetWriteSupportVariant.java
  • hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
  • hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieClusteringJob.java
  • pom.xml
💤 Files with no reviewable changes (6)
  • hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
  • hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestEventBuffers.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/Correspondent.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
  • hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
  • hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java

Comment on lines +111 to +126
@Override
public boolean hasNext() {
  if (recordIterator != null) {
    if (recordIterator.hasNext()) {
      return true;
    } else {
      recordIterator.close();
      recordIterator = null;
    }
  }
  if (fileSplitIterator.hasNext()) {
    recordIterator = recordIteratorFunc.apply(fileSplitIterator.next());
    return recordIterator.hasNext();
  }
  return false;
}

⚠️ Potential issue | 🔴 Critical

Bug: Empty iterator from factory causes premature termination.

If recordIteratorFunc.apply() returns an empty iterator (i.e., hasNext() returns false immediately), this method returns false without trying remaining file splits. This could skip valid records in subsequent splits.

🐛 Proposed fix to loop through empty iterators
 @Override
 public boolean hasNext() {
-  if (recordIterator != null) {
-    if (recordIterator.hasNext()) {
-      return true;
-    } else {
+  while (true) {
+    if (recordIterator != null) {
+      if (recordIterator.hasNext()) {
+        return true;
+      }
       recordIterator.close();
       recordIterator = null;
     }
-  }
-  if (fileSplitIterator.hasNext()) {
-    recordIterator = recordIteratorFunc.apply(fileSplitIterator.next());
-    return recordIterator.hasNext();
+    if (fileSplitIterator.hasNext()) {
+      recordIterator = recordIteratorFunc.apply(fileSplitIterator.next());
+      // Loop continues to check if this iterator has records
+    } else {
+      return false;
+    }
   }
-  return false;
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcIterators.java`
around lines 111 - 126, In CdcIterators.hasNext(), the current logic returns
false if recordIteratorFunc.apply(...) produces an empty iterator, prematurely
stopping instead of trying remaining file splits; change the branch that
consumes fileSplitIterator so it repeatedly (while fileSplitIterator.hasNext())
creates a new recordIterator via
recordIteratorFunc.apply(fileSplitIterator.next()), closes and discards it if
recordIterator.hasNext() is false, and continues to the next split, returning
true only when a non-empty recordIterator is found (and false if all splits
produce empty iterators); ensure recordIterator is closed when discarded and set
to null when exhausted.
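
To make the failure mode concrete, here is a minimal, self-contained sketch of the looping approach. It is not the code in CdcIterators.java: the class name `SplitChainingIterator` is hypothetical, it uses plain `java.util.Iterator` instead of Hudi's closable iterator type, and the `close()` call is only marked with a comment. The middle split in the demo is empty, which is the case the current `hasNext()` would treat as end of input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

// Hypothetical stand-in for the iterator under review; not Hudi's actual class.
public class SplitChainingIterator<S, T> implements Iterator<T> {
  private final Iterator<S> splitIterator;
  private final Function<S, Iterator<T>> recordIteratorFunc;
  private Iterator<T> current;

  public SplitChainingIterator(Iterator<S> splitIterator,
                               Function<S, Iterator<T>> recordIteratorFunc) {
    this.splitIterator = splitIterator;
    this.recordIteratorFunc = recordIteratorFunc;
  }

  @Override
  public boolean hasNext() {
    while (true) {
      if (current != null) {
        if (current.hasNext()) {
          return true;
        }
        // The real code would call close() on the exhausted iterator here.
        current = null;
      }
      if (!splitIterator.hasNext()) {
        return false; // every remaining split has been tried
      }
      // Open the next split and loop again; an empty split is simply skipped.
      current = recordIteratorFunc.apply(splitIterator.next());
    }
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return current.next();
  }

  public static void main(String[] args) {
    // Three "splits"; the second one yields no records.
    List<List<Integer>> splits = Arrays.asList(
        Arrays.asList(1), Collections.<Integer>emptyList(), Arrays.asList(2, 3));
    Iterator<Integer> it = new SplitChainingIterator<>(splits.iterator(), List::iterator);
    List<Integer> out = new ArrayList<>();
    while (it.hasNext()) {
      out.add(it.next());
    }
    System.out.println(out); // prints [1, 2, 3]
  }
}
```

With the non-looping version, the same input would stop after the first split and yield only `[1]`, because `hasNext()` returns the result of the empty second iterator directly.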
