Skip to content

[OSS PR #18277] feat(dsv2): [RFC-98] DSv2 read for COW#42

Open
yihua wants to merge 10 commits into
masterfrom
oss-18277
Open

[OSS PR #18277] feat(dsv2): [RFC-98] DSv2 read for COW#42
yihua wants to merge 10 commits into
masterfrom
oss-18277

Conversation

@yihua
Copy link
Copy Markdown
Owner

@yihua yihua commented Apr 14, 2026

Mirror of apache#18277 for automated bot review.

Original author: @geserdugarov
Base branch: master

Summary by CodeRabbit

Release Notes

  • New Features

    • Added vector similarity search via hudi_vector_search table-valued functions with configurable distance metrics (cosine, L2, dot product) and algorithms (brute force).
    • Introduced Apache Spark DataSource V2 implementation for improved Spark SQL integration and optimizations.
    • Added clustering expiration and automatic rollback capabilities with heartbeat coordination.
    • Implemented catalog-backed partition discovery for improved metadata handling.
  • Improvements

    • Enhanced column pruning for incremental queries improving performance.
    • Added record-limiting push-down support in Flink source reads.
    • Improved log compaction handling for merge-on-read tables.
  • Bug Fixes

    • Fixed VECTOR type validation to prevent nested usage.
    • Improved schema handling in incremental reads with meta-field conflicts.

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Apr 14, 2026

Warning

Rate limit exceeded

@yihua has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 2 minutes and 39 seconds before requesting another review.

Your organization is not enrolled in usage-based pricing. Contact your admin to enable usage-based pricing to continue reviews beyond the rate limit, or try again in 2 minutes and 39 seconds.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 59fcb06b-58c4-4156-bbd9-7aeaf7fb2e6d

📥 Commits

Reviewing files that changed from the base of the PR and between 03b5493 and eb68296.

📒 Files selected for processing (118)
  • hudi-cli/src/main/java/org/apache/hudi/cli/utils/InputStreamConsumer.java
  • hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedAppendHandle.java
  • hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java
  • hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
  • hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
  • hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelper.java
  • hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelperFactory.java
  • hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelperV1.java
  • hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
  • hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/rollback/TestRollbackHelper.java
  • hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
  • hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
  • hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
  • hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java
  • hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java
  • hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
  • hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
  • hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieReadStats.java
  • hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/LogScanningRecordBufferLoader.java
  • hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ArchivedTimelineV1.java
  • hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDBDAO.java
  • hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
  • hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
  • hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java
  • hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieReadStats.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkMdtCompactionMetrics.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkRocksDBIndexMetrics.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/CleanHandler.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/CompactHandler.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/CompactionCommitHandler.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/CompactionPlanHandler.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/CompositeCleanHandler.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/CompositeCompactHandler.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/CompositeCompactionCommitHandler.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/CompositeCompactionPlanHandler.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/CompositeTableServiceHandler.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/DataTableCompactHandler.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/DataTableCompactionCommitHandler.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/DataTableCompactionPlanHandler.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/DefaultCleanHandler.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/MetadataTableCompactHandler.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/MetadataTableCompactionCommitHandler.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/MetadataTableCompactionPlanHandler.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/TableServiceHandlerFactory.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackend.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RocksDBIndexBackend.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
  • hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/metrics/TestFlinkCompactionMetrics.java
  • hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/handler/TestCompositeHandlers.java
  • hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/event/TestWriteMetadataEvent.java
  • hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRocksDBIndexBackend.java
  • hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
  • hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
  • hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
  • hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestFlinkRowDataReaderContext.java
  • hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySchemaResolver.java
  • hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySchemaResolver.java
  • hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestArchivedTimelineV1.java
  • hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
  • hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileMetaIndexBlock.java
  • hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileRootIndexBlock.java
  • hudi-io/src/main/java/org/apache/hudi/io/util/IOUtils.java
  • hudi-io/src/test/java/org/apache/hudi/io/hfile/TestHFileWriter.java
  • hudi-io/src/test/java/org/apache/hudi/io/util/TestIOUtils.java
  • hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/spark/sql/hudi/v2/PartialLimitPushDown.java
  • hudi-spark-datasource/hudi-spark-common/src/main/resources/META-INF/services/org.apache.spark.sql.connector.catalog.TableProvider
  • hudi-spark-datasource/hudi-spark-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
  • hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
  • hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
  • hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSparkBaseAnalysis.scala
  • hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala
  • hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala
  • hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/v2/HoodieBatchScan.scala
  • hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/v2/HoodieDataSourceV2.scala
  • hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/v2/HoodieInputPartition.scala
  • hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/v2/HoodieLocalScan.scala
  • hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/v2/HoodiePartitionReader.scala
  • hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/v2/HoodiePartitionReaderFactory.scala
  • hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/v2/HoodieScanBuilder.scala
  • hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/v2/HoodieSparkV2Table.scala
  • hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/v2/HoodieStatistics.scala
  • hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
  • hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitsProcedure.scala
  • hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
  • hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
  • hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsPruning.scala
  • hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestSqlConf.scala
  • hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/v2/TestDSv2CoexistenceWithDSv1.scala
  • hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/v2/TestDSv2CowSnapshotRead.scala
  • hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/v2/TestDSv2FilterPushdown.scala
  • hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/v2/TestDSv2Pushdowns.scala
  • hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
  • hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
  • hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark33Analysis.scala
  • hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/hudi/analysis/Spark33HoodiePruneFileSourcePartitions.scala
  • hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
  • hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark34Analysis.scala
  • hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
  • hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark35Analysis.scala
  • hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
  • hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala
  • hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
  • hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark40Analysis.scala
  • hudi-spark-datasource/hudi-spark4.0.x/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowParquetWriteSupportVariant.java
  • hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java
  • hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestSparkSchemaUtils.java
  • hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestHiveSchemaUtil.java
  • hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SparkSchemaUtils.java
  • pom.xml
📝 Walkthrough

Walkthrough

This pull request implements clustering expiration management with heartbeat support, adds Apache Spark DataSource V2 snapshot read capabilities, introduces vector similarity search via table-valued functions, enhances incremental read column pruning and limit pushdown, refactors Flink RLI bootstrap coordination, and improves distributed metrics registry support across Hudi client, Spark, and Flink components.

Changes

Cohort / File(s) Summary
Clustering Expiration & Heartbeat Management
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java, hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java, hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java, hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
Added clustering expiration configuration (enable/threshold), heartbeat start/stop on clustering lifecycle events, expiration eligibility checks, and job-level clustering rollback utilities for partition-based cleanup.
Log Compaction Scheduling Enhancements
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java, hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
Added tracking of delta commits since log compaction completion, conditional log compaction scheduling logic, and utilities to retrieve last log compaction instant and delta-commit timeline segments.
Flink RLI Bootstrap Simplification
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/RLIBootstrapOperator.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/Correspondent.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java
Removed pending-instants coordination protocol and checkpoint synchronization; streamlined RLI bootstrap operator to directly preload records without awaiting prior commit completion.
Flink Operator UID & Configuration Removal
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
Removed WRITE_OPERATOR_UID configuration option and pre-setting logic; operator IDs are now always generated dynamically.
Flink Disruptor Queue Draining
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithDisruptorBufferSort.java, hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java
Added waitUntilDrained() method to wait for queue drainage without closing; updated checkpoint logic to drain instead of close/reinit.
Flink Source Reader Limit Pushdown
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieScanContext.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieSourceReader.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieSourceSplitReader.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/RecordLimiter.java
Added limit parameter to scan context and source readers; implemented RecordLimiter to enforce global row-count limits across splits.
Flink Split Reader Abstraction
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/AbstractSplitReaderFunction.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieSplitReaderFunction.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieCdcSplitReaderFunction.java
Extracted common state and lazy initialization into abstract base class; refactored CDC reader to delegate CDC iteration to centralized CdcIterators.
Flink CDC Iterator Consolidation
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcImageManager.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcIterators.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
Consolidated CDC image management and iterator implementations into shared utilities; reduces duplication across input formats.
Flink Bucket Assignment Configuration
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/assign/HoodieSplitBucketAssigner.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/assign/HoodieSplitAssigners.java
Updated bucket assigner to accept configuration and compute bucket assignments using partition hashing and configurable bucket indexing formulas.
Spark Vector Search Implementation
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/HoodieVectorSearchTableValuedFunction.scala, hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieVectorSearchPlanBuilder.scala, hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/analysis/VectorDistanceUtils.scala, hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/analysis/TableValuedFunctions.scala
Added vector search table-valued functions (single-query and batch modes), distance metric computation utilities, and brute-force KNN search algorithm implementation.
Spark DSv2 Implementation
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/v2/HoodieDataSourceV2.scala, hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/v2/HoodieScanBuilder.scala, hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/v2/HoodieBatchScan.scala, hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/v2/HoodieSparkV2Table.scala, hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/v2/HoodiePartitionReader.java, hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/v2/.../*
Implemented complete Spark DataSource V2 stack: data source provider, catalog table, scan builders with filter/limit/aggregate pushdown, partition readers, and input partitions for COW snapshot reads.
Spark Incremental Read Column Pruning
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV1.scala, hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV2.scala, hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/IncrementalRelationUtil.scala, hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV1.scala, hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala
Transitioned incremental relations from TableScan to PrunedScan interface; added schema pruning utilities and required-column filtering on scanned DataFrames.
Catalog-Backed Partition Listing
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/metadata/CatalogBackedTableMetadata.scala, hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala, hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
Added Spark catalog-backed metadata implementation for partition discovery when available; added configuration to enable/disable catalog-backed listing.
Vector Schema Validation
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala, hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
Added validation to reject VECTOR as array element, map value, or nested struct field; only top-level vector fields are permitted.
Spark Instant Format Normalization
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala, hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/HoodieTableChanges.scala
Added support for ISO datetime and epoch timestamp formats in query instant parsing; added separate normalization for incremental query boundaries with sentinel-value handling.
Metrics & Registry Infrastructure
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java, hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java, hudi-io/src/main/java/org/apache/hudi/common/metrics/Registry.java, hudi-io/src/main/java/org/apache/hudi/common/metrics/LocalRegistry.java, hudi-client/hudi-client-common/src/util/DistributedRegistryUtil.java
Refactored metrics registry creation and distribution; added distributed registry map management in Spark engine context; extracted wrapper FS metrics initialization to utility class.
Partition Path Filtering Utility
hudi-common/src/main/java/org/apache/hudi/util/PartitionPathFilterUtil.java, hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java, hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java, hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
Added centralized partition path filtering utility; refactored metadata classes to use common predicate logic; added database name to metadata context.
Log File Reading Utilities
hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java, hudi-common/src/main/java/org/apache/hudi/common/table/timeline/BaseHoodieTimeline.java, hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
Added utilities to retrieve log files by max commit instant; added timeline filter for completion-time-based instant queries.
Parquet Reader Lifecycle
hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java
Made ParquetReaderIterator idempotent on close; auto-closes when underlying reader exhausted.
HoodieWrapperFileSystem Constants
hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HoodieWrapperFileSystem.java
Added public registry name constants for metrics state management.
Configuration & Span Additions
pom.xml, hudi-spark-datasource/hudi-spark-common/pom.xml, hudi-spark-datasource/hudi-spark/pom.xml
Added Spark MLlib Local dependency for vector distance computations.

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes


🐰 Whisker-twitching over this feast of features,
Vector searches, DSv2 reaches, heartbeats and creature teachers,
Clustering wise, log compaction's the prize,
Flink's simpler, Spark's crisper—hopping through paradise!

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch oss-18277

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

Note

Due to the large number of review comments, Critical severity comments were prioritized as inline comments.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV2.scala (1)

183-191: ⚠️ Potential issue | 🟠 Major

Use the glob-filtered path sets when deciding whether to load each file group.

After INCR_PATH_GLOB is applied, the read branches still check metaBootstrapFileIdToFullPath.nonEmpty and regularFileIdToFullPath.nonEmpty. If the glob filters one of those groups down to zero paths, this still executes a read with an empty READ_PATHS or empty varargs, which can fail or behave unexpectedly. Branch on filteredMetaBootstrapFullPaths / filteredRegularFullPaths instead.

Patch sketch
-            if (metaBootstrapFileIdToFullPath.nonEmpty) {
+            if (filteredMetaBootstrapFullPaths.nonEmpty) {
               df = sqlContext.sparkSession.read
                 .format("hudi_v1")
                 .schema(prunedSchema)
                 .option(DataSourceReadOptions.READ_PATHS.key, filteredMetaBootstrapFullPaths.mkString(","))
                 .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key(), endInstantTime)
                 .load()
             }
 
-            if (regularFileIdToFullPath.nonEmpty) {
+            if (filteredRegularFullPaths.nonEmpty) {
               try {
                 val commitTimesToReturn = commitsToReturn.map(_.requestedTime)
                 df = df.union(sqlContext.read.options(sOpts)
                   .schema(prunedSchema).format(formatClassName)
                   .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key(), endInstantTime)
                   .load(filteredRegularFullPaths.toList: _*)
                   .filter(col(HoodieRecord.COMMIT_TIME_METADATA_FIELD).isin(commitTimesToReturn: _*)))

Also applies to: 260-277

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV2.scala`
around lines 183 - 191, The glob filter produces filteredRegularFullPaths and
filteredMetaBootstrapFullPaths, but the later read branches still test the
original maps (regularFileIdToFullPath/metaBootstrapFileIdToFullPath) and may
invoke reads with empty READ_PATHS/varargs; update IncrementalRelationV2 to
branch on filteredMetaBootstrapFullPaths.nonEmpty and
filteredRegularFullPaths.nonEmpty (and use their contents when constructing
options/READ_PATHS and varargs) instead of the original maps, and apply the same
replacement for the other read block (the second occurrence referenced around
the later branch) so all reads honor the glob-filtered sets.
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java (1)

484-516: ⚠️ Potential issue | 🟠 Major

Nested VECTOR validation does not traverse ARRAY/MAP/UNION branches

At Line 512, the validator only recurses through RECORD fields. Schemas like ARRAY<RECORD{VECTOR}> or MAP<STRING, RECORD{VECTOR}> can bypass this check, even though nested VECTOR is supposed to be rejected.

💡 Proposed fix
-    validateNoVectorInNestedRecord(fields, false);
+    validateNoVectorPlacement(fields);
@@
-  private static void validateNoVectorInNestedRecord(List<HoodieSchemaField> fields, boolean nested) {
-    for (HoodieSchemaField field : fields) {
-      HoodieSchema nonNull = field.schema().getNonNullType();
-      if (nested && nonNull.getType() == HoodieSchemaType.VECTOR) {
-        throw new HoodieSchemaException(
-            "VECTOR column '" + field.name() + "' must be a top-level field. "
-                + "Nested VECTOR columns (inside STRUCT, ARRAY, or MAP) are not supported.");
-      }
-      if (nonNull.getType() == HoodieSchemaType.RECORD) {
-        validateNoVectorInNestedRecord(nonNull.getFields(), true);
-      }
-    }
-  }
+  private static void validateNoVectorPlacement(List<HoodieSchemaField> fields) {
+    for (HoodieSchemaField field : fields) {
+      validateNoVectorPlacement(field.schema(), false, field.name());
+    }
+  }
+
+  private static void validateNoVectorPlacement(HoodieSchema schema, boolean nested, String fieldName) {
+    HoodieSchema nonNull = schema.getNonNullType();
+    switch (nonNull.getType()) {
+      case VECTOR:
+        if (nested) {
+          throw new HoodieSchemaException(
+              "VECTOR column '" + fieldName + "' must be a top-level field. "
+                  + "Nested VECTOR columns (inside STRUCT, ARRAY, or MAP) are not supported.");
+        }
+        break;
+      case RECORD:
+        for (HoodieSchemaField child : nonNull.getFields()) {
+          validateNoVectorPlacement(child.schema(), true, child.name());
+        }
+        break;
+      case ARRAY:
+        validateNoVectorPlacement(nonNull.getElementType(), true, fieldName);
+        break;
+      case MAP:
+        validateNoVectorPlacement(nonNull.getValueType(), true, fieldName);
+        break;
+      case UNION:
+        for (HoodieSchema t : nonNull.getTypes()) {
+          if (t.getType() != HoodieSchemaType.NULL) {
+            validateNoVectorPlacement(t, nested, fieldName);
+          }
+        }
+        break;
+      default:
+    }
+  }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java`
around lines 484 - 516, The validateNoVectorInNestedRecord routine only recurses
into RECORD types, letting VECTOR hide inside ARRAY, MAP, or UNION branches;
update validateNoVectorInNestedRecord(List<HoodieSchemaField>, boolean) to, for
each field, obtain nonNull = field.schema().getNonNullType() and if nested &&
nonNull.getType() == HoodieSchemaType.VECTOR throw the existing
HoodieSchemaException, then handle all container types: if nonNull.getType() ==
HoodieSchemaType.RECORD recurse into nonNull.getFields(), if ARRAY recurse into
nonNull.getElementType() (treating element type as a pseudo-field or directly
checking its type and recursing as needed), if MAP recurse into
nonNull.getValueType(), and if UNION iterate all branch types and recurse into
each; ensure recursion always sets nested=true when descending so VECTOR in any
nested branch is rejected (use the existing HoodieSchema and HoodieSchemaField
accessors to locate fields/types).
🟠 Major comments (18)
hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java-77-88 (1)

77-88: ⚠️ Potential issue | 🟠 Major

Don't substitute requested time for an archived completion-time cutoff.

This lookup only searches the active timeline, then falls back to maxCommitInstantTime when the instant is already archived. At that point the filter is no longer “before or equals by completion time”, so earlier instants that completed before the archived max commit finished can be dropped incorrectly. Please resolve the cutoff from archived/raw timeline metadata instead of switching domains here.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java`
around lines 77 - 88, The code in LogReaderUtils computes
maxCommitCompletionTime by looking only at the active timeline and falling back
to maxCommitInstantTime (requested time) when the instant is archived, which
mixes domains and can drop earlier commits; update the logic that computes
maxCommitCompletionTime (the block using
metaClient.getActiveTimeline().filterCompletedInstants() and mapping to
instant.getCompletionTime()) to, when the instant is not found in the active
timeline, retrieve the archived/raw instant metadata from the metaClient’s full
timeline (e.g., the archived/completed timeline or
metaClient.getAllCompletedInstants() / metaClient.getArchivedTimeline()) and
extract the true completion time from that instant instead of using
maxCommitInstantTime, then pass that completion time to
fsView.getTimeline().findInstantsModifiedBeforeOrEqualsByCompletionTime(...) so
the cutoff is always based on completionTime from the correct timeline domain.
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java-164-173 (1)

164-173: ⚠️ Potential issue | 🟠 Major

Count only completed delta commits for log-compaction triggering.

This path feeds getDeltaCommitTimeline() directly into the helper, so the count later used by needLogCompact includes inflight delta commits. One pending delta after the last log compaction can satisfy the threshold before any new log blocks are actually committed.

Suggested fix
     Option<Pair<HoodieTimeline, HoodieInstant>> deltaCommitsInfo =
         CompactionUtils.getDeltaCommitsSinceLatestCompletedLogCompaction(
-            table.getActiveTimeline().getDeltaCommitTimeline(), rawActiveTimeline);
+            table.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants(), rawActiveTimeline);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java`
around lines 164 - 173, The current implementation of
getLatestDeltaCommitInfoSinceLogCompaction passes
table.getActiveTimeline().getDeltaCommitTimeline() which includes inflight delta
commits, causing needLogCompact to count uncompleted commits; change the call to
use the completed delta commit timeline (e.g.,
table.getActiveTimeline().getCompletedDeltaCommitTimeline() or otherwise filter
the returned HoodieTimeline to only completed instants) so that the Pair count
returned by getLatestDeltaCommitInfoSinceLogCompaction (in method
getLatestDeltaCommitInfoSinceLogCompaction and caller needLogCompact) reflects
only completed delta commits when invoking
CompactionUtils.getDeltaCommitsSinceLatestCompletedLogCompaction.
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/BaseHoodieTimeline.java-295-303 (1)

295-303: ⚠️ Potential issue | 🟠 Major

Keep this API purely completion-time based.

Lines 300-302 mix a requested-time equality check into a completion-time cutoff. That can pull in an instant whose completionTime is actually after the boundary whenever another instant happens to start at the same timestamp the cutoff instant completed. The fallback for “I only know the requested time” should stay at the call site, not in the generic completion-time filter.

Suggested fix
   public HoodieTimeline findInstantsModifiedBeforeOrEqualsByCompletionTime(String instantTime) {
     return factory.createDefaultTimeline(instants.stream()
-            // completed instants with completionTime <= instantTime or matching requestedTime,
+            // completed instants with completionTime <= instantTime,
             // or pending instants with requestedTime <= instantTime
             .filter(s -> (s.getCompletionTime() != null
-                    && (compareTimestamps(s.getCompletionTime(), LESSER_THAN_OR_EQUALS, instantTime) || s.requestedTime().equals(instantTime)))
+                    && compareTimestamps(s.getCompletionTime(), LESSER_THAN_OR_EQUALS, instantTime))
                 || (s.getCompletionTime() == null && compareTimestamps(s.requestedTime(), LESSER_THAN_OR_EQUALS, instantTime))),
         getInstantReader());
   }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-common/src/main/java/org/apache/hudi/common/table/timeline/BaseHoodieTimeline.java`
around lines 295 - 303, In
BaseHoodieTimeline.findInstantsModifiedBeforeOrEqualsByCompletionTime, the
filter erroneously mixes requestedTime equality into the completion-time cutoff;
remove the requestedTime.equals(instantTime) check and the branch that includes
pending instants—restrict the filter to only include instants with a non-null
completionTime where compareTimestamps(s.getCompletionTime(),
LESSER_THAN_OR_EQUALS, instantTime) is true so the API remains purely
completion-time based; leave any requested-time-based fallback logic to callers
of findInstantsModifiedBeforeOrEqualsByCompletionTime.
hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java-127-138 (1)

127-138: ⚠️ Potential issue | 🟠 Major

Honor pre-existing interrupts before entering the drain loop.

If the thread is already interrupted and the queue happens to be empty, this method returns immediately without calling markAsFailed(...). That lets snapshotState()/endInput() proceed as if the drain succeeded, which drops the timeout/cancellation signal this method is supposed to surface.

Proposed fix
   public void waitUntilDrained() {
+    if (throwable.get() != null) {
+      return;
+    }
+    if (Thread.currentThread().isInterrupted()) {
+      markAsFailed(new HoodieException("Interrupted while waiting for disruptor queue to drain"));
+      return;
+    }
     while (!isEmpty()) {
       if (throwable.get() != null) {
         return;
       }
       if (Thread.currentThread().isInterrupted()) {
         markAsFailed(new HoodieException("Interrupted while waiting for disruptor queue to drain"));
         return;
       }
       LockSupport.parkNanos(100_000);
     }
   }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java`
around lines 127 - 138, The method waitUntilDrained() must honor pre-existing
interrupts before entering the drain loop: add a check for
Thread.currentThread().isInterrupted() immediately before the while (!isEmpty())
loop and if true call markAsFailed(new HoodieException("Interrupted while
waiting for disruptor queue to drain")) and return; keep existing
throwable.get() handling and existing loop behavior unchanged so
snapshotState()/endInput() will not proceed as if drain succeeded when an
interrupt was already set.
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala-128-136 (1)

128-136: ⚠️ Potential issue | 🟠 Major

Metadata loss in schema realignment could drop VECTOR/BLOB annotations before write.

This branch rebuilds columns from tgtField.dataType and tgtField.name only, discarding Spark field metadata. Since catalog-only annotations like VECTOR/BLOB are carried in StructField.metadata, these can disappear before the write is handed off. Additionally, positional zipping of incoming fields to target schema fields is risky; if data columns arrive in a different order than the table schema, the positional zip can map values into wrong target fields.

To fix this, preserve target field metadata when casting and renaming:

Suggested approach
-          } else if (data.columns.length == userSchema.length) {
-            val columns = data.schema.fields.zip(userSchema.fields).map {
-              case (srcField, tgtField) =>
-                data.col(srcField.name).cast(tgtField.dataType).as(tgtField.name)
-            }
-            data.select(columns: _*)
+          } else if (data.columns.length == userSchema.length) {
+            val columns = data.schema.fields.zip(userSchema.fields).map {
+              case (srcField, tgtField) =>
+                data.col(srcField.name)
+                  .cast(tgtField.dataType)
+                  .as(tgtField.name)
+                  .withMetadata(tgtField.metadata)
+            }
+            data.select(columns: _*)

Note: Use withMetadata() to attach metadata after the cast, or check if your Spark version supports a metadata parameter in the as() overload.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala`
around lines 128 - 136, The current alignment logic in HoodieInternalV2Table
(alignedData creation) zips source fields positionally and rebuilds StructFields
from tgtField.dataType and tgtField.name, which loses StructField.metadata
(dropping VECTOR/BLOB) and can mis-map columns if orders differ; change the
mapping to match fields by name: for each field in userSchema
(hoodieCatalogTable.tableSchemaWithoutMetaFields) locate the source field by
name in data.schema (or data.columns), cast the source column to
tgtField.dataType, then restore the target StructField.metadata (use
withMetadata(...) or the as(name, metadata) overload if available) and finally
select the columns in target order (data.select(...)) so metadata is preserved
and columns are correctly aligned by name rather than by position.
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV2.scala-221-223 (1)

221-223: ⚠️ Potential issue | 🟠 Major

Don't prune away columns referenced by PUSH_DOWN_INCR_FILTERS.

prunedSchema is derived only from requiredColumns, but the option filters are applied later on scanDf. A query that projects name and sets hoodie.datasource.read.incr.pushdown.filters=id > 1 now builds a DataFrame without id, so Line 293 fails during analysis. Either keep the filter columns in the read schema or apply those filters before the final projection/drop step.

Patch sketch
-      val prunedSchema = IncrementalRelationUtil.getPrunedSchema(requiredColumns, usedSchema, metaClient)
+      val prunedSchema =
+        if (filters.nonEmpty) usedSchema
+        else IncrementalRelationUtil.getPrunedSchema(requiredColumns, usedSchema, metaClient)
...
-      scanDf = IncrementalRelationUtil.filterRequiredColumnsFromDF(scanDf, requiredColumns, metaClient)
-      filters.foldLeft(scanDf)((e, f) => e.filter(f)).rdd
+      scanDf = filters.foldLeft(scanDf)((e, f) => e.filter(f))
+      IncrementalRelationUtil.filterRequiredColumnsFromDF(scanDf, requiredColumns, metaClient).rdd

Also applies to: 292-293

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV2.scala`
around lines 221 - 223, prunedSchema is computed only from requiredColumns so
columns used by the incremental filter option
(hoodie.datasource.read.incr.pushdown.filters / symbol PUSH_DOWN_INCR_FILTERS)
may be pruned and later cause analysis errors when scanDf applies filters;
update the logic around IncrementalRelationUtil.getPrunedSchema to also include
any column names referenced by the pushdown filters (parse the filter option and
union those column names into requiredColumns before calling getPrunedSchema) or
alternatively apply the pushdown filters to scanDf before performing the final
projection/drop; reference prunedSchema, requiredColumns,
IncrementalRelationUtil.getPrunedSchema, PUSH_DOWN_INCR_FILTERS and scanDf when
making the change.
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/metadata/CatalogBackedTableMetadata.scala-48-63 (1)

48-63: ⚠️ Potential issue | 🟠 Major

Guard the catalog lookup and verify it matches datasetBasePath.

This assumes tableConfig.getTableName resolves to the correct catalog entry. For path-based reads, or when the current database contains another table with the same name, getTableMetadata either throws before the filesystem fallback can run or returns partitions for the wrong table. Please treat the catalog table as optional and only enable the catalog path when its location matches datasetBasePath.

Patch sketch
+import scala.util.Try
+
   private lazy val catalogDatabaseName =
     if (StringUtils.isNullOrEmpty(tableConfig.getDatabaseName)) {
       sparkSession.sessionState.catalog.getCurrentDatabase
     } else {
       tableConfig.getDatabaseName
     }
   private lazy val tableIdentifier = TableIdentifier(catalogTableName, Some(catalogDatabaseName))
-  private lazy val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
+  private lazy val catalogTableOpt = Try(
+    sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
+  ).toOption.filter { table =>
+    new StoragePath(table.location.toString).toString == new StoragePath(datasetBasePath).toString
+  }
 
-  private def isPartitionedTable: Boolean = {
-    catalogTable.partitionColumnNames.nonEmpty
-  }
+  private def isPartitionedTable: Boolean =
+    catalogTableOpt.exists(_.partitionColumnNames.nonEmpty)
 
-  private def shouldUseCatalogPartitions: Boolean = {
-    isPartitionedTable && catalogTable.tracksPartitionsInCatalog
-  }
+  private def shouldUseCatalogPartitions: Boolean =
+    catalogTableOpt.exists(t => t.partitionColumnNames.nonEmpty && t.tracksPartitionsInCatalog)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/metadata/CatalogBackedTableMetadata.scala`
around lines 48 - 63, The current eager lookup via lazy val catalogTable
(constructed from tableIdentifier and
sparkSession.sessionState.catalog.getTableMetadata) can throw or return metadata
for a different table; change the code to treat the catalog table lookup as
optional: replace the lazy val catalogTable with a safe method/val that catches
exceptions from getTableMetadata and returns Option[CatalogTable], and do not
call getTableMetadata eagerly during object init. Before enabling catalog-backed
partitions (used by isPartitionedTable and shouldUseCatalogPartitions), verify
the optional CatalogTable's storage.locationUri (or tableLocation) matches
datasetBasePath (normalize both URIs/paths) and only then consider
partitionColumnNames.nonEmpty and tracksPartitionsInCatalog; if the lookup
failed or locations don’t match, return None/false so fallback to filesystem
path-based behavior remains possible.
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/IncrementalRelationUtil.scala-46-73 (1)

46-73: ⚠️ Potential issue | 🟠 Major

Deduplicate auto-added helper columns in prunedSchema.

If an ordering field overlaps a partition field or _hoodie_commit_time, this builds a StructType with duplicate column names. That breaks downstream projection/resolution for valid table configs such as precombine == partition. Track already-added fields across all three append phases, not just against requiredColumns.

Patch sketch
 object IncrementalRelationUtil {
 
   def getPrunedSchema(requiredColumns: Array[String],
                       usedSchema: StructType,
                       metaClient: HoodieTableMetaClient) = {
     var prunedSchema = StructType(Seq())
+    val added = scala.collection.mutable.HashSet.empty[String]
+
+    def addField(name: String): Unit = {
+      usedSchema.find(_.name == name).foreach { field =>
+        if (added.add(field.name)) {
+          prunedSchema = prunedSchema.add(field)
+        }
+      }
+    }
 
     // _hoodie_commit_time is a required field. using which query filters are applied.
     if (!requiredColumns.contains(HoodieRecord.COMMIT_TIME_METADATA_FIELD)) {
-      prunedSchema = prunedSchema.add(usedSchema(HoodieRecord.COMMIT_TIME_METADATA_FIELD))
+      addField(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
     }
 
     // Add all the required columns as part of pruned schema
     requiredColumns.foreach(col => {
-      val field = usedSchema.find(_.name == col)
-      if (field.isDefined) {
-        prunedSchema = prunedSchema.add(field.get)
-      }
+      addField(col)
     })
 
     // All the partition fields are required columns while querying the data.
     val tableConfig = metaClient.getTableConfig
     val partitionColumns = tableConfig.getPartitionFields
     if (partitionColumns.isPresent) {
       partitionColumns.get().foreach(col => {
         if (!requiredColumns.contains(col)) {
-          val field = usedSchema.find(_.name == col)
-          if (field.isDefined) {
-            prunedSchema = prunedSchema.add(field.get)
-          }
+          addField(col)
         }
       })
     }
 
     // The precombine/ordering field is required for merge logic in MOR tables
     if (tableConfig.getTableType == HoodieTableType.MERGE_ON_READ) {
       val orderingFields = tableConfig.getOrderingFields
       if (!orderingFields.isEmpty) {
         orderingFields.forEach(col => {
           if (!requiredColumns.contains(col)) {
-            val field = usedSchema.find(_.name == col)
-            if (field.isDefined) {
-              prunedSchema = prunedSchema.add(field.get)
-            }
+            addField(col)
           }
         })
       }
     }
     prunedSchema
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/IncrementalRelationUtil.scala`
around lines 46 - 73, The code currently can add duplicate column names into
prunedSchema because partitionColumns and orderingFields (and implicit helper
columns like "_hoodie_commit_time") are only checked against requiredColumns;
change this by introducing a mutable Set[String] (e.g., alreadyAdded)
initialized with requiredColumns, then in the partitionColumns loop,
orderingFields loop, and when adding any helper column, check
alreadyAdded.contains(col) before adding to prunedSchema and after prunedSchema
= prunedSchema.add(field.get) do alreadyAdded += col; use the existing symbols
(prunedSchema, requiredColumns, usedSchema, tableConfig, partitionColumns,
orderingFields) so duplicates are prevented across all three append phases.
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCatalogBackedTableMetadata.scala-43-43 (1)

43-43: ⚠️ Potential issue | 🟠 Major

Scope Spark SQL config changes per test to avoid cross-test leakage.

FILE_INDEX_PARTITION_LISTING_VIA_CATALOG is mutated globally and not consistently restored. This can make tests order-dependent/flaky.

♻️ Suggested fix (use scoped SQL conf)
- spark.conf.set(DataSourceReadOptions.FILE_INDEX_PARTITION_LISTING_VIA_CATALOG.key, "true")
- // ... test body
+ withSQLConf(DataSourceReadOptions.FILE_INDEX_PARTITION_LISTING_VIA_CATALOG.key -> "true") {
+   // ... test body
+ }

Also applies to: 136-136, 205-205, 296-296, 322-322, 415-415, 516-516

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCatalogBackedTableMetadata.scala`
at line 43, The test mutates the global Spark SQL config
DataSourceReadOptions.FILE_INDEX_PARTITION_LISTING_VIA_CATALOG via
spark.conf.set, causing cross-test leakage; change those calls so the config is
scoped per-test by wrapping the test logic with a scoped configuration helper
(e.g.,
withSQLConf(Map(DataSourceReadOptions.FILE_INDEX_PARTITION_LISTING_VIA_CATALOG.key
-> "true")) { ... }) or an equivalent try/finally that restores the previous
value; update occurrences referenced in TestCatalogBackedTableMetadata (the
places that call spark.conf.set for FILE_INDEX_PARTITION_LISTING_VIA_CATALOG) to
use the scoped helper so the config is reset after each test.
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/HoodieVectorSearchTableValuedFunction.scala-90-107 (1)

90-107: ⚠️ Potential issue | 🟠 Major

Harden k parsing for non-foldable expressions and eval failures.

parseK directly calls expr.eval() without validation. Similar query vector validation in HoodieSparkBaseAnalysis.evaluateQueryVector (line 339) checks expr.foldable before evaluation. Non-foldable expressions or eval failures will leak internal Spark exceptions instead of consistent HoodieAnalysisException. Add foldability check and wrap expr.eval() in try-catch to align with the defensive pattern already used for query vectors in the same codebase.

🛠️ Suggested fix
   private[logical] def parseK(funcName: String, expr: Expression): Int = {
-    val rawValue = expr.eval()
+    if (!expr.foldable) {
+      throw new HoodieAnalysisException(
+        s"Function '$funcName': k must be a foldable positive integer literal, got: ${expr.sql}")
+    }
+    val rawValue = try {
+      expr.eval()
+    } catch {
+      case e: Exception =>
+        throw new HoodieAnalysisException(
+          s"Function '$funcName': failed to evaluate k from expression '${expr.sql}'", e)
+    }
     if (rawValue == null) {
       throw new HoodieAnalysisException(
         s"Function '$funcName': k must be a positive integer, got null")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/HoodieVectorSearchTableValuedFunction.scala`
around lines 90 - 107, The parseK method currently calls expr.eval() directly
and can leak Spark exceptions for non-foldable expressions or eval failures;
update parseK (in HoodieVectorSearchTableValuedFunction) to first check
expr.foldable and throw a HoodieAnalysisException with a clear message if not
foldable, then wrap the expr.eval() call in a try-catch that converts any thrown
exceptions into a HoodieAnalysisException (preserving the original error
message/context) before continuing with the existing null, NumberFormatException
and non-positive checks so all failures produce consistent
HoodieAnalysisException errors.
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/RecordLimiter.java-53-69 (1)

53-69: ⚠️ Potential issue | 🟠 Major

Honor NO_LIMIT and reserve slots atomically.

wrap() currently suppresses every record when limit == NO_LIMIT because the first guard sees 0 >= -1, and the separate get() / incrementAndGet() sequence lets concurrent wrapped batches claim the same last slot. Bypass wrapping for NO_LIMIT, and use an atomic reservation pattern for bounded limits so the cap cannot be exceeded.

Suggested fix
   public <T> RecordsWithSplitIds<T> wrap(RecordsWithSplitIds<T> records) {
+    if (limit == NO_LIMIT) {
+      return records;
+    }
+
     return new RecordsWithSplitIds<T>() {
@@
       `@Override`
       public T nextRecordFromSplit() {
-        if (totalReadCount.get() >= limit) {
-          return null;
-        }
-        T record = records.nextRecordFromSplit();
-        if (record != null) {
-          totalReadCount.incrementAndGet();
-        }
-        return record;
+        while (true) {
+          long current = totalReadCount.get();
+          if (current >= limit) {
+            return null;
+          }
+
+          if (totalReadCount.compareAndSet(current, current + 1)) {
+            T record = records.nextRecordFromSplit();
+            if (record == null) {
+              totalReadCount.decrementAndGet();
+            }
+            return record;
+          }
+        }
       }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/RecordLimiter.java`
around lines 53 - 69, wrap() must bypass wrapping when limit == NO_LIMIT and
reserve slots atomically to avoid overshooting; change wrap() so that if limit
== NO_LIMIT it simply returns the incoming RecordsWithSplitIds<T> (no wrapper),
and inside the wrapper's nextRecordFromSplit() replace the separate get() /
incrementAndGet() with an atomic reservation loop that reads totalReadCount,
checks against limit, and uses compareAndSet to increment only when a slot is
successfully reserved (e.g., loop: read cur = totalReadCount.get(); if cur >=
limit return null; if totalReadCount.compareAndSet(cur, cur+1) break;), then
fetch the record from records.nextRecordFromSplit(); ensure you reference the
existing symbols totalReadCount, limit, NO_LIMIT, wrap(), and
nextRecordFromSplit() when making the change.
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java-279-285 (1)

279-285: ⚠️ Potential issue | 🟠 Major

Reject negative clustering-expiration thresholds.

A negative value makes every pending clustering instant immediately look expired once this feature is enabled, which can roll back live work instead of only stale instants. Please validate this as non-negative in Builder.validate().

Suggested fix
     private void validate() {
+      long expirationThresholdMins = clusteringConfig.getLong(EXPIRATION_THRESHOLD_MINS);
+      ValidationUtils.checkArgument(expirationThresholdMins >= 0,
+          EXPIRATION_THRESHOLD_MINS.key() + " must be >= 0");
+
       boolean inlineCluster = clusteringConfig.getBoolean(HoodieClusteringConfig.INLINE_CLUSTERING);
       boolean inlineClusterSchedule = clusteringConfig.getBoolean(HoodieClusteringConfig.SCHEDULE_INLINE_CLUSTERING);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java`
around lines 279 - 285, HoodieClusteringConfig.EXPIRATION_THRESHOLD_MINS must
not be negative; update the Builder.validate() method in
HoodieClusteringConfig.Builder to read the configured EXPIRATION_THRESHOLD_MINS
value and throw an IllegalArgumentException (or add a validation error) if it is
less than 0L, with a clear message like
"hoodie.clustering.expiration.threshold.mins must be non-negative"; ensure you
reference the ConfigProperty EXPIRATION_THRESHOLD_MINS when fetching the value
so the check covers both default and user-supplied settings.
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcImageManager.java-169-175 (1)

169-175: ⚠️ Potential issue | 🟠 Major

Potential infinite loop in skipBytesToRead if stream reaches EOF early.

The while (numBytes > 0) loop calls skipBytes(numBytes), which can return 0 when the underlying stream reaches EOF. If serialized data is truncated or corrupt and the deserializer attempts to skip beyond available bytes, skipBytes() returns 0, the loop condition remains true, and the thread hangs indefinitely.

Add a check to detect zero progress and either throw EOFException or advance by one byte to force progress:

Suggested fix
+import java.io.EOFException;
@@
     `@Override`
     public void skipBytesToRead(int numBytes) throws IOException {
       while (numBytes > 0) {
         int skipped = skipBytes(numBytes);
+        if (skipped == 0) {
+          if (read() < 0) {
+            throw new EOFException("Unexpected end of serialized RowData");
+          }
+          skipped = 1;
+        }
         numBytes -= skipped;
       }
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcImageManager.java`
around lines 169 - 175, The loop in skipBytesToRead keeps calling
skipBytes(numBytes) and can hang if skipBytes returns 0 at EOF; update
skipBytesToRead (in class CdcImageManager) to detect when skipped == 0 and throw
an EOFException (or otherwise fail fast) instead of looping forever, so the
method either makes progress or fails with a clear EOF error when the underlying
stream is exhausted.
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieCdcSplitReaderFunction.java-168-219 (1)

168-219: ⚠️ Potential issue | 🟠 Major

“Limit push-down” tests don’t actually exercise limit behavior.

The new tests are named around limit handling, but they instantiate the same constructor path with no distinct limit scenario. This leaves limit semantics unverified while signaling coverage.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieCdcSplitReaderFunction.java`
around lines 168 - 219, The tests currently instantiate
HoodieCdcSplitReaderFunction but never pass or verify the explicit limit
parameter; update/add tests to call the 7-argument constructor variant of
HoodieCdcSplitReaderFunction that accepts an explicit limit (positive, zero, and
negative) and assert the expected limit semantics: for a positive limit assert
the reader/wrapping that enforces the limit is present (e.g. via visible
behavior or inspecting the internal field via accessor/reflection), and for zero
and negative limits assert that the limit wrapper is not applied and the
function still constructs; use the constructor symbol
HoodieCdcSplitReaderFunction(conf, tableState, internalSchemaManager,
fieldTypes, projections, isCaseSensitive, limit) and add explicit assertions per
case rather than just asserting notNull.
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestRecordLimiter.java-226-262 (1)

226-262: ⚠️ Potential issue | 🟠 Major

Don't normalize LIMIT overshoot in the test.

Allowing limit + threadCount rows makes concurrent over-read look valid, but LIMIT pushdown is a correctness contract. This test should fail on any overshoot and force RecordLimiter toward an atomic reservation/CAS-style implementation instead of blessing the race.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestRecordLimiter.java`
around lines 226 - 262, The test currently allows overshoot up to limit +
threadCount; change it to enforce strict LIMIT semantics by asserting no
overshoot. In testConcurrentReadsDoNotExceedLimitByMoreThanThreadCount(),
replace the two assertions that check totalRead.get() (the assertTrue >= limit
and the assertTrue <= limit + threadCount) with a single assertion that
totalRead.get() equals the configured limit (use assertEquals(limit,
totalRead.get()) with an appropriate message), leaving the
limiter.isLimitReached() check intact so the test will fail if RecordLimiter
permits any over-read.
hudi-io/src/main/java/org/apache/hudi/common/metrics/Registry.java-152-155 (1)

152-155: ⚠️ Potential issue | 🟠 Major

setRegistries() rebuilds a different key than getRegistryOfClass().

Table-scoped registries are created under tableName::registryName, but this method re-inserts them under :: + registry.getName(). For names like table.registry, the next getRegistryOfClass(tableName, registryName, ...) lookup misses the preloaded instance and creates a duplicate registry.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@hudi-io/src/main/java/org/apache/hudi/common/metrics/Registry.java` around
lines 152 - 155, setRegistries() is inserting preloaded registries with a key
built from makeKey("", registry.getName()) which doesn't match the table-scoped
key used by getRegistryOfClass(tableName, registryName,...), causing lookups to
miss and duplicate registries; change setRegistries to rebuild keys the same way
getRegistryOfClass expects (use the original table scope/name components rather
than an empty tableName), i.e., derive and call makeKey(tableName,
registry.getName()) or otherwise preserve the exact key used when the Registry
was first created before putting into REGISTRY_MAP so entries can be found by
getRegistryOfClass; update references to REGISTRY_MAP.putIfAbsent accordingly.
hudi-io/src/main/java/org/apache/hudi/common/metrics/Registry.java-96-109 (1)

96-109: ⚠️ Potential issue | 🟠 Major

Fail fast on registry type collisions.

If a key already exists with a different runtime type, this method still returns it after logging. That lets callers asking for DistributedRegistry silently get LocalRegistry instead, which changes aggregation semantics without any hard failure.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@hudi-io/src/main/java/org/apache/hudi/common/metrics/Registry.java` around
lines 96 - 109, The getRegistryOfClass method currently logs but returns an
existing Registry from REGISTRY_MAP even when its runtime class
(registry.getClass().getName()) differs from the requested clazz; change this to
fail fast by throwing an unchecked exception (e.g. IllegalStateException) when a
type collision is detected. Locate getRegistryOfClass, the REGISTRY_MAP usage
and the post-computeIfAbsent check, and replace the LOG.error branch with
throwing an exception that includes the registryName, the existing
registry.getClass().getName(), and the requested clazz (retain the descriptive
message), so callers cannot silently receive the wrong Registry implementation.
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcIterators.java-161-174 (1)

161-174: ⚠️ Potential issue | 🟠 Major

Keep hasNext() side-effect free.

These iterators advance the underlying reader inside hasNext(), violating the Iterator contract. Calling hasNext() multiple times before next() will skip data. For example, in AddBaseFileIterator.hasNext(), calling the method twice consumes two records from the nested iterator but only one can be retrieved via next().

Applies to: lines 161-174, 288-332, 420-442.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcIterators.java`
around lines 161 - 174, hasNext() currently advances the underlying iterator
(calling nested.next() and assigning currentRecord), which makes it stateful and
causes skipped records when hasNext() is called multiple times; change each
iterator (e.g., CdcIterators implementations such as AddBaseFileIterator) to be
side-effect free by introducing a lookahead buffer field (e.g., bufferedRecord)
and only read from nested in hasNext() to populate bufferedRecord if it's null
(without discarding it), and then have next() return and clear bufferedRecord
(or throw if null); keep setting RowKind on the buffered record when you
populate it and remove the direct nested.next() call from hasNext().
🟡 Minor comments (5)
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java-374-380 (1)

374-380: ⚠️ Potential issue | 🟡 Minor

Minor typo in comment.

Line 375: "recommiting" should be "recommitting".

Proposed fix
-    // There exists pending instants waiting for recommiting.
+    // There exist pending instants waiting for recommitting.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java`
around lines 374 - 380, The comment in subtaskReset incorrectly spells
"recommiting"; update the inline comment on the block that calls
executor.executeSync(() -> commitInstants(resetCkpId), ...) so it reads
"recommitting" (e.g., "use sync execution here to make sure the recommitting
finishes before RLI bootstrapping"), leaving all code paths (subtaskReset,
commitInstants, executor.executeSync, eventBuffers, tableState) unchanged.
hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java-58-61 (1)

58-61: ⚠️ Potential issue | 🟡 Minor

Avoid double-closing the underlying reader on the EOF path.

Closing here is fine, but next() still catches the intentional end-of-data HoodieException and runs closeQuietly(parquetReader) again. That means an exhausted iterator now turns into a second ParquetReader.close() plus a misleading "unable to read" wrapper.

Proposed fix
   `@Override`
   public T next() {
     try {
       // To handle case when next() is called before hasNext()
       if (this.next == null) {
         if (!hasNext()) {
           throw new HoodieException("No more records left to read from parquet file");
         }
       }
       T retVal = this.next;
       this.next = null;
       return retVal;
+    } catch (HoodieException e) {
+      throw e;
     } catch (Exception e) {
       FileIOUtils.closeQuietly(parquetReader);
       throw new HoodieException("unable to read next record from parquet file ", e);
     }
   }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java`
around lines 58 - 61, The iterator currently calls close() in the hasNext path
and next() also swallows the EOF HoodieException and calls
closeQuietly(parquetReader) again; change next() so it does not call
closeQuietly/parquetReader.close() for the intentional end-of-data exception.
Specifically, in class ParquetReaderIterator adjust the exception handling in
the next() method: detect the EOF/intentional end condition (the HoodieException
thrown for end-of-data) and either rethrow or handle it without invoking
closeQuietly(parquetReader) (since close() was already called when this.next ==
null), while still wrapping/closing for genuine read errors. Ensure references
to this.next, parquetReader, close(), and closeQuietly(parquetReader) are used
to locate the logic to modify.
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV1.scala-309-309 (1)

309-309: ⚠️ Potential issue | 🟡 Minor

Duplicate log message.

This log line is a duplicate of Line 244. One should be removed to avoid redundant logging.

Proposed fix
       scanDf = IncrementalRelationUtil.filterRequiredColumnsFromDF(scanDf, requiredColumns, metaClient)
-      log.info("Additional Filters to be applied to incremental source are :" + filters.mkString("Array(", ", ", ")"))
       filters.foldLeft(scanDf)((e, f) => e.filter(f)).rdd
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV1.scala`
at line 309, In IncrementalRelationV1, the log.info call that prints "Additional
Filters to be applied to incremental source are :" with filters.mkString is
duplicated; remove the redundant occurrence (the second log.info referencing
filters.mkString) so only the original log in IncrementalRelationV1 remains
(keep the first instance and delete the later duplicate).
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestHoodieVectorSearchFunction.scala-471-472 (1)

471-472: ⚠️ Potential issue | 🟡 Minor

Make exception-message assertions null-safe to avoid masking failures.

These assertions can throw NPE when getCause is null and the first condition is false. Use a root-message helper and assert once.

🛠️ Suggested fix pattern
+  private def rootMessage(e: Throwable): String =
+    Option(e.getCause).map(rootMessage).getOrElse(e.getMessage)
...
-    assertTrue(ex.getMessage.contains("nonexistent_col") ||
-      ex.getCause.getMessage.contains("nonexistent_col"))
+    val msg = rootMessage(ex)
+    assertTrue(msg.contains("nonexistent_col"), s"Expected missing-column error, got: $msg")

Also applies to: 491-492, 503-504

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestHoodieVectorSearchFunction.scala`
around lines 471 - 472, The current assertions in TestHoodieVectorSearchFunction
use ex.getMessage.contains(...) || ex.getCause.getMessage.contains(...) which
can throw NPE if getCause() is null; replace these with a null-safe single
assertion by computing a root message string (e.g., String rootMsg =
ex.getMessage() != null ? ex.getMessage() : (ex.getCause() != null ?
ex.getCause().getMessage() : "")) and then
assertTrue(rootMsg.contains("nonexistent_col")); apply the same pattern to the
other occurrences around the ex variable (the assertions at the other noted
locations) so you only assert once against a non-null-safe root message.
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCatalogBackedTableMetadata.scala-37-62 (1)

37-62: ⚠️ Potential issue | 🟡 Minor

Make “metadata disabled” explicit in table properties.

This test is named and documented as metadata-disabled, but DDL currently relies on defaults. Please set 'hoodie.metadata.enable'='false' explicitly to keep intent stable across default changes.

🛠️ Suggested fix
 | tblproperties (
 |  'primaryKey' = 'id',
 |  'type' = 'COW',
-|  'preCombineField'='ts'
+|  'preCombineField'='ts',
+|  'hoodie.metadata.enable'='false'
 | )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCatalogBackedTableMetadata.scala`
around lines 37 - 62, The test TestCatalogBackedTableMetadata currently creates
the Hudi table without explicitly disabling the metadata table; update the
CREATE TABLE DDL inside the spark.sql call (the block that defines targetTable
and tablePath) to add the table property 'hoodie.metadata.enable'='false' to the
tblproperties list so the test intent (metadata disabled) is explicit and stable
against default changes.

Comment on lines +739 to +742
if (option.isPresent() && config.isExpirationOfClusteringEnabled()) {
heartbeatClient.start(instantTime);
log.info("Started heartbeat for clustering instant {}", instantTime);
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Start the clustering heartbeat in cluster(...), not only when scheduling.

A clustering instant is often scheduled by one process and executed later by another. With the heartbeat started here, the scheduler owns liveness, but the actual cluster(...) path never restarts it. That lets a live execute-only or schedule-then-execute job look expired and become rollback-eligible while it is still running.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java`
around lines 739 - 742, The heartbeat is only started at scheduling time, so the
execute-only path can appear expired; update BaseHoodieTableServiceClient to
start/restart the clustering heartbeat inside the cluster(...) execution path as
well: in the cluster(...) method call heartbeatClient.start(instantTime) (with
the same config.isExpirationOfClusteringEnabled() guard) at the beginning of
execution using the instantTime argument, and log the start (e.g. "Started
heartbeat for clustering instant {}"); ensure the call is idempotent so it’s
safe to keep/remove the original start in the scheduling branch
(option.isPresent()) as appropriate.

Comment on lines +111 to +125
@Override
public boolean hasNext() {
if (recordIterator != null) {
if (recordIterator.hasNext()) {
return true;
} else {
recordIterator.close();
recordIterator = null;
}
}
if (fileSplitIterator.hasNext()) {
recordIterator = recordIteratorFunc.apply(fileSplitIterator.next());
return recordIterator.hasNext();
}
return false;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Skip empty split iterators instead of stopping the whole scan.

If recordIteratorFunc.apply(...) returns an empty iterator for one split, hasNext() returns false immediately even when later splits still contain records. That truncates CDC output on the first empty split.

Suggested fix
 `@Override`
 public boolean hasNext() {
   if (recordIterator != null) {
     if (recordIterator.hasNext()) {
       return true;
     } else {
       recordIterator.close();
       recordIterator = null;
     }
   }
-  if (fileSplitIterator.hasNext()) {
-    recordIterator = recordIteratorFunc.apply(fileSplitIterator.next());
-    return recordIterator.hasNext();
+  while (fileSplitIterator.hasNext()) {
+    recordIterator = recordIteratorFunc.apply(fileSplitIterator.next());
+    if (recordIterator.hasNext()) {
+      return true;
+    }
+    recordIterator.close();
+    recordIterator = null;
   }
   return false;
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcIterators.java`
around lines 111 - 125, In CdcIterators.hasNext(), the current logic returns
false as soon as recordIteratorFunc.apply(...) produces an empty iterator, which
stops the scan prematurely; change the method to loop: when recordIterator is
null and fileSplitIterator.hasNext(), create recordIterator via
recordIteratorFunc.apply(...), and if that iterator is empty immediately close
it, set recordIterator to null, and continue to the next split until you either
find a non-empty recordIterator (return true) or exhaust fileSplitIterator
(return false); ensure you still close previous recordIterator when it becomes
exhausted. Reference: class CdcIterators, method hasNext, fields recordIterator,
fileSplitIterator, and recordIteratorFunc.

Comment on lines +335 to +338
public void close() {
logRecordIterator.close();
imageManager.close();
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Don't close the shared CdcImageManager from DataLogFileIterator.

CdcFileSplitsIterator owns that manager and reuses it across split iterators. Closing it here means the first exhausted log split can invalidate image lookups for all subsequent splits, and it also gets closed a second time by the outer iterator.

Suggested fix
 `@Override`
 public void close() {
   logRecordIterator.close();
-  imageManager.close();
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcIterators.java`
around lines 335 - 338, The DataLogFileIterator's close() method is closing the
shared CdcImageManager (imageManager), which is owned and reused by
CdcFileSplitsIterator; remove the imageManager.close() call from
DataLogFileIterator.close() so only the iterator-specific resource
(logRecordIterator) is closed and let CdcFileSplitsIterator remain responsible
for closing the shared CdcImageManager.

Comment on lines +336 to +350
for (HoodieInstant instant : getPendingClusteringInstantsForPartitions(metaClient, partitions)) {
if (!BaseHoodieTableServiceClient.isClusteringInstantEligibleForRollback(
metaClient, instant, client.getConfig(), client.getHeartbeatClient())) {
throw new HoodieException("Clustering instant " + instant.requestedTime()
+ " targeting requested partitions is not eligible for rollback "
+ "(heartbeat still active or instant too recent)");
}
// Reload timeline to handle the case where the instant committed and cleaned up
// its heartbeat after the timeline was first loaded
metaClient.reloadActiveTimeline();
if (metaClient.getActiveTimeline().filterInflightsAndRequested()
.containsInstant(instant.requestedTime())) {
LOG.info("Rolling back expired clustering instant {}", instant.requestedTime());
client.rollback(instant.requestedTime());
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Re-check rollback eligibility after reloading the timeline.

There's a TOCTOU window here: another executor can start the clustering heartbeat after the first isClusteringInstantEligibleForRollback(...) call, and the reload only verifies that the instant is still pending. Re-run the eligibility check under the refreshed timeline (or lock the whole decision) before client.rollback(...), otherwise this can cancel a now-live clustering job.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java`
around lines 336 - 350, The current loop in
getPendingClusteringInstantsForPartitions calls
isClusteringInstantEligibleForRollback(metaClient, instant, client.getConfig(),
client.getHeartbeatClient()) before reloading the timeline, which creates a
TOCTOU: after metaClient.reloadActiveTimeline() another executor could have
started the heartbeat; before calling client.rollback(instant.requestedTime())
re-check eligibility against the refreshed timeline (i.e., call
isClusteringInstantEligibleForRollback again using the same parameters) and only
proceed to client.rollback if that second check returns true, or alternatively
hold a lock around the reload+eligibility+rollback sequence to prevent
concurrent heartbeats from making the instant live between checks; update the
logic in HoodieClusteringJob where these symbols appear:
getPendingClusteringInstantsForPartitions,
isClusteringInstantEligibleForRollback, metaClient.reloadActiveTimeline,
metaClient.getActiveTimeline().filterInflightsAndRequested().containsInstant,
and client.rollback.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants