feat(spark): refresh parquet tools clustering strategy for current master#18409
Conversation
|
@nsivabalan Executing any parquet tools operations special jar to be included in the runtime, so I am not adding the column nullifying parquet tools execution strategy. Let us just keep the test class itself. |
nsivabalan
left a comment
There was a problem hiding this comment.
I feel, we could name the classes better
Summary Table
| Class | Current Name | Recommended Name | Why |
|---|---|---|---|
| Strategy | ParquetToolsExecutionStrategy | SparkExternalFileClusteringExecutionStrategy | Describes what (external file processing) not how (parquet-tools) |
| Handle | HoodieFileWriteHandle | ExternalFileClusteringWriteHandle | Too generic → specific to clustering use case |
| Converter | ParquetFileMetaToWriteStatusConvertor | FileMetadataWriteStatusConverter | Shorter, fixes typo (Convertor→Converter), more general |
| Method | executeTools | transformFile | Clear contract: transform source to target |
yihua
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for contributing! The parquet-tools clustering strategy is a nice extension point. I flagged one likely bug — the empty-list guard in ParquetToolsExecutionStrategy.performClusteringForGroup uses > 1 instead of != 1, which would cause an IndexOutOfBoundsException on an empty clustering operations list. A couple of other items around error handling and API consistency are worth discussing in the inline comments.
yihua
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
LGTM — nice updates addressing all prior feedback. The != 1 guard fix, StoragePath migration in the abstract API, file-existence check in close(), and the try-catch with partial file cleanup in performClusteringForGroup all look correct. The renames to SparkExternalFileClusteringExecutionStrategy / ExternalFileClusteringWriteHandle / FileMetadataWriteStatusConverter are cleaner and more descriptive. All prior review comments (both mine and @nsivabalan's) have been addressed.
yihua
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
CodeRabbit Walkthrough: This pull request introduces a new external file clustering feature for Hudi. The changes add a metadata-driven approach to write status generation, a specialized write handle for external file clustering operations, and execution strategies (both abstract and test implementations) that orchestrate the clustering transformation workflow with pluggable file transformation logic.
Greptile Summary: This PR introduces a new external-file clustering execution strategy for Apache Spark, enabling Hudi to delegate the actual file rewrite to an arbitrary external transformation (e.g., a Parquet-tools command) rather than reading and re-writing records through the standard Hudi write path. The key additions are:
FileMetadataWriteStatusConverter– builds aWriteStatus/HoodieWriteStatby reading file metadata (row count, file size) directly from the output Parquet file rather than from individual record writes.ExternalFileClusteringWriteHandle– aHoodieWriteHandlesubclass that creates the output path and marker file upfront, then delegates actual writing to the caller before finalising the write status inclose().SparkExternalFileClusteringExecutionStrategy– abstract strategy that enforces one-operation-per-group, invokes the user-suppliedtransformFile(), and handles cleanup on failure.- Test helpers
ClusteringIdentityTestExecutionStrategyandExternalFileClusteringTestExecutionStrategyto exercise the new skeleton.
Issues found:
ExternalFileClusteringWriteHandle.close()wrapsIOExceptioninHoodieInsertException— the wrong exception type for a clustering context; should useHoodieClusteringExceptionorHoodieIOException.FileMetadataWriteStatusConverter.generateHoodieWriteStat()usesString.valueOf(executionConfigs.get(PREV_COMMIT)), which silently produces the literal string"null"if the value isnull, corrupting theprevCommitfield on the write stat.ExternalFileClusteringWriteHandle.close()never callsmarkClosed(), leaving the parent-classclosedflag permanentlyfalse.generateWriteStatus()instantiatesFileMetadataWriteStatusConverteras a raw type — generic parameters should be forwarded.TestFileMetadataWriteStatusConverterhard-codes the map keys"totalCreateTime"and"prevCommit"rather than referencing the exported constants, making the test fragile to future renames.
Greptile Confidence Score: 3/5
- The PR introduces a useful and well-structured feature, but two logic-level issues in production code (wrong exception type in
close()andString.valueOf(null)→"null"string corruption) should be addressed before merging. - The overall architecture is sound and the test coverage is reasonable. However, the
HoodieInsertExceptionwrapping in a clustering context could cause exception-type-sensitive error handling to misbehave, and theString.valueOf(null)issue ingenerateHoodieWriteStatcould silently corrupt write statistics. These are targeted, concrete fixes rather than deep rearchitecting, so the PR is close to ready. ExternalFileClusteringWriteHandle.javaandFileMetadataWriteStatusConverter.javaneed the two logic fixes before merge.
Sequence Diagram (CodeRabbit):
sequenceDiagram
actor User
participant Strategy as SparkExternalFileClusteringExecutionStrategy
participant Handle as ExternalFileClusteringWriteHandle
participant Converter as FileMetadataWriteStatusConverter
participant Storage as HoodieStorage/ParquetUtils
User->>Strategy: performClusteringForGroup()
Strategy->>Handle: create ExternalFileClusteringWriteHandle
Strategy->>Strategy: transformFile(oldPath, newPath)
Note over Strategy: Abstract method<br/>implemented by subclass
alt Transformation Success
Strategy->>Handle: close()
Handle->>Handle: build executionConfigs<br/>(prevCommit, timeElapsed)
Handle->>Converter: convert(parquetFile, partition, configs)
Converter->>Storage: ParquetUtils.getRowCount()
Converter->>Storage: storage.getPathInfo().getLength()
Converter->>Converter: generateHoodieWriteStat()
Converter-->>Handle: WriteStatus
Handle-->>Strategy: List<WriteStatus>
Strategy-->>User: List<WriteStatus>
else Transformation Failure
Strategy->>Storage: delete output file
Strategy-->>User: throw HoodieClusteringException
end
Sequence Diagram (Greptile):
sequenceDiagram
participant S as SparkExternalFileClustering<br/>ExecutionStrategy
participant H as ExternalFileClusteringWriteHandle
participant T as transformFile()<br/>(subclass impl)
participant C as FileMetadataWrite<br/>StatusConverter
participant FS as HoodieStorage
S->>H: new(config, instantTime, table, partitionPath, fileId, oldFilePath)
H->>FS: makeNewPath(partitionPath) → newFilePath
H->>FS: createMarkerFile(partitionPath, newFilePath)
S->>T: transformFile(oldFilePath, newFilePath)
alt transformation fails
T-->>S: Exception
S->>FS: deleteFile(newFilePath)
S-->>S: throw HoodieClusteringException
end
S->>H: close()
H->>FS: exists(newFilePath)
H->>C: "convert(newFilePath, partitionPath, {PREV_COMMIT, TIME_TAKEN})"
C->>FS: getRowCount(parquetFilePath)
C->>FS: getPathInfo(parquetFilePath).getLength()
C-->>H: WriteStatus (with HoodieWriteStat)
H-->>S: List[WriteStatus]
CodeRabbit: yihua#37 (review)
Greptile: yihua#37 (review)
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #18409 +/- ##
============================================
+ Coverage 68.21% 68.83% +0.61%
- Complexity 27709 28235 +526
============================================
Files 2440 2463 +23
Lines 134249 135336 +1087
Branches 16179 16394 +215
============================================
+ Hits 91578 93153 +1575
+ Misses 35565 34806 -759
- Partials 7106 7377 +271
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
Describe the issue this Pull Request addresses
This PR refreshes the parquet-tools based clustering strategy from the older
parquet-toolsbranch so it can be proposed against currentapache/master.The original implementation had drifted from current Hudi internals and test APIs. This refresh keeps the existing simple rewrite hook shape while aligning the implementation with current clustering and storage behavior.
Summary and Changelog
Refresh the parquet-tools clustering strategy and its supporting tests for current master.
ParquetToolsExecutionStrategyAPI simple with the existing file-to-file rewrite hookStoragePath/HoodieStoragebased APIsFSUtils.getCommitTime(...)Impact
No public API change intended.
This keeps the existing parquet-tools rewrite extension point, but makes it compatible with current Hudi master and current clustering output semantics.
Risk Level
low
The change is localized to the parquet-tools rewrite path and related test scaffolding.
Documentation Update
none
Contributor's checklist