[OSS PR #18487] fix: Update hudi-azure-bundle dependencies #30

Open
yihua wants to merge 10 commits into master from oss-18487

Conversation

@yihua (Owner) commented Apr 9, 2026

Mirror of apache#18487 for automated bot review.

Original author: @linliu-code
Base branch: master

Summary by CodeRabbit

Release Notes

  • New Features

    • Added Azure Blob Storage support for distributed lock coordination.
    • Enabled clustering instant expiration with configurable heartbeat-based thresholds.
    • Introduced record limit push-down for Flink source reads.
    • Implemented column pruning optimization for incremental queries.
  • Improvements

    • Enhanced schema validation to prevent unsupported nested Vector field configurations.
    • Improved log compaction scheduling logic with configurable thresholds.
    • Refined resource cleanup for parquet reader operations.
  • Tests

    • Added comprehensive test coverage for clustering expiration, record limiting, and incremental query optimizations.

coderabbitai Bot commented Apr 9, 2026

Walkthrough

This pull request introduces multiple substantial features and refactorings: Azure Blob Storage-based distributed locking, clustering expiration/rollback with configurable thresholds, distributed metrics registry system across executors, Flink RLI bootstrap state simplification, record limit push-down in Flink source readers, VECTOR column nesting validation, incremental read column pruning, log compaction enhancements, instant time format normalization, and a new Azure bundle packaging module.

Changes

Cohort / File(s) Summary
Azure Storage Lock Client
hudi-azure/src/main/java/org/apache/hudi/azure/transaction/lock/AzureStorageLockClient.java
New StorageLockClient implementation using Azure Blob Storage as distributed lock backend with ETag-based preconditions for create/update semantics and status/error mapping for Azure SDK exceptions.
Clustering Expiration Feature
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java, hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java, hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
Added clustering expiration config properties (ENABLE_EXPIRATIONS, EXPIRATION_THRESHOLD_MINS), updated UPDATES_STRATEGY to infer from conflict-resolution strategy, added rollback eligibility logic with heartbeat expiry checks, and conditional heartbeat start/stop during clustering lifecycle.
Distributed Metrics Registry
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/DistributedRegistryUtil.java, hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java, hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java, hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java, hudi-io/src/main/java/org/apache/hudi/common/metrics/LocalRegistry.java, hudi-io/src/main/java/org/apache/hudi/common/metrics/Registry.java, hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HoodieWrapperFileSystem.java
Refactored metrics registry system to support distributed registries with table-scoped prefixes, compound keys, lazy initialization of Hadoop/WriteConfig, executor-side registry registration, and conditional initialization based on metrics/executor-metrics enablement.
Log Compaction Enhancements
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java, hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
Added threshold-based log compaction scheduling via needLogCompact(...), introduced timeline utilities getLastLogCompaction(...) and getDeltaCommitsSinceLatestCompletedLogCompaction(...), renamed and refactored delta-commit metrics computation.
Flink RLI Bootstrap Refactoring
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/RLIBootstrapOperator.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/Correspondent.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
Simplified RLI bootstrap by removing persisted state tracking, job-ID coordination, and checkpoint reconciliation; added synchronous recommit on checkpoint reset and global failover triggering on bootstrap recommendation.
Record Limiter and Source Reader
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/RecordLimiter.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieScanContext.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieSource.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieSourceReader.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieSourceSplitReader.java
Introduced RecordLimiter for global record-count limit enforcement across splits, integrated into scan context, and modified source reader/split reader to apply limiting and drain splits when limit is reached.
CDC Iterator Refactoring
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcIterators.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcImageManager.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/AbstractSplitReaderFunction.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieCdcSplitReaderFunction.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieSplitReaderFunction.java
Consolidated CDC iterator implementations into reusable CdcIterators class supporting multiple CDC inference modes, refactored image caching via new CdcImageManager, and introduced AbstractSplitReaderFunction base class for shared reader logic.
VECTOR Column Nesting Validation
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java, hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala
Added validation to enforce VECTOR columns as top-level record fields only; throws HoodieSchemaException when VECTOR appears nested inside ARRAY, MAP, or record fields; added depth-aware conversion with constraint checking.
Incremental Read Column Pruning
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV1.scala, hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV2.scala, hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/IncrementalRelationUtil.scala
Switched incremental relations from TableScan to PrunedScan, integrated column pruning via IncrementalRelationUtil, deduplicated metadata fields during schema merging, and applied required-column filtering post-scan.
Instant Time Format Normalization
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala, hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/HoodieTableChanges.scala, hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
Added formatIncrementalInstant(...) supporting sentinel values and legacy zero-prefixed timestamps, enhanced formatQueryInstant(...) for ISO datetime with T/space, epoch seconds/millis, and normalized parameter handling in datasource options.
Flink Split Assignment
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/assign/HoodieSplitAssigners.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/assign/HoodieSplitBucketAssigner.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
Updated bucket assigner to accept Flink configuration and apply bucket-index-driven partition function for task assignment; propagated configuration through split provider instantiation.
Utility and Configuration
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java, hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java, hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java, hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
Removed unused initWrapperFSMetrics() method, removed WRITE_OPERATOR_UID config option, made ParquetReaderIterator.close() idempotent with state tracking, and added getMetricRegistry(...) to engine context base class.
Build and Packaging
packaging/hudi-azure-bundle/pom.xml, pom.xml
Added new Maven shaded bundle module for Azure SDK with relocation rules to avoid classpath collisions and configured fat JAR construction.
Test Coverage
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieTableServiceClient.java, hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java, hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metrics/TestDistributedRegistry.java, hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/..., hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/...
Comprehensive test additions for clustering expiration, distributed registry behavior, record limiter, split reader functions, CDC iterators, instant time validation, incremental query column pruning, and VECTOR validation.
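
The ETag-precondition semantics mentioned for the Azure Storage lock client can be illustrated with an in-memory model. This is only a sketch under stated assumptions: it does not use the Azure SDK, and `InMemoryBlobStore`, `createIfAbsent`, and `updateIfMatch` are hypothetical names that mimic the standard HTTP `If-None-Match: *` and `If-Match: <etag>` preconditions.

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical in-memory stand-in for a blob store that versions each blob with an ETag. */
class InMemoryBlobStore {
  /** A stored lock payload together with the ETag assigned when it was written. */
  static final class Blob {
    final String owner;
    final String etag;

    Blob(String owner, String etag) {
      this.owner = owner;
      this.etag = etag;
    }
  }

  private final ConcurrentHashMap<String, Blob> blobs = new ConcurrentHashMap<>();
  private final AtomicLong etagCounter = new AtomicLong();

  private String nextEtag() {
    return "etag-" + etagCounter.incrementAndGet();
  }

  /** Models "If-None-Match: *": succeeds only when no blob exists at the path. */
  Optional<String> createIfAbsent(String path, String owner) {
    Blob candidate = new Blob(owner, nextEtag());
    Blob previous = blobs.putIfAbsent(path, candidate);
    return previous == null ? Optional.of(candidate.etag) : Optional.empty();
  }

  /** Models "If-Match: etag": succeeds only when the stored ETag still equals expectedEtag. */
  Optional<String> updateIfMatch(String path, String expectedEtag, String owner) {
    Blob candidate = new Blob(owner, nextEtag());
    boolean[] swapped = {false};
    // computeIfPresent runs atomically per key, so the compare and the swap cannot interleave.
    blobs.computeIfPresent(path, (key, current) -> {
      if (current.etag.equals(expectedEtag)) {
        swapped[0] = true;
        return candidate;
      }
      return current;
    });
    return swapped[0] ? Optional.of(candidate.etag) : Optional.empty();
  }
}
```

In this model a second writer cannot create the lock while it exists, and a writer holding a stale ETag cannot renew or overwrite it, which is the property a lock backend needs from conditional writes.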

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120+ minutes

Hop, hop! A lock now guards the Azure way,
Clustering times shall expire and rollback—hooray!
Registries dance across the executor land,
While vectors nest no more, as bundles expand! 🐰✨

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 16

Note

Due to the large number of review comments, Critical, Major severity comments were prioritized as inline comments.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java (1)

63-66: ⚠️ Potential issue | 🟠 Major

closed state is bypassed on exception paths, causing redundant close + wrong exception wrapping

Line 64 and Line 82 call FileIOUtils.closeQuietly(parquetReader) directly, which bypasses close() and leaves iterator state inconsistent with the new closed flag. Also, the HoodieException thrown at Line 75 is re-caught and rewrapped in next(). This is why the test now observes two close() invocations.

💡 Proposed fix
@@
   @Override
   public boolean hasNext() {
     try {
@@
       return hasNextRecord;
     } catch (Exception e) {
-      FileIOUtils.closeQuietly(parquetReader);
+      FileIOUtils.closeQuietly(this);
       throw new HoodieException("unable to read next record from parquet file ", e);
     }
   }
@@
   @Override
   public T next() {
     try {
@@
       this.next = null;
       return retVal;
+    } catch (HoodieException e) {
+      // Preserve semantic exceptions (like "No more records...") as-is
+      FileIOUtils.closeQuietly(this);
+      throw e;
     } catch (Exception e) {
-      FileIOUtils.closeQuietly(parquetReader);
+      FileIOUtils.closeQuietly(this);
       throw new HoodieException("unable to read next record from parquet file ", e);
     }
   }

Also applies to: 81-84

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java`
around lines 63 - 66, The exception handlers in ParquetReaderIterator are
bypassing the iterator's close() logic by calling
FileIOUtils.closeQuietly(parquetReader) directly and re-wrapping HoodieException
in next(), causing duplicate closes and inconsistent closed state; change those
catch blocks to call this.close() (which sets the closed flag) instead of
FileIOUtils.closeQuietly(parquetReader), and when catching HoodieException
inside next() do not re-wrap it—rethrow it as-is so it isn't caught and wrapped
again by outer handlers; update the catch clauses around parquetReader usage in
next() and close-related exception paths to use this.close() and
preserve/propagate HoodieException.
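
The pattern this comment asks for can be sketched in isolation. The classes below are hypothetical stand-ins, not the Hudi types: `ReadException` plays the role of HoodieException, and `CountingReader` replaces the parquet reader so that close calls can be counted. The sketch assumes every error path should route through a single idempotent `close()` and that semantic exceptions should propagate without being wrapped again.

```java
import java.io.Closeable;
import java.util.Iterator;

/** Hypothetical stand-in for HoodieException. */
class ReadException extends RuntimeException {
  ReadException(String message) {
    super(message);
  }

  ReadException(String message, Throwable cause) {
    super(message, cause);
  }
}

/** Hypothetical reader that counts how many times close() is invoked. */
class CountingReader implements Closeable {
  int closeCalls = 0;
  private int remaining;

  CountingReader(int records) {
    this.remaining = records;
  }

  /** Returns the next value, or null once the reader is exhausted. */
  Integer read() {
    return remaining > 0 ? Integer.valueOf(remaining--) : null;
  }

  @Override
  public void close() {
    closeCalls++;
  }
}

/** Iterator whose error paths all go through one idempotent close(). */
class ClosingIterator implements Iterator<Integer>, Closeable {
  private final CountingReader reader;
  private Integer buffered;
  private boolean closed = false;

  ClosingIterator(CountingReader reader) {
    this.reader = reader;
  }

  @Override
  public boolean hasNext() {
    try {
      if (buffered == null) {
        buffered = reader.read();
      }
      return buffered != null;
    } catch (RuntimeException e) {
      close(); // goes through the closed flag instead of closing the reader directly
      throw new ReadException("unable to read next record", e);
    }
  }

  @Override
  public Integer next() {
    try {
      if (!hasNext()) {
        throw new ReadException("No more records left to read");
      }
      Integer value = buffered;
      buffered = null;
      return value;
    } catch (ReadException e) {
      close();
      throw e; // semantic exception is rethrown as-is, not wrapped a second time
    } catch (RuntimeException e) {
      close();
      throw new ReadException("unable to read next record", e);
    }
  }

  @Override
  public void close() {
    if (closed) {
      return; // a second close is a no-op
    }
    closed = true;
    reader.close();
  }
}
```

With this shape, exhausting the iterator and then calling `close()` explicitly closes the underlying reader once, and the "No more records" exception reaches the caller with no wrapping cause.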
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java (1)

484-516: ⚠️ Potential issue | 🟠 Major

Centralize VECTOR placement validation across all schema entry points.

The change only validates nested RECORD fields on the createRecord(...) path. Invalid shapes such as ARRAY<RECORD<embedding: VECTOR>> and MAP<RECORD<embedding: VECTOR>>, and entry points such as HoodieSchema.parse(...) and new HoodieSchema.Builder(...).build(), still bypass the rule, so unsupported nested VECTOR schemas remain constructible outside the Spark converter.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java`
around lines 484 - 516, Centralize the nested VECTOR validation by invoking
validateNoVectorInNestedRecord from every HoodieSchema construction path: call
it from HoodieSchema.parse(...), from HoodieSchema.Builder.build(), and from the
HoodieSchema constructors (or a single private init method used by all
constructors) so any created schema runs the same check; ensure the call uses
nested=false for top-level and that recursive validation already in
validateNoVectorInNestedRecord will catch VECTORs inside RECORD/ARRAY/MAP/other
nested types, and throw HoodieSchemaException when violated.
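
A centralized check of this kind could look roughly like the following. This is a minimal sketch over a hypothetical schema model (`SchemaNode`, `VectorPlacementValidator`), not the HoodieSchema API, and it throws `IllegalArgumentException` where the real code would throw HoodieSchemaException. It assumes the root is a RECORD whose direct fields are the only place a VECTOR may appear.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical, minimal schema model used only for illustration. */
class SchemaNode {
  enum Kind { RECORD, ARRAY, MAP, VECTOR, PRIMITIVE }

  final Kind kind;
  /** Record fields, the array element type, or the map value type. */
  final List<SchemaNode> children;

  SchemaNode(Kind kind, SchemaNode... children) {
    this.kind = kind;
    this.children = Arrays.asList(children);
  }
}

class VectorPlacementValidator {
  /** Entry point: direct fields of the root record may be VECTOR, anything deeper may not. */
  static void validate(SchemaNode root) {
    for (SchemaNode field : root.children) {
      if (field.kind != SchemaNode.Kind.VECTOR) {
        rejectNestedVector(field);
      }
    }
  }

  /** Walks everything below a nested node and fails on the first VECTOR found. */
  private static void rejectNestedVector(SchemaNode node) {
    for (SchemaNode child : node.children) {
      if (child.kind == SchemaNode.Kind.VECTOR) {
        throw new IllegalArgumentException("VECTOR is only supported as a top-level record field");
      }
      rejectNestedVector(child);
    }
  }
}
```

Calling one such `validate` from every construction path (parse, builder, constructors) is what keeps shapes like ARRAY<RECORD<embedding: VECTOR>> from slipping through a path that skips the check.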
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java (1)

353-364: ⚠️ Potential issue | 🟡 Minor

Reinitialize BucketAssignFunction during simulated subtask failover.

subTaskFails() clears only the wrapper’s MockBucketAssignFunctionContext, but the real bucketAssignerFunction instance and its keyed state remain live. That makes the post-failover bootstrap replay run against stale pre-failover state, so these tests can pass without matching what Flink would actually restore.

Possible fix
 public void subTaskFails(int taskID, int attemptNumber) throws Exception {
   coordinator.subtaskFailed(taskID, new RuntimeException("Dummy exception"));
   // reset the attempt number to simulate the task failover/retries
   this.runtimeContext.setAttemptNumber(attemptNumber);
   this.bucketAssignFunctionContext.clear();
+  this.bucketAssignerFunction.close();
+  this.bucketAssignerFunction = new BucketAssignFunction(conf);
+  this.bucketAssignerFunction.setRuntimeContext(runtimeContext);
+  this.bucketAssignerFunction.setCorrespondent(correspondent);
+  this.bucketAssignerFunction.open(conf);
+  this.bucketAssignerFunction.initializeState(this.stateInitializationContext);
   setupWriteFunction();
   if (supportStreamingWriteIndex()) {
     if (conf.get(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
       setupIndexBootstrapFunction();
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java`
around lines 353 - 364, In subTaskFails(), besides clearing the wrapper's
MockBucketAssignFunctionContext, reinstantiate and reinitialize the actual
bucketAssignerFunction and its keyed state so post-failover behavior matches
Flink restore; specifically, after setting the new attempt number and calling
bucketAssignFunctionContext.clear(), create a fresh BucketAssignFunction
instance (or call its open/init method) and clear/reset any keyed state or state
handles associated with bucketAssignerFunction before calling
setupWriteFunction() (and before
setupIndexBootstrapFunction()/setupIndexWriteFunction() when present) so
bootstrap replay uses a clean, post-failover assigner state.
🟡 Minor comments (8)
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalQueryColumnPruning.scala-97-124 (1)

97-124: ⚠️ Potential issue | 🟡 Minor

Use completion time for the v9 incremental boundary.

Both tests build START_COMMIT from requestedTime, but the v9 incremental path is completion-time based. That means these cases can still pass without actually validating the intended table-version-specific window semantics.

⏱️ Suggested fix
-    val commit1 = hoodieMetaClient.getCommitsTimeline.filterCompletedInstants().lastInstant().get().requestedTime
+    val lastInstant = hoodieMetaClient.getCommitsTimeline.filterCompletedInstants().lastInstant().get()
+    val commit1 =
+      if (tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
+        lastInstant.getCompletionTime
+      } else {
+        lastInstant.requestedTime
+      }

Also applies to: 207-229

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalQueryColumnPruning.scala`
around lines 97 - 124, The test currently builds START_COMMIT using the
instant's requestedTime (variables commit1 and commit2) but v9 incremental
queries expect the completion time; update the test to use
get().getCompletionTime (the instant completion timestamp) when
setting DataSourceReadOptions.START_COMMIT for the incremental query calls that
reference commit1/commit2 so the incremental window matches v9 semantics; locate
and change the construction of commit1 and commit2 (and the second occurrence at
lines ~207-229) to extract the instant completion time rather than requestedTime
and pass those strings into the .option(DataSourceReadOptions.START_COMMIT.key,
...) calls.
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalQueryColumnPruning.scala-120-168 (1)

120-168: ⚠️ Potential issue | 🟡 Minor

These assertions validate projection, not scan pruning.

.select("col1", "col3") already guarantees the output schema only contains those columns, so the current checks would still pass even if buildScan(requiredColumns) read the full row. The plan assertion is also too weak because the selected column names will appear in the plan regardless. Please assert the scan/read schema itself, not just the post-projection schema.

Also applies to: 225-240

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalQueryColumnPruning.scala`
around lines 120 - 168, The test currently verifies only projection
(post-select) rather than actual scan pruning; update the assertions to inspect
the scan/read schema from the query plan instead of the resulting DataFrame
schema: locate the prunedDF usage and replace/augment the checks that inspect
prunedDF.schema and plan string with assertions that examine
prunedDF.queryExecution.executedPlan (or prunedDF.queryExecution.optimizedPlan)
to find the physical/table-scan node’s ReadSchema/requiredSchema and assert it
contains only "col1" and "col3" (and does not contain "col2","col4", etc.);
apply the same change for the similar block referenced at lines 225-240 so you
assert the scan-level schema (buildScan(requiredColumns) / read-schema) rather
than just the projection-level schema.
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala-2331-2348 (1)

2331-2348: ⚠️ Potential issue | 🟡 Minor

Assert the duplicated _hoodie_partition_path values too.

This test currently proves the incremental read no longer throws, but it would still pass if the dedupe logic kept Hudi's meta field and dropped the user column silently. Please assert that _hoodie_partition_path still carries the source values (src_partition*), not the table partition values (partition*).

🧪 Suggested assertion tightening
-    val results = incrementalDf.select("_row_key", "data")
+    val results = incrementalDf.select("_row_key", "_hoodie_partition_path", "data")
       .orderBy("_row_key")
       .collect()

     // row1 was updated with new value
     assertEquals("row1", results(0).getAs[String]("_row_key"))
+    assertEquals("src_partition1", results(0).getAs[String]("_hoodie_partition_path"))
     assertEquals("value1_updated", results(0).getAs[String]("data"))

     assertEquals("row2", results(1).getAs[String]("_row_key"))
+    assertEquals("src_partition1", results(1).getAs[String]("_hoodie_partition_path"))
     assertEquals("value2", results(1).getAs[String]("data"))

     assertEquals("row3", results(2).getAs[String]("_row_key"))
+    assertEquals("src_partition2", results(2).getAs[String]("_hoodie_partition_path"))
     assertEquals("value3", results(2).getAs[String]("data"))

     // row4 is new from the upsert
     assertEquals("row4", results(3).getAs[String]("_row_key"))
+    assertEquals("src_partition2", results(3).getAs[String]("_hoodie_partition_path"))
     assertEquals("value4", results(3).getAs[String]("data"))
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala`
around lines 2331 - 2348, The test in TestMORDataSource.scala currently
validates row keys and "data" but omits asserting the duplicated meta column
`_hoodie_partition_path`; add assertions after the existing checks to verify
that the `_hoodie_partition_path` values in the incrementalDf results correspond
to the original source partition values (e.g., "src_partition1",
"src_partition2") rather than the table
partition values; locate the block using the local variable incrementalDf and
the collected results array `results` and add assertEquals checks against
results(i).getAs[String]("_hoodie_partition_path") for each row to enforce the
dedupe preserved source partition metadata.
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/RecordLimiter.java-53-69 (1)

53-69: ⚠️ Potential issue | 🟡 Minor

Honor NO_LIMIT inside the wrapper.

isLimitReached() treats NO_LIMIT as unlimited, but wrap() bypasses that contract and checks totalReadCount >= limit directly. A direct new RecordLimiter(NO_LIMIT).wrap(...) will therefore suppress every record on the first call. Please short-circuit the unlimited case here (and ideally reject other negative values in the constructor).

Suggested fix
   public <T> RecordsWithSplitIds<T> wrap(RecordsWithSplitIds<T> records) {
+    if (limit == NO_LIMIT) {
+      return records;
+    }
     return new RecordsWithSplitIds<T>() {
       @Override
       public String nextSplit() {
         return records.nextSplit();
       }

       @Override
       public T nextRecordFromSplit() {
-        if (totalReadCount.get() >= limit) {
+        if (isLimitReached()) {
           return null;
         }
         T record = records.nextRecordFromSplit();
         if (record != null) {
           totalReadCount.incrementAndGet();
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/RecordLimiter.java`
around lines 53 - 69, The wrapper in RecordLimiter.wrap ignores the NO_LIMIT
sentinel and directly compares totalReadCount to limit, causing all records to
be suppressed when limit == NO_LIMIT; update wrap(RecordsWithSplitIds<T>) to
short-circuit and simply return the original records iterator when limit ==
NO_LIMIT (or when isLimitReached() would never be true), and update the
RecordLimiter constructor to validate and reject negative limits except for the
defined NO_LIMIT constant so invalid negatives cannot be used; reference the
RecordLimiter.wrap method, the limit field, NO_LIMIT constant, isLimitReached(),
and the RecordLimiter constructor when making these changes.
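
The two requested behaviors (short-circuiting the unlimited case and rejecting other negative limits) can be sketched on a plain `Iterator`. `SimpleRecordLimiter` is a hypothetical simplification: the class under review wraps Flink's RecordsWithSplitIds, and the value `-1` for `NO_LIMIT` is an assumption made for this example.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical limiter over a plain Iterator, used only to illustrate the contract. */
class SimpleRecordLimiter {
  /** Assumed sentinel value meaning "no limit". */
  static final long NO_LIMIT = -1L;

  private final long limit;
  /** Shared across every wrapped iterator, so the limit is global rather than per split. */
  private final AtomicLong totalReadCount = new AtomicLong();

  SimpleRecordLimiter(long limit) {
    if (limit < 0 && limit != NO_LIMIT) {
      throw new IllegalArgumentException("limit must be non-negative or NO_LIMIT, got " + limit);
    }
    this.limit = limit;
  }

  boolean isLimitReached() {
    return limit != NO_LIMIT && totalReadCount.get() >= limit;
  }

  <T> Iterator<T> wrap(Iterator<T> records) {
    if (limit == NO_LIMIT) {
      return records; // unlimited: hand back the source untouched
    }
    return new Iterator<T>() {
      @Override
      public boolean hasNext() {
        // Reuses isLimitReached() so the wrapper and the public check cannot disagree.
        return !isLimitReached() && records.hasNext();
      }

      @Override
      public T next() {
        if (!hasNext()) {
          throw new NoSuchElementException("record limit reached or source exhausted");
        }
        totalReadCount.incrementAndGet();
        return records.next();
      }
    };
  }
}
```

Routing the wrapper through `isLimitReached()` keeps a single definition of "limit reached", so the sentinel is honored on every path.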
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java-281-289 (1)

281-289: ⚠️ Potential issue | 🟡 Minor

Unchecked cast to DistributedRegistry.

Line 286 casts the registry to DistributedRegistry without verification. If Registry.getRegistryOfClass returns a different implementation, this will throw a ClassCastException.

🛡️ Proposed fix to add type check
   public Registry getMetricRegistry(String tableName, String registryName) {
     final String prefixedName = tableName.isEmpty() ? registryName : tableName + "." + registryName;
     return DISTRIBUTED_REGISTRY_MAP.computeIfAbsent(prefixedName, key -> {
       Registry registry = Registry.getRegistryOfClass(tableName, registryName, DistributedRegistry.class.getName());
+      if (!(registry instanceof DistributedRegistry)) {
+        throw new IllegalStateException("Expected DistributedRegistry but got " + registry.getClass().getName());
+      }
       ((DistributedRegistry) registry).register(javaSparkContext);
       return registry;
     });
   }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java`
around lines 281 - 289, getMetricRegistry currently casts the result of
Registry.getRegistryOfClass to DistributedRegistry without checking its type,
risking a ClassCastException; update getMetricRegistry to call
Registry.getRegistryOfClass(tableName, registryName,
DistributedRegistry.class.getName()), then verify the returned Registry is an
instance of DistributedRegistry before casting (e.g., use instanceof), only call
register(javaSparkContext) when it is a DistributedRegistry, and handle the
else-case by either logging/throwing a clear error or returning the registry
unchanged; references: getMetricRegistry, DISTRIBUTED_REGISTRY_MAP,
Registry.getRegistryOfClass, DistributedRegistry, javaSparkContext.
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metrics/TestDistributedRegistry.java-55-61 (1)

55-61: ⚠️ Potential issue | 🟡 Minor

Clear the global registry cache in teardown.

Registry.REGISTRY_MAP is JVM-global, so stopping Spark alone does not isolate this class from later tests. Leaving entries behind makes the suite order-dependent if another test reuses a registry name.

Suggested fix
  @AfterAll
  public static void tearDown() {
    if (jsc != null) {
      jsc.stop();
      jsc = null;
    }
+   Registry.REGISTRY_MAP.clear();
  }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metrics/TestDistributedRegistry.java`
around lines 55 - 61, The test leaves entries in the JVM-global
Registry.REGISTRY_MAP which can make later tests order-dependent; update the
TestDistributedRegistry.tearDown method to clear the global registry cache after
stopping Spark by invoking Registry.REGISTRY_MAP.clear() (or the
Registry-provided reset method if available) so that entries created by this
test are removed and subsequent tests run in isolation; ensure this clear call
runs even when jsc is null or after jsc.stop() to guarantee cleanup.
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java-398-403 (1)

398-403: ⚠️ Potential issue | 🟡 Minor

isAlreadyBootstrap() no longer verifies bootstrap completion for RLI.

For RLIBootstrapOperator, this now returns true as soon as the operator object exists. Any test calling assertBootstrapped() on the streaming-index path becomes vacuous, because it no longer checks that any bootstrap work actually completed.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java`
around lines 398 - 403, isAlreadyBootstrap() currently treats any non-null
RLIBootstrapOperator as "bootstrapped" instead of verifying completion; update
the method so it detects RLIBootstrapOperator specifically and calls its real
completion-check API (e.g., RLIBootstrapOperator.isBootstrapCompleted() or the
actual method that reports bootstrap completion) rather than returning
bootstrapOperator != null, keeping the existing behavior for BootstrapOperator
by still calling ((BootstrapOperator) bootstrapOperator).isAlreadyBootstrap().
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java-301-317 (1)

301-317: ⚠️ Potential issue | 🟡 Minor

Restart the pipeline after the forced global failure.

After Line 303, the test only simulates another subtask retry on the same wrapper. It never exercises the real path after context.failJob(...), where the coordinator/operators are rebuilt from checkpoint and RLI bootstrap runs again. As written, this can pass even if the post-failover recovery path is broken.

Suggested adjustment
     testHarness.assertNextEvent();
     // global failover is triggered now, since all the pending write metadata events are recommitted.
     testHarness.assertGlobalFailure(true);

-    testHarness.subTaskFails(0, 0)
+    testHarness.jobFailover()
+        .assertGlobalFailure(false)
         .checkIndexLoaded(
             new HoodieKey("id1", "par1"),
             new HoodieKey("id3", "par2"))
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java`
around lines 301 - 317, The test currently asserts a global failure but then
only retries a subtask on the same harness wrapper; change it to simulate a real
job restart so the coordinator/operators are rebuilt and RLI bootstrap runs:
after testHarness.assertGlobalFailure(true) perform a full harness restart
(e.g., call the harness restart API or tear down and re-instantiate the
TestHarness with the same checkpointed state) instead of just continuing on the
same wrapper, then proceed with the subsequent
subTaskFails/consume/checkpoint/assertNextEvent/checkpointComplete/checkWrittenData
sequence to validate post-failover recovery (referencing testHarness,
assertGlobalFailure, subTaskFails, checkIndexLoaded, consume,
checkpointComplete, and checkWrittenData).
🧹 Nitpick comments (10)
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitProvider.java (1)

565-596: Make this provider test exercise the new config-aware path.

Right now it builds the assigner with new Configuration() and still asserts the old-looking bucket -> task mapping. With this setup, the partition term can cancel out, so the test may keep passing even if the new routing logic regresses. Please pin BUCKET_INDEX_NUM_BUCKETS explicitly and use a partition/setup where the expected task ID is not just bucket % parallelism.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitProvider.java`
around lines 565 - 596, The test testGetNextWithBucketAssigner should exercise
the config-aware routing path: construct the HoodieSplitBucketAssigner with a
Configuration that explicitly sets BUCKET_INDEX_NUM_BUCKETS to the desired value
(via the config key constant used by HoodieSplitBucketAssigner) and choose
fileId/partition values in createSplitWithBucketFileId such that expected task
assignment is not simply bucket % parallelism; then update the expected task ids
in the provider.getNext assertions to reflect the config-aware routing.
Specifically, modify the test to set the config key on the Configuration passed
into new HoodieSplitBucketAssigner(...), keep using
DefaultHoodieSplitProvider(provider) and onDiscoveredSplits(splits), but change
the split creation/partitioning so expected task IDs differ from bucket %
parallelism and assert those new expected values from provider.getNext(…).
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala (1)

2605-2617: Strengthen the regression test with an explicit “no duplicate columns” assertion.

This test already checks read success and data correctness; adding a schema uniqueness check will directly pin the duplicate-column failure mode.

Proposed test hardening
     val incrementalDf = spark.read.format("hudi")
       .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
       .option(DataSourceReadOptions.START_COMMIT.key, "000")
       .load(basePath)

+    val fieldNames = incrementalDf.schema.fieldNames
+    assertEquals(fieldNames.length, fieldNames.distinct.length,
+      "Incremental schema should not contain duplicate column names")
+
     // Verify the data was read correctly
     assertEquals(3, incrementalDf.count())
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala`
around lines 2605 - 2617, The test should assert the DataFrame schema has no
duplicate column names to catch regressions; after building incrementalDf (the
DataFrame selected into 'incrementalDf' in TestCOWDataSource.scala), collect its
schema field names (incrementalDf.schema.fieldNames) and assert the array length
equals the size of its unique set (or that frequency of any name is 1), and
optionally assert there aren’t multiple "data" or "_row_key" entries—this will
fail the test if duplicate columns are present.
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieCdcSplitReaderFunction.java (2)

225-242: Test assertion is incomplete.

The test calls function.read(cdcSplit) but doesn't assert anything about the result or capture exceptions. The comment says "Should not throw exception" but there's no verification. Consider adding an explicit assertion or using assertDoesNotThrow().

♻️ Suggested improvement
   `@Test`
   public void testReadAcceptsCdcSourceSplitType() {
     // Verify that HoodieCdcSourceSplit is accepted (cast doesn't throw).
     // Actual I/O would require a real Hoodie table, so we only check the
     // type-guard passes by catching the downstream I/O error rather than
     // an IllegalArgumentException.
     HoodieCdcSplitReaderFunction function = createFunction();
 
     HoodieCDCFileSplit[] changes = {
         new HoodieCDCFileSplit("20230101000000000", HoodieCDCInferenceCase.BASE_FILE_INSERT, "insert.parquet")
     };
     HoodieCdcSourceSplit cdcSplit = new HoodieCdcSourceSplit(
         1, tempDir.getAbsolutePath(), 128 * 1024 * 1024L, "file-cdc",
         EMPTY_PARTITION_PATH, changes, "read_optimized", "20230101000000000");
 
-    // Should not throw exception
-    function.read(cdcSplit);
+    // Should not throw IllegalArgumentException (type-guard should pass)
+    assertDoesNotThrow(() -> function.read(cdcSplit));
   }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieCdcSplitReaderFunction.java`
around lines 225 - 242, The test
TestHoodieCdcSplitReaderFunction.testReadAcceptsCdcSourceSplitType currently
just calls HoodieCdcSplitReaderFunction.read(cdcSplit) without verifying
behavior; update the test to explicitly assert no exception is thrown (e.g.,
wrap the call in an assertDoesNotThrow or a try/catch with fail on exception) or
otherwise verify expected outcome from read, referencing the
HoodieCdcSplitReaderFunction.read invocation and the cdcSplit setup so the test
fails if read throws.

168-219: Test names are misleading - no limit parameter is being tested.

The section is titled "Limit push-down constructor tests" but HoodieCdcSplitReaderFunction constructor doesn't accept a limit parameter. All tests use the same 6-argument constructor and only assert assertNotNull(function).

For example, testDefaultConstructorUsesNoLimitSentinel claims to verify that the 6-arg constructor delegates to a 7-arg constructor with limit=-1, but both function instances are constructed identically and no assertion verifies the sentinel value.

Either rename these tests to reflect what they actually verify (basic construction with different parameters), or update them to test actual limit behavior if HoodieCdcSplitReaderFunction is expected to handle limits.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieCdcSplitReaderFunction.java`
around lines 168 - 219, The test methods under "Limit push-down constructor
tests" are misleading because they all call the same 6-arg
HoodieCdcSplitReaderFunction constructor and never exercise or assert any limit
behavior; update the tests to either (A) actually test limit behavior by
invoking the 7-arg constructor that accepts a limit (if one exists) and assert
the internal sentinel/limit field (e.g., construct with limit=-1, limit=0,
positive limit and validate the function’s limit member or behavior), or (B) if
no limit parameter exists, rename the test methods (testConstructorWithLimit*,
testDefaultConstructorUsesNoLimitSentinel,
testConstructorWithLimitZeroIsAccepted) to reflect they only verify basic
construction and adjust assertions to check relevant state/arguments (notNull
plus any differing inputs like empty field types), and update the test names
referenced (testConstructorWithLimit,
testConstructorWithLimitAndEmptyFieldTypes,
testDefaultConstructorUsesNoLimitSentinel,
testConstructorWithLimitZeroIsAccepted) accordingly so names match actual
assertions.
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieSplitReaderFunction.java (1)

388-445: Remove duplicate test methods.

The newly added tests are duplicates of existing tests in this file:

  • testConstructorWithEmitDelete duplicates testConstructorWithEmitDeleteFalse (lines 340-353)
  • testConstructorWithNullTableSchemaThrows duplicates testConstructorValidatesTableSchema (lines 72-86)
  • testConstructorWithNullRequiredSchemaThrows duplicates testConstructorValidatesRequiredSchema (lines 88-102)
  • testDefaultConstructor duplicates testConstructorWithValidParameters (lines 104-119)

The comment on lines 389-391 mentions "limit is now enforced in HoodieSourceSplitReader" but these tests don't actually verify limit behavior—they just repeat constructor validation.

♻️ Suggested fix: Remove duplicates
-  // -------------------------------------------------------------------------
-  //  Constructor tests (limit is now enforced in HoodieSourceSplitReader)
-  // -------------------------------------------------------------------------
-
-  `@Test`
-  public void testConstructorWithEmitDelete() {
-    HoodieSplitReaderFunction function =
-        new HoodieSplitReaderFunction(
-            conf,
-            tableSchema,
-            requiredSchema,
-            mockInternalSchemaManager,
-            "AVRO_PAYLOAD",
-            Collections.emptyList(),
-            false
-        );
-
-    assertNotNull(function);
-  }
-
-  `@Test`
-  public void testConstructorWithNullTableSchemaThrows() {
-    assertThrows(IllegalArgumentException.class, () ->
-        new HoodieSplitReaderFunction(
-            conf,
-            null,
-            requiredSchema,
-            mockInternalSchemaManager,
-            "AVRO_PAYLOAD",
-            Collections.emptyList(),
-            false
-        ));
-  }
-
-  `@Test`
-  public void testConstructorWithNullRequiredSchemaThrows() {
-    assertThrows(IllegalArgumentException.class, () ->
-        new HoodieSplitReaderFunction(
-            conf,
-            tableSchema,
-            null,
-            mockInternalSchemaManager,
-            "AVRO_PAYLOAD",
-            Collections.emptyList(),
-            false
-        ));
-  }
-
-  `@Test`
-  public void testDefaultConstructor() {
-    HoodieSplitReaderFunction function =
-        new HoodieSplitReaderFunction(
-            conf, tableSchema, requiredSchema, mockInternalSchemaManager,
-            "AVRO_PAYLOAD", Collections.emptyList(), false);
-
-    assertNotNull(function);
-  }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieSplitReaderFunction.java`
around lines 388 - 445, Remove the duplicated unit tests that repeat existing
constructor validation checks: delete the test methods
testConstructorWithEmitDelete, testConstructorWithNullTableSchemaThrows,
testConstructorWithNullRequiredSchemaThrows, and testDefaultConstructor from
TestHoodieSplitReaderFunction; these are redundant with
testConstructorWithEmitDeleteFalse, testConstructorValidatesTableSchema,
testConstructorValidatesRequiredSchema, and testConstructorWithValidParameters
respectively. Ensure no other tests or imports are changed and that the class
still contains the original validation tests for HoodieSplitReaderFunction
constructor behavior.
hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieClusteringJob.java (1)

208-211: Unused variable clusteringInstantTime.

The variable clusteringInstantTime is assigned on line 210 but never used in this test method. The same issue exists on line 247 in testClusteringExpirationSkipsInstantWithActiveHeartbeat.

🧹 Proposed fix to remove unused variable
     Option<HoodieInstant> pendingCluster = metaClient.getActiveTimeline().getFirstPendingClusterInstant();
     assertTrue(pendingCluster.isPresent());
-    String clusteringInstantTime = pendingCluster.get().requestedTime();
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieClusteringJob.java`
around lines 208 - 211, The variable clusteringInstantTime is assigned but never
used in TestHoodieClusteringJob (and again in
testClusteringExpirationSkipsInstantWithActiveHeartbeat); remove the unused
assignment of clusteringInstantTime (the String clusteringInstantTime =
pendingCluster.get().requestedTime(); statement) or use the value if intended,
leaving the existing retrieval of pendingCluster via
metaClient.getActiveTimeline().getFirstPendingClusterInstant() intact; update
both occurrences to eliminate the unused local variable.
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java (1)

332-352: Throwing on first ineligible instant prevents rollback of other eligible instants.

The method throws a HoodieException immediately when encountering an ineligible instant (lines 339-341). If multiple pending clustering instants target the requested partitions and only one has an active heartbeat, this prevents rollback of all other eligible instants.

Consider whether the desired behavior is to:

  1. Throw on the first ineligible instant (current behavior) — stricter, prevents partial rollbacks
  2. Skip ineligible instants and continue with others — more permissive

If the current behavior is intentional, a clarifying comment would be helpful.

Alternative: Skip ineligible and continue
   public static void rollbackFailedClusteringForPartitions(
       SparkRDDWriteClient<?> client,
       HoodieTableMetaClient metaClient,
       List<String> partitions) {
     for (HoodieInstant instant : getPendingClusteringInstantsForPartitions(metaClient, partitions)) {
       if (!BaseHoodieTableServiceClient.isClusteringInstantEligibleForRollback(
           metaClient, instant, client.getConfig(), client.getHeartbeatClient())) {
-        throw new HoodieException("Clustering instant " + instant.requestedTime()
-            + " targeting requested partitions is not eligible for rollback "
-            + "(heartbeat still active or instant too recent)");
+        LOG.warn("Skipping clustering instant {} - not eligible for rollback "
+            + "(heartbeat still active or instant too recent)", instant.requestedTime());
+        continue;
       }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java`
around lines 332 - 352, The current behavior in
rollbackFailedClusteringForPartitions throws a HoodieException when
BaseHoodieTableServiceClient.isClusteringInstantEligibleForRollback(...) returns
false, which prevents processing subsequent instants; change this to skip
ineligible instants by logging a warning (include instant.requestedTime()) and
continue the loop so other eligible instants can be rolled back, leaving the
existing metaClient.reloadActiveTimeline() and subsequent containsInstant(...) +
client.rollback(...) logic intact; update or add a brief comment near
rollbackFailedClusteringForPartitions explaining that ineligible instants are
intentionally skipped to allow partial rollbacks.
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java (2)

92-97: Static registry map is shared across all context instances.

DISTRIBUTED_REGISTRY_MAP is a static field shared across all HoodieSparkEngineContext instances in the JVM. This means registries for different tables will coexist in the same map. The keying scheme (tableName + "." + registryName) should prevent conflicts, but this design couples all contexts together.

This is likely acceptable for the intended use case but worth noting for future maintainers.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java`
around lines 92 - 97, DISTRIBUTED_REGISTRY_MAP is declared static so registries
from all HoodieSparkEngineContext instances share the same JVM-wide map; to
avoid cross-context coupling convert DISTRIBUTED_REGISTRY_MAP to a non-static
instance field on HoodieSparkEngineContext (and update any uses in
getMetricRegistry() and serialization-related code to reference the instance
field), or if global sharing is intentional make the intent explicit by adding a
clear javadoc and renaming the field to indicate global scope (e.g.,
GLOBAL_DISTRIBUTED_REGISTRY_MAP) and adjust getMetricRegistry() usages
accordingly so maintainers understand the tradeoff.
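
To make the tradeoff concrete, here is a minimal, self-contained sketch of a JVM-wide map keyed by tableName + "." + registryName. It is not the Hudi implementation: the class SharedRegistryMapSketch, its methods, and the use of AtomicLong as a stand-in for a registry are all hypothetical, chosen only to illustrate the keying behavior described above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class SharedRegistryMapSketch {
  // Hypothetical stand-in for a static, JVM-wide registry map.
  private static final Map<String, AtomicLong> REGISTRIES = new ConcurrentHashMap<>();

  // Composite key using the "." separator mentioned in the review comment.
  public static String key(String tableName, String registryName) {
    return tableName + "." + registryName;
  }

  // Returns the shared entry for (tableName, registryName), creating it on first use.
  public static AtomicLong registry(String tableName, String registryName) {
    return REGISTRIES.computeIfAbsent(key(tableName, registryName), k -> new AtomicLong());
  }

  public static void main(String[] args) {
    // Same registry name under two tables: distinct entries.
    System.out.println(registry("tbl_a", "writes") != registry("tbl_b", "writes"));
    // Same table and registry name: every caller in the JVM sees the same entry.
    System.out.println(registry("tbl_a", "writes") == registry("tbl_a", "writes"));
    // A separator that can also appear in names leaves room for key collisions.
    System.out.println(key("a.b", "c").equals(key("a", "b.c")));
  }
}
```

In this sketch, distinct table names keep their registries apart, but plain string concatenation is only collision-free if the separator cannot occur inside a table or registry name. Whether that matters here depends on what names the real code allows.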

189-192: foreach method does not propagate registries to executors.

Unlike map, flatMap, mapToPair, and other methods, foreach does not call setRegistries(registries) inside the closure. This means metrics won't be collected from operations executed via foreach.

If this is intentional (e.g., foreach is only used for side effects where metrics aren't needed), consider adding a comment. Otherwise, apply the same pattern:

♻️ Proposed fix
   `@Override`
   public <I> void foreach(List<I> data, SerializableConsumer<I> consumer, int parallelism) {
-    javaSparkContext.parallelize(data, parallelism).foreach(consumer::accept);
+    final Map<String, Registry> registries = DISTRIBUTED_REGISTRY_MAP;
+    javaSparkContext.parallelize(data, parallelism).foreach(i -> {
+      setRegistries(registries);
+      consumer.accept(i);
+    });
   }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java`
around lines 189 - 192, HoodieSparkEngineContext.foreach currently parallelizes
data and calls consumer::accept without propagating registries to executors, so
metrics/registries aren't set inside the closure; update the foreach
implementation to setRegistries(registries) inside the closure before invoking
consumer.accept (same pattern used in map/flatMap/mapToPair etc.), ensuring the
closure captures the local registries variable and calls
setRegistries(registries) on each executor prior to executing the consumer; if
omission was intentional, add a clear comment in the foreach method explaining
why registries propagation is skipped.
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metrics/TestDistributedRegistry.java (1)

63-74: Add one non-empty tableName case here.

Lines 66-70 only exercise the empty-table branch. The new registry keying is table-aware, so a single assertion through getMetricRegistry("tbl", registryName) would catch regressions in tableName::registryName handling.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metrics/TestDistributedRegistry.java`
around lines 63 - 74, Add a second case in testRegisterAndName that exercises a
non-empty tableName: call engineContext.getMetricRegistry("tbl", registryName)
(same registryName as before), assert the returned object is an instanceOf
DistributedRegistry, assertEquals(registryName, registry.getName()), and
assertTrue(registry.getAllCounts().isEmpty()); this will validate the
tableName::registryName keying path in getMetricRegistry and catch regressions.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 9a401476-cdb2-4a67-bda5-b8829effc2dd

📥 Commits

Reviewing files that changed from the base of the PR and between 35e2bbf and 0a4747a.

📒 Files selected for processing (77)
  • hudi-azure/src/main/java/org/apache/hudi/azure/transaction/lock/AzureStorageLockClient.java
  • hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
  • hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
  • hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
  • hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
  • hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
  • hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/DistributedRegistryUtil.java
  • hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieTableServiceClient.java
  • hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
  • hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
  • hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
  • hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java
  • hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala
  • hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metrics/TestDistributedRegistry.java
  • hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
  • hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
  • hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
  • hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/RLIBootstrapOperator.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/Correspondent.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieScanContext.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieSource.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieSourceReader.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieSourceSplitReader.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/RecordLimiter.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/AbstractSplitReaderFunction.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieCdcSplitReaderFunction.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieSplitReaderFunction.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/assign/HoodieSplitAssigners.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/assign/HoodieSplitBucketAssigner.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcImageManager.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcIterators.java
  • hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
  • hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
  • hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
  • hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockStateSnapshotContext.java
  • hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
  • hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestEventBuffers.java
  • hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
  • hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieSourceSplitReader.java
  • hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestRecordLimiter.java
  • hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestAbstractSplitReaderFunction.java
  • hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieCdcSplitReaderFunction.java
  • hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieSplitReaderFunction.java
  • hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitProvider.java
  • hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/assign/TestHoodieSplitBucketAssigner.java
  • hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
  • hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java
  • hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HoodieWrapperFileSystem.java
  • hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java
  • hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestParquetReaderIterator.java
  • hudi-io/src/main/java/org/apache/hudi/common/metrics/LocalRegistry.java
  • hudi-io/src/main/java/org/apache/hudi/common/metrics/Registry.java
  • hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
  • hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV1.scala
  • hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV2.scala
  • hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/IncrementalRelationUtil.scala
  • hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/HoodieTableChanges.scala
  • hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
  • hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV1.scala
  • hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala
  • hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
  • hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalQueryColumnPruning.scala
  • hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
  • hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestVectorDataSource.scala
  • hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestSchemaConverters.scala
  • hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestInstantTimeValidation.scala
  • hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
  • hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieClusteringJob.java
  • packaging/hudi-azure-bundle/pom.xml
  • pom.xml
💤 Files with no reviewable changes (5)
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
  • hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
  • hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
  • hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/Correspondent.java
  • hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestEventBuffers.java

Comment on lines +248 to +256
URI uri = URI.create(lockFileUri);
String scheme = uri.getScheme();

// Always use the Blob endpoint (blob.core.windows.net) since BlobServiceClient uses the
// Blob Storage REST API. The DFS endpoint (dfs.core.windows.net) uses a different API
// (Data Lake Storage Gen2) with different required headers, causing MissingRequiredHeader
// errors on Put Blob operations.
String endpoint = String.format("https://%s.blob.core.windows.net", uriComponents.accountName);

⚠️ Potential issue | 🟠 Major

Don't force every URI onto blob.core.windows.net.

This drops the original host suffix. abfs[s] URIs in Azure Gov/China/private DNS will be rewritten to the public cloud endpoint and fail. Derive the blob endpoint from uri.getHost() and only swap .dfs. to .blob. when needed.

Suggested fix
       URI uri = URI.create(lockFileUri);
-      String scheme = uri.getScheme();
+      String host = uri.getHost();
+      ValidationUtils.checkArgument(host != null, "Missing host in lock file URI: " + lockFileUri);

@@
-      String endpoint = String.format("https://%s.blob.core.windows.net", uriComponents.accountName);
+      String endpointHost = host.replaceFirst("\\.dfs\\.", ".blob.");
+      String endpoint = String.format("https://%s", endpointHost);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-URI uri = URI.create(lockFileUri);
-String scheme = uri.getScheme();
-// Always use the Blob endpoint (blob.core.windows.net) since BlobServiceClient uses the
-// Blob Storage REST API. The DFS endpoint (dfs.core.windows.net) uses a different API
-// (Data Lake Storage Gen2) with different required headers, causing MissingRequiredHeader
-// errors on Put Blob operations.
-String endpoint = String.format("https://%s.blob.core.windows.net", uriComponents.accountName);
+URI uri = URI.create(lockFileUri);
+String host = uri.getHost();
+ValidationUtils.checkArgument(host != null, "Missing host in lock file URI: " + lockFileUri);
+// Always use the Blob endpoint (blob.core.windows.net) since BlobServiceClient uses the
+// Blob Storage REST API. The DFS endpoint (dfs.core.windows.net) uses a different API
+// (Data Lake Storage Gen2) with different required headers, causing MissingRequiredHeader
+// errors on Put Blob operations.
+String endpointHost = host.replaceFirst("\\.dfs\\.", ".blob.");
+String endpoint = String.format("https://%s", endpointHost);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-azure/src/main/java/org/apache/hudi/azure/transaction/lock/AzureStorageLockClient.java`
around lines 248 - 256, In AzureStorageLockClient, don't hardcode
blob.core.windows.net when building endpoint; instead derive the host from the
parsed URI (uri.getHost()) and only replace a ".dfs." segment with ".blob." when
present, then build the endpoint as "https://" + host (dropping the existing
uriComponents.accountName-based formatting); update the code that sets the endpoint
variable so abfs/abfss URIs for Gov/China/private clouds retain their original
DNS suffix while still handling Data Lake Gen2 dfs→blob conversion.
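
As a quick sanity check on the host rewrite, the following self-contained sketch applies the same replaceFirst idea to a few URIs using only java.net.URI. The class BlobEndpointSketch, the method toBlobEndpoint, and the sample account and container names are illustrative assumptions, not part of AzureStorageLockClient.

```java
import java.net.URI;

public class BlobEndpointSketch {
  /** Derives an https Blob endpoint from a lock file URI, keeping the original DNS suffix. */
  public static String toBlobEndpoint(String lockFileUri) {
    URI uri = URI.create(lockFileUri);
    String host = uri.getHost();
    if (host == null) {
      throw new IllegalArgumentException("Missing host in lock file URI: " + lockFileUri);
    }
    // Swap only the first ".dfs." label; hosts already on ".blob." pass through unchanged.
    return "https://" + host.replaceFirst("\\.dfs\\.", ".blob.");
  }

  public static void main(String[] args) {
    // prints https://acct.blob.core.windows.net
    System.out.println(toBlobEndpoint("abfss://locks@acct.dfs.core.windows.net/tbl/.hoodie/lock"));
    // prints https://acct.blob.core.usgovcloudapi.net
    System.out.println(toBlobEndpoint("abfss://locks@acct.dfs.core.usgovcloudapi.net/tbl/.hoodie/lock"));
    // prints https://acct.blob.core.windows.net
    System.out.println(toBlobEndpoint("wasbs://locks@acct.blob.core.windows.net/tbl/.hoodie/lock"));
  }
}
```

The sketch covers only the host-based URI forms shown. Any other URI shapes the lock client accepts would need their own handling and are not addressed here.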

Comment on lines +261 to +264
long validityTimeoutSecs = ((Number) props.getOrDefault(
StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(),
StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.defaultValue()
)).longValue();

⚠️ Potential issue | 🔴 Critical

Parse the timeout from Properties instead of casting.

When this setting comes from config it will be a String, so the Number cast throws ClassCastException during client construction.

Suggested fix
-      long validityTimeoutSecs = ((Number) props.getOrDefault(
-          StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(),
-          StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.defaultValue()
-      )).longValue();
+      long validityTimeoutSecs = Long.parseLong(props.getProperty(
+          StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(),
+          String.valueOf(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.defaultValue())
+      ));
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-long validityTimeoutSecs = ((Number) props.getOrDefault(
-StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(),
-StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.defaultValue()
-)).longValue();
+long validityTimeoutSecs = Long.parseLong(props.getProperty(
+StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(),
+String.valueOf(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.defaultValue())
+));
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-azure/src/main/java/org/apache/hudi/azure/transaction/lock/AzureStorageLockClient.java`
around lines 261 - 264, The current construction in AzureStorageLockClient reads
VALIDITY_TIMEOUT_SECONDS by casting props.get(...) to Number which fails when
the property is a String; instead parse the value safely: retrieve the raw
Object from props (props.get or props.getProperty), convert to String if
non-null, then use Long.parseLong with a fallback to
StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.defaultValue() when blank or
parse fails; update the code that sets validityTimeoutSecs (reference
StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS and the props map) to perform
this parsing and handle Number instances, String instances, and nulls robustly.
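
One way the tolerant parsing described above could look, as a sketch only. The helper name readLong and its signature are assumptions for illustration; the real code would read the key and default from StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of the PR.
final class TimeoutParsingSketch {

    // Accepts Number values, String values, and missing keys without casting.
    static long readLong(Properties props, String key, long defaultValue) {
        Object raw = props.get(key);
        if (raw instanceof Number) {
            return ((Number) raw).longValue();
        }
        // getProperty also consults the Properties defaults chain, which get() does not.
        String text = raw != null ? raw.toString() : props.getProperty(key);
        if (text == null || text.trim().isEmpty()) {
            return defaultValue;
        }
        try {
            return Long.parseLong(text.trim());
        } catch (NumberFormatException e) {
            // Falling back silently is a design choice; failing fast may be preferable.
            return defaultValue;
        }
    }
}
```

Whether an unparseable value should fall back to the default or fail client construction is a judgment call; the sketch follows the fallback behaviour described in the prompt above.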

Comment on lines +301 to +310
AzureStorageUriComponents uriComponents = AzureStorageUtils.parseAzureUri(filePath);

// Validate that the container name matches the lock client's container
ValidationUtils.checkArgument(
uriComponents.containerName.equals(this.containerName),
String.format("Container name mismatch: expected '%s' but got '%s' in path '%s'",
this.containerName, uriComponents.containerName, filePath));

BlobClient client = blobClient.getContainerClient()
.getBlobClient(uriComponents.blobPath);

⚠️ Potential issue | 🟠 Major

Validate the storage account before reusing this client.

readObject rebuilds a blob client from the current lock client's account and only checks containerName. A URI from another account with the same container will silently read the wrong object. The same guard should be mirrored in writeObject.

Suggested fix
       ValidationUtils.checkArgument(
-          uriComponents.containerName.equals(this.containerName),
-          String.format("Container name mismatch: expected '%s' but got '%s' in path '%s'",
-              this.containerName, uriComponents.containerName, filePath));
+          uriComponents.accountName.equals(blobClient.getAccountName())
+              && uriComponents.containerName.equals(this.containerName),
+          String.format("Storage location mismatch: expected account '%s' / container '%s' but got account '%s' / container '%s' in path '%s'",
+              blobClient.getAccountName(), this.containerName,
+              uriComponents.accountName, uriComponents.containerName, filePath));
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-azure/src/main/java/org/apache/hudi/azure/transaction/lock/AzureStorageLockClient.java`
around lines 301 - 310, The readObject (and similarly writeObject) path parses
the given filePath with AzureStorageUtils.parseAzureUri into
AzureStorageUriComponents but only validates containerName via
ValidationUtils.checkArgument, allowing URIs from other storage accounts to be
used; update the validation to also verify the storage account/endpoint before
reusing this client's BlobClient (e.g., compare
AzureStorageUriComponents.accountName or endpoint against this client's account
identifier or blobClient.getAccountName()/getUrl()), and if they differ, fail
with a clear ValidationUtils.checkArgument message stating expected vs actual
account so the method (readObject/writeObject) cannot silently read from a
different account.
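
A sketch of a single guard that both readObject and writeObject could share, assuming the expected account and container are available as plain strings. The class and method names are hypothetical, and IllegalArgumentException stands in for whatever ValidationUtils.checkArgument throws.

```java
// Hypothetical shared guard for illustration; not part of the PR.
final class LocationGuardSketch {

    // Rejects paths that point at a different storage account or container.
    static void checkSameLocation(String expectedAccount, String expectedContainer,
                                  String actualAccount, String actualContainer,
                                  String path) {
        boolean sameAccount = expectedAccount.equals(actualAccount);
        boolean sameContainer = expectedContainer.equals(actualContainer);
        if (!sameAccount || !sameContainer) {
            throw new IllegalArgumentException(String.format(
                "Storage location mismatch: expected account '%s' / container '%s' "
                    + "but got account '%s' / container '%s' in path '%s'",
                expectedAccount, expectedContainer, actualAccount, actualContainer, path));
        }
    }
}
```

Keeping the check in one place means the read and write paths cannot drift apart.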

Comment on lines +635 to +637
if (config.isExpirationOfClusteringEnabled()) {
heartbeatClient.stop(clusteringCommitTime);
}

⚠️ Potential issue | 🟠 Major

Stop the clustering heartbeat on failure paths too.

Lines 739-742 start a heartbeat when clustering is scheduled, but Lines 635-637 only stop it after a successful commit. If cluster(...) or commitClustering(...) throws, a long-lived writer can keep renewing that heartbeat indefinitely, and the pending clustering instant never becomes rollback-eligible.

Suggested direction
try {
  // existing clustering execution / completion flow
} catch (Exception e) {
  if (config.isExpirationOfClusteringEnabled()) {
    heartbeatClient.stop(clusteringInstant);
  }
  throw e;
}

Also applies to: 739-742

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java`
around lines 635 - 637, Wrap the clustering execution and commit flow (the calls
to cluster(...) and commitClustering(...)) in a try/catch/finally so that if an
exception occurs you call heartbeatClient.stop(...) when
config.isExpirationOfClusteringEnabled() is true; specifically, when you start
the heartbeat for the scheduled clustering instant (the same instant started at
the earlier start call), ensure any failure path (catch block) invokes
heartbeatClient.stop(clusteringCommitTime or clusteringInstant) before
rethrowing, and consider a finally to guarantee stop on all exit paths to avoid
long-lived heartbeats.
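
The finally-based variant could be sketched as follows. The Heartbeat interface and the method name are stand-ins invented for this example; they are not Hudi's heartbeat client API.

```java
import java.util.function.Supplier;

// Hypothetical wrapper for illustration; Heartbeat is a stand-in, not Hudi's API.
final class HeartbeatGuardSketch {

    interface Heartbeat {
        void stop(String instantTime);
    }

    // Runs the clustering work and stops the heartbeat on every exit path,
    // including when the work throws.
    static <T> T runWithHeartbeatCleanup(Heartbeat heartbeat, boolean expirationEnabled,
                                         String instantTime, Supplier<T> clusteringWork) {
        try {
            return clusteringWork.get();
        } finally {
            if (expirationEnabled) {
                heartbeat.stop(instantTime);
            }
        }
    }
}
```

With a finally block the separate stop after a successful commit becomes redundant, so this assumes stopping an already-stopped heartbeat is harmless or that the success-path call is removed.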

Comment on lines +243 to +256
private boolean needLogCompact(Pair<Integer, String> latestDeltaCommitInfoSinceCompact) {
Option<Pair<Integer, String>> latestDeltaCommitInfoSinceLogCompactOption = getLatestDeltaCommitInfoSinceLogCompaction();
int numDeltaCommitsSinceLatestCompaction = latestDeltaCommitInfoSinceCompact.getLeft();
int numDeltaCommitsSinceLatestLogCompaction = latestDeltaCommitInfoSinceLogCompactOption.isPresent()
? latestDeltaCommitInfoSinceLogCompactOption.get().getLeft()
: 0;

int numDeltaCommitsSince = Math.min(numDeltaCommitsSinceLatestCompaction, numDeltaCommitsSinceLatestLogCompaction);
boolean shouldLogCompact = numDeltaCommitsSince >= config.getLogCompactionBlocksThreshold();
if (shouldLogCompact) {
log.info("There have been {} delta commits since last compaction or log compaction, triggering log compaction.", numDeltaCommitsSince);
}
return shouldLogCompact;
}

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Search for tests covering log compaction scheduling
rg -n -A 20 "needLogCompact|LogCompaction" --type java -g '*Test*.java'

Repository: yihua/hudi

Length of output: 50367


🏁 Script executed:

# Search for where needLogCompact is called
rg -n "needLogCompact" --type java

# Search for getLatestDeltaCommitInfoSinceLogCompaction implementation
rg -n "getLatestDeltaCommitInfoSinceLogCompaction" --type java -A 10 | head -50

Repository: yihua/hudi

Length of output: 4665


🏁 Script executed:

# Get more context around where needLogCompact is called
rg -n "needLogCompact" --type java -B 10 -A 5

# Also look at the calling context to understand the logic better
rg -n "return needLogCompact" --type java -B 15 -A 2

Repository: yihua/hudi

Length of output: 8887


🏁 Script executed:

# Look for tests that specifically test needLogCompact or the first log compaction scenario
rg -n "testLogBlocksCountsAfterLogCompaction|stressTestCompactionAndLogCompactionOperations" --type java -A 50 | head -150

# Also search for any other references to understand the intended behavior
rg -n "scheduleLogCompaction\|LOG_COMPACT" --type java hudi-client/hudi-client-common/src/test/ | head -30

Repository: yihua/hudi

Length of output: 17187


🏁 Script executed:

# Let's understand the relationship between latestDeltaCommitInfoSinceCompact 
# and what it represents by looking at getLatestDeltaCommitInfoSinceCompaction
rg -n "getLatestDeltaCommitInfoSinceCompaction\(\)" --type java -B 2 -A 15 | head -60

Repository: yihua/hudi

Length of output: 6347


🏁 Script executed:

# Let's check the actual test file to see if it covers the scenario with no prior log compaction
cat -n hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java | sed -n '355,420p'

Repository: yihua/hudi

Length of output: 6179


🏁 Script executed:

# Let's see what getDeltaCommitsSinceLatestCompletedLogCompaction returns when there's NO log compaction
cat -n hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java | sed -n '397,430p'

Repository: yihua/hudi

Length of output: 3117


🏁 Script executed:

# Find the implementation of getDeltaCommitsSinceLatestCompletedLogCompaction
rg -n "getDeltaCommitsSinceLatestCompletedLogCompaction" --type java hudi-hadoop-common/src/main/java -A 30 | head -100

Repository: yihua/hudi

Length of output: 36


🏁 Script executed:

# Search across all Java files for getDeltaCommitsSinceLatestCompletedLogCompaction
rg -n "getDeltaCommitsSinceLatestCompletedLogCompaction" --type java -A 30 | head -100

Repository: yihua/hudi

Length of output: 15793


🏁 Script executed:

# Find CompactionUtils.java implementation file
find . -name "CompactionUtils.java" -type f | grep -v test | head -5

# Then look at the method implementation
rg -n "public static.*getDeltaCommitsSinceLatestCompletedLogCompaction" --type java -A 40 | head -100

Repository: yihua/hudi

Length of output: 5463


Fix logic bug: log compaction never triggers on first occurrence when threshold > 0.

When no log compaction has occurred, getLatestDeltaCommitInfoSinceLogCompaction() returns empty and defaults numDeltaCommitsSinceLatestLogCompaction to 0. Using Math.min(numDeltaCommitsSinceLatestCompaction, 0) always returns 0, making log compaction never trigger unless threshold is 0.

🐛 Fix
     int numDeltaCommitsSinceLatestCompaction = latestDeltaCommitInfoSinceCompact.getLeft();
     int numDeltaCommitsSinceLatestLogCompaction = latestDeltaCommitInfoSinceLogCompactOption.isPresent()
         ? latestDeltaCommitInfoSinceLogCompactOption.get().getLeft()
-        : 0;
+        : numDeltaCommitsSinceLatestCompaction;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java`
around lines 243 - 256, In needLogCompact, the logic uses Math.min(...) with
numDeltaCommitsSinceLatestLogCompaction defaulting to 0 when
getLatestDeltaCommitInfoSinceLogCompaction() is empty, which prevents first-time
log compaction from ever triggering; change the calculation so that when
latestDeltaCommitInfoSinceLogCompactOption is empty you fall back to
numDeltaCommitsSinceLatestCompaction (keep Math.min, since Math.max would ignore
a recent log compaction) to
compute numDeltaCommitsSince, i.e., adjust the computation of
numDeltaCommitsSince in needLogCompact (and keep the subsequent shouldLogCompact
check against config.getLogCompactionBlocksThreshold()) so the first eligible
log compaction can trigger.
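
The corrected threshold check reduces to a few lines once the timeline lookups are replaced by plain integers. This is a sketch under that simplification; OptionalInt stands in for Hudi's Option, and the class name is hypothetical.

```java
import java.util.OptionalInt;

// Hypothetical reduction of needLogCompact for illustration.
final class LogCompactionThresholdSketch {

    static boolean needLogCompact(int deltaCommitsSinceCompaction,
                                  OptionalInt deltaCommitsSinceLogCompaction,
                                  int threshold) {
        // With no prior log compaction, fall back to the compaction count
        // instead of 0 so the minimum is not pinned at zero.
        int sinceLogCompaction = deltaCommitsSinceLogCompaction.orElse(deltaCommitsSinceCompaction);
        int deltaCommitsSince = Math.min(deltaCommitsSinceCompaction, sinceLogCompaction);
        return deltaCommitsSince >= threshold;
    }
}
```

For example, with six delta commits since the last compaction, no prior log compaction, and a threshold of five, the original code compares 0 against 5 and never fires, while the fallback compares 6 against 5.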

Comment on lines +308 to 310
scanDf = IncrementalRelationUtil.filterRequiredColumnsFromDF(scanDf, requiredColumns, metaClient)
log.info("Additional Filters to be applied to incremental source are :" + filters.mkString("Array(", ", ", ")"))
filters.foldLeft(scanDf)((e, f) => e.filter(f)).rdd

⚠️ Potential issue | 🟠 Major

Filter before projecting away helper columns.

filterRequiredColumnsFromDF now runs before PUSH_DOWN_INCR_FILTERS are applied. If one of those filters references _hoodie_commit_time, a partition field, or an ordering field, the column is already gone and the incremental read will fail once pruning is enabled.

🔁 Suggested ordering
-      scanDf = IncrementalRelationUtil.filterRequiredColumnsFromDF(scanDf, requiredColumns, metaClient)
-      log.info("Additional Filters to be applied to incremental source are :" + filters.mkString("Array(", ", ", ")"))
-      filters.foldLeft(scanDf)((e, f) => e.filter(f)).rdd
+      log.info("Additional Filters to be applied to incremental source are :" + filters.mkString("Array(", ", ", ")"))
+      val filteredDf = filters.foldLeft(scanDf)((e, f) => e.filter(f))
+      IncrementalRelationUtil
+        .filterRequiredColumnsFromDF(filteredDf, requiredColumns, metaClient)
+        .rdd
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV1.scala`
around lines 308 - 310, The projection via
IncrementalRelationUtil.filterRequiredColumnsFromDF is removing
helper/partition/ordering columns (e.g., _hoodie_commit_time) before
PUSH_DOWN_INCR_FILTERS are applied, causing incremental reads to fail; fix by
reordering so that the filters (the filters.foldLeft(scanDf)((e,f) =>
e.filter(f))) are applied to scanDf first (use the filtered DataFrame as input
to any subsequent projection), i.e., apply the foldLeft filter chain on scanDf
before calling IncrementalRelationUtil.filterRequiredColumnsFromDF so push-down
filters can reference those helper/partition/ordering columns.
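
The ordering issue is independent of Spark. The following plain-Java analogy models rows as maps and shows filters running before the projection drops helper columns; it does not use the Spark DataFrame API, and every name in it is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Plain-Java analogy for illustration; rows are maps, not Spark Rows.
final class FilterThenProjectSketch {

    static List<Map<String, Object>> filterThenProject(
            List<Map<String, Object>> rows,
            List<Predicate<Map<String, Object>>> filters,
            List<String> requiredColumns) {
        // Combine all filters first; they may reference columns outside requiredColumns.
        Predicate<Map<String, Object>> all = row -> true;
        for (Predicate<Map<String, Object>> f : filters) {
            all = all.and(f);
        }
        // Project only after filtering, so helper columns are still visible to the filters.
        return rows.stream()
            .filter(all)
            .map(row -> project(row, requiredColumns))
            .collect(Collectors.toList());
    }

    private static Map<String, Object> project(Map<String, Object> row, List<String> columns) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (String column : columns) {
            if (row.containsKey(column)) {
                out.put(column, row.get(column));
            }
        }
        return out;
    }
}
```

Reversing the two steps would hand the filters rows that no longer carry _hoodie_commit_time, which mirrors the failure described above.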

Comment on lines +292 to 293
scanDf = IncrementalRelationUtil.filterRequiredColumnsFromDF(scanDf, requiredColumns, metaClient)
filters.foldLeft(scanDf)((e, f) => e.filter(f)).rdd

⚠️ Potential issue | 🟠 Major

Apply incremental filters before the post-scan projection.

This has the same regression as V1: helper columns are dropped first, so any PUSH_DOWN_INCR_FILTERS that reference _hoodie_commit_time, partition fields, or MOR ordering fields can no longer be evaluated after pruning.

🔁 Suggested ordering
-      scanDf = IncrementalRelationUtil.filterRequiredColumnsFromDF(scanDf, requiredColumns, metaClient)
-      filters.foldLeft(scanDf)((e, f) => e.filter(f)).rdd
+      val filteredDf = filters.foldLeft(scanDf)((e, f) => e.filter(f))
+      IncrementalRelationUtil
+        .filterRequiredColumnsFromDF(filteredDf, requiredColumns, metaClient)
+        .rdd
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV2.scala`
around lines 292 - 293, The projection via
IncrementalRelationUtil.filterRequiredColumnsFromDF is currently applied before
applying incremental filters, which drops helper columns needed by
PUSH_DOWN_INCR_FILTERS; change the order so you first apply
filters.foldLeft(scanDf)((e, f) => e.filter(f)) on scanDf and then call
IncrementalRelationUtil.filterRequiredColumnsFromDF(...) with the filtered
DataFrame (still using requiredColumns and metaClient), ensuring incremental
filters referencing _hoodie_commit_time, partition fields, or MOR ordering
fields can be evaluated before projection.

Comment on lines +28 to +74
def getPrunedSchema(requiredColumns: Array[String],
usedSchema: StructType,
metaClient: HoodieTableMetaClient) = {
var prunedSchema = StructType(Seq())

// _hoodie_commit_time is a required field. using which query filters are applied.
if (!requiredColumns.contains(HoodieRecord.COMMIT_TIME_METADATA_FIELD)) {
prunedSchema = prunedSchema.add(usedSchema(HoodieRecord.COMMIT_TIME_METADATA_FIELD))
}

// Add all the required columns as part of pruned schema
requiredColumns.foreach(col => {
val field = usedSchema.find(_.name == col)
if (field.isDefined) {
prunedSchema = prunedSchema.add(field.get)
}
})

// All the partition fields are required columns while querying the data.
val tableConfig = metaClient.getTableConfig
val partitionColumns = tableConfig.getPartitionFields
if (partitionColumns.isPresent) {
partitionColumns.get().foreach(col => {
if (!requiredColumns.contains(col)) {
val field = usedSchema.find(_.name == col)
if (field.isDefined) {
prunedSchema = prunedSchema.add(field.get)
}
}
})
}

// The precombine/ordering field is required for merge logic in MOR tables
if (tableConfig.getTableType == HoodieTableType.MERGE_ON_READ) {
val orderingFields = tableConfig.getOrderingFields
if (!orderingFields.isEmpty) {
orderingFields.forEach(col => {
if (!requiredColumns.contains(col)) {
val field = usedSchema.find(_.name == col)
if (field.isDefined) {
prunedSchema = prunedSchema.add(field.get)
}
}
})
}
}
prunedSchema

⚠️ Potential issue | 🟠 Major

Deduplicate columns while building the pruned schema.

getPrunedSchema already skips partition and ordering fields that appear in requiredColumns, but it never checks them against each other. On MOR tables where an ordering field is also a partition field and is not in requiredColumns, that field is added twice, producing a StructType with duplicate StructFields and breaking the scan again.

🛠️ Minimal fix
   def getPrunedSchema(requiredColumns: Array[String],
                       usedSchema: StructType,
                       metaClient: HoodieTableMetaClient) = {
     var prunedSchema = StructType(Seq())
+    val addedFields = scala.collection.mutable.LinkedHashSet[String]()
+
+    def addIfPresent(col: String): Unit = {
+      usedSchema.find(_.name == col).foreach { field =>
+        if (addedFields.add(field.name)) {
+          prunedSchema = prunedSchema.add(field)
+        }
+      }
+    }

     // _hoodie_commit_time is a required field. using which query filters are applied.
     if (!requiredColumns.contains(HoodieRecord.COMMIT_TIME_METADATA_FIELD)) {
-      prunedSchema = prunedSchema.add(usedSchema(HoodieRecord.COMMIT_TIME_METADATA_FIELD))
+      addIfPresent(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
     }

     // Add all the required columns as part of pruned schema
     requiredColumns.foreach(col => {
-      val field = usedSchema.find(_.name == col)
-      if (field.isDefined) {
-        prunedSchema = prunedSchema.add(field.get)
-      }
+      addIfPresent(col)
     })
@@
       partitionColumns.get().foreach(col => {
         if (!requiredColumns.contains(col)) {
-          val field = usedSchema.find(_.name == col)
-          if (field.isDefined) {
-            prunedSchema = prunedSchema.add(field.get)
-          }
+          addIfPresent(col)
         }
       })
@@
         orderingFields.forEach(col => {
           if (!requiredColumns.contains(col)) {
-            val field = usedSchema.find(_.name == col)
-            if (field.isDefined) {
-              prunedSchema = prunedSchema.add(field.get)
-            }
+            addIfPresent(col)
           }
         })
       }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/IncrementalRelationUtil.scala`
around lines 28 - 74, getPrunedSchema currently appends fields to prunedSchema
multiple times (e.g., requiredColumns, partitionColumns, orderingFields and the
commit_time field can overlap) which yields duplicate StructFields; modify
getPrunedSchema to deduplicate by tracking added field names (e.g., a mutable
Set[String] or checking prunedSchema.fieldNames contains the name) before adding
any field (including the commit_time and fields added from requiredColumns,
partitionColumns, and orderingFields) so each StructField is only added once;
use symbols from the diff like getPrunedSchema, prunedSchema,
HoodieRecord.COMMIT_TIME_METADATA_FIELD, metaClient.getTableConfig,
partitionColumns, and orderingFields to locate and update the logic.
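
The deduplication idea can be shown without Spark types by working on column names alone. In this sketch a LinkedHashSet keeps first-seen order and drops repeats; the class, method, and parameter names are hypothetical, and the real fix would add StructFields rather than strings.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical name-level reduction of getPrunedSchema for illustration.
final class PrunedColumnsSketch {

    static List<String> prunedColumns(Set<String> available, String commitTimeField,
                                      List<String> required, List<String> partitionFields,
                                      List<String> orderingFields) {
        // LinkedHashSet preserves insertion order and ignores repeated names.
        Set<String> picked = new LinkedHashSet<>();
        if (!required.contains(commitTimeField)) {
            addIfPresent(picked, available, commitTimeField);
        }
        for (String column : required) {
            addIfPresent(picked, available, column);
        }
        for (String column : partitionFields) {
            addIfPresent(picked, available, column);
        }
        for (String column : orderingFields) {
            addIfPresent(picked, available, column);
        }
        return new ArrayList<>(picked);
    }

    private static void addIfPresent(Set<String> picked, Set<String> available, String column) {
        if (available.contains(column)) {
            picked.add(column);
        }
    }
}
```

Because the set handles repeats, the per-section "not in requiredColumns" checks are no longer needed for correctness; the commit-time check is kept only to preserve the original column order.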

Comment on lines +24 to +28
<parent>
<artifactId>hudi</artifactId>
<groupId>org.apache.hudi</groupId>
<version>0.14.1-rc2</version>
<relativePath>../../pom.xml</relativePath>

⚠️ Potential issue | 🔴 Critical

Use the current reactor parent version here.

relativePath points at ../../pom.xml, but this child declares parent 0.14.1-rc2 while the root POM in this PR is 1.2.0-SNAPSHOT. Maven will reject the module as having the wrong parent, so the new reactor entry will not build.

Suggested fix
     <parent>
         <artifactId>hudi</artifactId>
         <groupId>org.apache.hudi</groupId>
-        <version>0.14.1-rc2</version>
+        <version>1.2.0-SNAPSHOT</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packaging/hudi-azure-bundle/pom.xml` around lines 24 - 28, The parent
declaration in this module is pinned to version 0.14.1-rc2 which mismatches the
current reactor root POM; update the <parent><version> value for artifactId
"hudi" and groupId "org.apache.hudi" to match the reactor root (e.g.,
1.2.0-SNAPSHOT) so the module inherits the correct parent when building with the
reactor (keep the existing <relativePath>). Do not use ${project.version} here:
it cannot be resolved inside <parent>, because the child inherits its version
from that parent.

Comment on lines +193 to +239
<!--
Runtime-required dependencies NOT bundled in this fat jar.
The shaded com.azure classes retain unshaded bytecode references to these packages.
They MUST be provided at runtime via Dockerfile jars or Spark's classpath.

Why not bundled: Shading Netty breaks JNI native transport (epoll) because native
.so libraries reference class names by string that the shade plugin cannot rewrite.
Instead, we rely on a single controlled version provided externally.

Required runtime jars (see Dockerfiles in onehouse-dataplane and spark-connect-server):
- reactor-netty-http-1.0.10.jar
- reactor-netty-core-1.0.10.jar
- reactor-core-3.4.38.jar
- reactive-streams-1.0.4.jar
- netty-resolver-dns-4.1.96.Final.jar
- netty-codec-dns-4.1.96.Final.jar
- jackson-dataformat-xml-2.15.2.jar
- woodstox-core-6.5.1.jar
- stax2-api-4.2.1.jar
- jna-5.13.0.jar
- jna-platform-5.13.0.jar

Spark 3.x ships unshaded io.netty.* in $SPARK_HOME/jars/ (Spark 2.x shaded Netty
as org.spark_project.io.netty.* — this bundle is NOT compatible with Spark 2.x).
-->
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty-http</artifactId>
<version>1.0.10</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty-core</artifactId>
<version>1.0.10</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.4.38</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>1.0.4</version>

⚠️ Potential issue | 🟠 Major

Avoid pinning known-vulnerable Reactor Netty jars as the required runtime set.

These are provided, but the comment above them tells operators to ship exactly these versions. That still makes the supported deployment path depend on reactor-netty-http/core 1.0.10, which OSV flags for multiple HIGH issues.

Which patched versions of io.projectreactor.netty:reactor-netty-http and io.projectreactor.netty:reactor-netty-core fix GHSA-q24v-hpg3-v3jp, GHSA-4q2v-9p7v-3v22, and GHSA-xjhv-p3fv-x24r, and which reactor-core version is compatible with those fixes?
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packaging/hudi-azure-bundle/pom.xml` around lines 193 - 239, The comment and
provided-scoped dependencies currently pin
io.projectreactor.netty:reactor-netty-http and
io.projectreactor.netty:reactor-netty-core to the known-vulnerable 1.0.10 and
reactor-core to 3.4.38; stop recommending exact vulnerable versions—look up
which patched versions resolve GHSA-q24v-hpg3-v3jp, GHSA-4q2v-9p7v-3v22, and
GHSA-xjhv-p3fv-x24r, then update the pom and the surrounding comment to require
at least those patched versions for artifactIds reactor-netty-http and
reactor-netty-core and set reactor-core to the compatible release that those
patched reactor-netty versions require; ensure the dependencies in the pom.xml
are updated (artifactIds: reactor-netty-http, reactor-netty-core, reactor-core)
or change the docs to recommend “>= [patched-version]” instead of pinning
1.0.10/3.4.38 so operators will ship non-vulnerable runtime jars.

@greptile-apps

greptile-apps Bot commented Apr 9, 2026

Greptile Summary

This PR mirrors apache/hudi#18487 and packs five distinct feature areas despite its "fix: Update hudi-azure-bundle dependencies" title:

  1. New hudi-azure-bundle packaging module: Maven Shade fat-jar bundling Azure SDK, correctly excluding Netty/Reactor to avoid JNI native transport failures.
  2. Clustering expiration / heartbeat-based rollback: hoodie.clustering.enable.expirations + hoodie.clustering.expiration.threshold.mins let the lazy-clean policy detect and roll back stale clustering instants without aborting live jobs.
  3. Distributed metrics registry refactor: Table-name-scoped registry keys (tableName::registryName) and executor-side propagation via Spark task closures.
  4. Flink source reader improvements: RecordLimiter for pushed-down LIMIT, CDC split-reader refactor into abstract base, and coordinator recovery fix for RLI-with-bootstrap tables.
  5. Spark incremental query improvements: IncrementalRelationV1 switched to PrunedScan, schema deduplication for skeleton/data field overlap.

Key concerns:

  • rollbackFailedClusteringForPartitions throws HoodieException when a clustering instant is still alive instead of skipping it.
  • Registry.setRegistries stores registries under the wrong key format, making distributed executor metrics silently unreachable.
  • RecordLimiter.wrap() does not guard against the NO_LIMIT (-1) sentinel (safe in practice but fragile).
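
For the third concern, a guard against the sentinel could look like the sketch below. It is not the PR's RecordLimiter: the class name, the limit method, and the iterator-based shape are assumptions made for illustration, with NO_LIMIT taken as -1 from the bullet above.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical iterator-based limiter for illustration; not the PR's RecordLimiter.
final class RecordLimitSketch {

    static final long NO_LIMIT = -1L;

    // Returns the source untouched for NO_LIMIT, otherwise caps the number of records.
    static <T> Iterator<T> limit(Iterator<T> source, long limit) {
        if (limit == NO_LIMIT) {
            return source;
        }
        return new Iterator<T>() {
            private long emitted = 0;

            @Override
            public boolean hasNext() {
                return emitted < limit && source.hasNext();
            }

            @Override
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                emitted++;
                return source.next();
            }
        };
    }
}
```

Without the early return, a limit of -1 would make `emitted < limit` false from the start and silently yield no records, which is the fragility the bullet points at.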

Confidence Score: 3/5

Not safe to merge as-is: two P1 bugs need targeted fixes before this is merge-ready.

Two P1 issues must be resolved: (1) rollbackFailedClusteringForPartitions throws instead of skipping live clustering instants, breaking writes during normal concurrent operation; (2) Registry.setRegistries key format mismatch silently prevents distributed executor metrics from being retrievable. The rest of the changeset is well-constructed.

hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java (throw-vs-skip logic) and hudi-io/src/main/java/org/apache/hudi/common/metrics/Registry.java (key format mismatch in setRegistries).

Vulnerabilities

No security concerns identified. The Azure DefaultAzureCredentialBuilder usage follows standard managed-identity patterns and does not expose secrets. Shade relocations for Azure SDK classes reduce classpath collision risk.

Important Files Changed

| Filename | Overview |
| --- | --- |
| packaging/hudi-azure-bundle/pom.xml | New fat-jar module for Azure; shade config correctly excludes Netty/Reactor, but parquet-avro and avro are compile-scoped yet not shaded. |
| pom.xml | Adds hudi-azure-bundle module registration; straightforward one-line change. |
| hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java | Adds rollbackFailedClusteringForPartitions; throws instead of skipping when a clustering instant is not eligible for rollback (a P1 logic bug). |
| hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java | Adds heartbeat-based clustering expiration for lazy-clean policy; widens rollback candidate filter to filterInflightsAndRequested, intentional for the new feature. |
| hudi-io/src/main/java/org/apache/hudi/common/metrics/Registry.java | Adds table-scoped registry keys and setRegistries; setRegistries stores under wrong key format, breaking executor-side lookups. |
| hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java | Propagates distributed metric registries to Spark executor closures; effectiveness depends on the Registry.setRegistries key fix. |
| hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/RecordLimiter.java | New utility for pushed-down LIMIT; wrap() inner lambda doesn't guard NO_LIMIT but safe in practice since callers skip construction for NO_LIMIT. |
| hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java | Adds ENABLE_EXPIRATIONS and EXPIRATION_THRESHOLD_MINS config properties; inference function auto-selects AllowUpdate vs RejectUpdate. |
| hudi-azure/src/main/java/org/apache/hudi/azure/transaction/lock/AzureStorageLockClient.java | New Azure Blob Storage-based distributed lock client using ETag optimistic concurrency; comprehensive implementation. |
| hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV1.scala | Switches from TableScan to PrunedScan; fixes schema deduplication between skeleton and data schemas. |

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A[Writer schedules clustering instant] --> B[heartbeatClient.start instantTime]
    B --> C[Clustering plan stored in timeline]
    D[Another writer / lazy-clean] --> E{isExpirationOfClusteringEnabled?}
    E -->|No| F[Skip clustering instants for rollback]
    E -->|Yes| G[filterPendingClusteringTimeline]
    G --> H{isClusteringInstantEligibleForRollback?}
    H -->|hasInstantExpired AND heartbeatExpired| I[Add to rollback candidates]
    H -->|Heartbeat still active OR too recent| J[Skip instant]
    I --> K[getInstantsToRollbackForLazyCleanPolicy]
    K --> L[Reload timeline]
    L --> M{Still in filterInflightsAndRequested?}
    M -->|Yes| N[Rollback clustering instant]
    M -->|No| O[Skip - committed by another writer]
    P[Clustering commits] --> Q[heartbeatClient.stop clusteringCommitTime]
    subgraph rollbackFailedClusteringForPartitions
        R[getPendingClusteringInstantsForPartitions] --> S{isClusteringInstantEligibleForRollback?}
        S -->|No| T[Throws HoodieException - should skip]
        S -->|Yes| U[client.rollback instant]
    end

Reviews (1): Last reviewed commit: "Add provided jars"

Comment on lines +327 to +332
*
* @param client the write client to use for rollback operations
* @param metaClient the table meta client
* @param partitions list of partition paths to check against pending clustering plans
*/
public static void rollbackFailedClusteringForPartitions(

P1 Throws instead of skipping live clustering instants

rollbackFailedClusteringForPartitions throws a HoodieException when any pending clustering instant targeting the requested partitions is not eligible for rollback (i.e., its heartbeat is still active or the instant is too recent). This means the caller will receive an exception any time a healthy, in-progress clustering job exists for those partitions — an expected and normal condition during concurrent writes.

The docstring says "Rolls back pending clustering instants that … are eligible for rollback", which implies ineligible ones should be skipped silently, not cause a failure.

// Suggested fix (skip ineligible):
if (!BaseHoodieTableServiceClient.isClusteringInstantEligibleForRollback(...)) {
    LOG.info("Clustering instant {} is not eligible for rollback (heartbeat still active or too recent), skipping.",
            instant.requestedTime());
    continue;
}

Throwing here will cause write operations to the affected partitions to fail whenever a live clustering job exists, which is the exact scenario this feature is meant to handle gracefully.
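The skip-instead-of-throw behavior can be sketched in isolation. This is a minimal illustration, not the Hudi implementation: `PendingClustering`, `ClusteringRollbackSketch`, and both method names are hypothetical stand-ins, and the eligibility rule (instant expired AND heartbeat expired) is taken from the flowchart above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a pending clustering instant; not a Hudi class.
final class PendingClustering {
    final String requestedTime;
    final boolean instantExpired;   // older than the configured expiration threshold
    final boolean heartbeatExpired; // no live writer is heartbeating this instant

    PendingClustering(String requestedTime, boolean instantExpired, boolean heartbeatExpired) {
        this.requestedTime = requestedTime;
        this.instantExpired = instantExpired;
        this.heartbeatExpired = heartbeatExpired;
    }
}

final class ClusteringRollbackSketch {
    // Eligible only when both conditions hold, as in the flowchart.
    static boolean isEligibleForRollback(PendingClustering c) {
        return c.instantExpired && c.heartbeatExpired;
    }

    // Collects the instants to roll back; ineligible ones are skipped, never thrown on.
    static List<String> selectForRollback(List<PendingClustering> pending) {
        List<String> toRollback = new ArrayList<>();
        for (PendingClustering c : pending) {
            if (!isEligibleForRollback(c)) {
                continue; // heartbeat still active or instant too recent
            }
            toRollback.add(c.requestedTime);
        }
        return toRollback;
    }
}
```

With this shape, a healthy in-progress clustering job on the same partitions leaves the caller's write unaffected, and only abandoned instants are handed to the rollback path.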

Comment on lines 147 to +152
}

/**
* Set all registries if they are not already registered.
*/
static void setRegistries(Collection<Registry> registries) {

P1 setRegistries key format mismatch breaks executor-side lookups

When HoodieSparkEngineContext.setRegistries() is called inside a Spark task (on executors), it stores distributed registries in REGISTRY_MAP using the key makeKey("", registry.getName()) which produces "::tableName.HoodieWrapperFileSystem". However, getRegistryOfClass(tableName, registryName, clazz) builds its key as makeKey(tableName, registryName) which produces "tableName::HoodieWrapperFileSystem".

These two formats are different. Any executor-side code that tries to retrieve the distributed registry through the normal API will not find it and will create a fresh LocalRegistry instead, so distributed metrics are never accumulated back to the driver.

The fix is to expose both tableName and registryName separately from Registry so setRegistries can reconstruct the correct compound key.
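The mismatch is easy to reproduce outside Hudi. In the sketch below, `makeKey` is an assumed reimplementation whose `"::"` layout is inferred only from the two example keys quoted above; `RegistryKeySketch` and its other methods are hypothetical helpers for illustration.

```java
import java.util.HashMap;
import java.util.Map;

final class RegistryKeySketch {
    // Assumed key layout, inferred from the example keys in the comment above.
    static String makeKey(String tableName, String registryName) {
        return tableName + "::" + registryName;
    }

    // Store path as described: empty table name plus the registry's full name.
    static String storeKeyAsWritten(String registryFullName) {
        return makeKey("", registryFullName);
    }

    // Store path after the proposed fix: table name and registry name kept separate.
    static String storeKeyFixed(String tableName, String registryName) {
        return makeKey(tableName, registryName);
    }

    // Returns true when a lookup by (tableName, registryName) finds the stored entry.
    static boolean lookupHits(String storedKey, String tableName, String registryName) {
        Map<String, Object> registryMap = new HashMap<>();
        registryMap.put(storedKey, new Object());
        return registryMap.containsKey(makeKey(tableName, registryName));
    }
}
```

Under these assumptions, a registry stored with `storeKeyAsWritten("tableName.HoodieWrapperFileSystem")` is missed by a lookup for `("tableName", "HoodieWrapperFileSystem")`, while one stored with `storeKeyFixed` is found.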

Comment on lines +58 to +62
}

@Override
public T nextRecordFromSplit() {
if (totalReadCount.get() >= limit) {

P2 wrap() is unsafe when limit == NO_LIMIT (-1)

Inside the delegating nextRecordFromSplit(), the guard totalReadCount.get() >= limit evaluates to 0L >= -1L → true immediately when limit == NO_LIMIT, so nextRecordFromSplit() always returns null. The current callers in HoodieSourceReader construct RecordLimiter only when scanContext.getLimit() != NO_LIMIT, so this is safe in practice, but the class contract is fragile.

Suggested change
- }
- @Override
- public T nextRecordFromSplit() {
- if (totalReadCount.get() >= limit) {
+ public T nextRecordFromSplit() {
+ if (limit != NO_LIMIT && totalReadCount.get() >= limit) {
+ return null;
+ }
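A standalone sketch of the guarded check shows why the extra `limit != NO_LIMIT` test matters. `LimitGuardSketch` is a hypothetical class written for illustration and does not mirror the real `RecordLimiter` API; only the `NO_LIMIT` value of -1 and the counter comparison come from the comment above.

```java
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical limiter; not the Hudi RecordLimiter.
final class LimitGuardSketch<T> {
    static final long NO_LIMIT = -1L;

    private final long limit;
    private final AtomicLong totalReadCount = new AtomicLong();

    LimitGuardSketch(long limit) {
        this.limit = limit;
    }

    // Returns the next record, or null once the limit is reached or the source is drained.
    T next(Iterator<T> source) {
        // Without the NO_LIMIT check, 0 >= -1 would be true and nothing would ever be read.
        if (limit != NO_LIMIT && totalReadCount.get() >= limit) {
            return null;
        }
        if (!source.hasNext()) {
            return null;
        }
        totalReadCount.incrementAndGet();
        return source.next();
    }

    long readCount() {
        return totalReadCount.get();
    }
}
```

Constructed with `NO_LIMIT`, this limiter passes every record through; constructed with a non-negative limit, it stops after that many records regardless of how many the source still holds.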

Comment on lines +265 to +284
</dependency>
<dependency>
<groupId>org.codehaus.woodstox</groupId>
<artifactId>stax2-api</artifactId>
<version>4.2.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna</artifactId>
<version>5.13.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna-platform</artifactId>
<version>5.13.0</version>
<scope>provided</scope>
</dependency>


P2 parquet-avro and avro declared compile but not shaded

Both are <scope>compile</scope> but absent from the shade plugin's <artifactSet><includes> list, so they will not be bundled into the fat jar. If the intent is to rely on Spark/Hadoop's classpath at runtime (matching the Netty/Reactor pattern), declare them as <scope>provided</scope> to make the intent explicit.

<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>${parquet.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>${avro.version}</version>
    <scope>provided</scope>
</dependency>
