[OSS PR #18409] feat(spark): refresh parquet tools clustering strategy for current master#37
Conversation
📝 WalkthroughWalkthroughThis pull request introduces a new external file clustering feature for Hudi. The changes add a metadata-driven approach to write status generation, a specialized write handle for external file clustering operations, and execution strategies (both abstract and test implementations) that orchestrate the clustering transformation workflow with pluggable file transformation logic. Changes
Sequence Diagram(s)sequenceDiagram
actor User
participant Strategy as SparkExternalFileClusteringExecutionStrategy
participant Handle as ExternalFileClusteringWriteHandle
participant Converter as FileMetadataWriteStatusConverter
participant Storage as HoodieStorage/ParquetUtils
User->>Strategy: performClusteringForGroup()
Strategy->>Handle: create ExternalFileClusteringWriteHandle
Strategy->>Strategy: transformFile(oldPath, newPath)
Note over Strategy: Abstract method<br/>implemented by subclass
alt Transformation Success
Strategy->>Handle: close()
Handle->>Handle: build executionConfigs<br/>(prevCommit, timeElapsed)
Handle->>Converter: convert(parquetFile, partition, configs)
Converter->>Storage: ParquetUtils.getRowCount()
Converter->>Storage: storage.getPathInfo().getLength()
Converter->>Converter: generateHoodieWriteStat()
Converter-->>Handle: WriteStatus
Handle-->>Strategy: List<WriteStatus>
Strategy-->>User: List<WriteStatus>
else Transformation Failure
Strategy->>Storage: delete output file
Strategy-->>User: throw HoodieClusteringException
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Comment |
Greptile SummaryThis PR introduces a new external-file clustering execution strategy for Apache Spark, enabling Hudi to delegate the actual file rewrite to an arbitrary external transformation (e.g., a Parquet-tools command) rather than reading and re-writing records through the standard Hudi write path. The key additions are:
Issues found:
Confidence Score: 3/5
Important Files Changed
Sequence DiagramsequenceDiagram
participant S as SparkExternalFileClustering<br/>ExecutionStrategy
participant H as ExternalFileClusteringWriteHandle
participant T as transformFile()<br/>(subclass impl)
participant C as FileMetadataWrite<br/>StatusConverter
participant FS as HoodieStorage
S->>H: new(config, instantTime, table, partitionPath, fileId, oldFilePath)
H->>FS: makeNewPath(partitionPath) → newFilePath
H->>FS: createMarkerFile(partitionPath, newFilePath)
S->>T: transformFile(oldFilePath, newFilePath)
alt transformation fails
T-->>S: Exception
S->>FS: deleteFile(newFilePath)
S-->>S: throw HoodieClusteringException
end
S->>H: close()
H->>FS: exists(newFilePath)
H->>C: "convert(newFilePath, partitionPath, {PREV_COMMIT, TIME_TAKEN})"
C->>FS: getRowCount(parquetFilePath)
C->>FS: getPathInfo(parquetFilePath).getLength()
C-->>H: WriteStatus (with HoodieWriteStat)
H-->>S: List[WriteStatus]
|
|
|
||
| return Collections.singletonList(writeStatus); | ||
| } catch (IOException e) { | ||
| throw new HoodieInsertException("Failed to close the ExternalFileClusteringWriteHandle for path " + path, e); |
There was a problem hiding this comment.
Wrong exception type for clustering context
The catch block wraps the IOException in HoodieInsertException, but this is a clustering operation — not an insert. If downstream code catches HoodieClusteringException specifically (e.g., for retry or rollback logic), this mismatch will silently bypass that handling.
| throw new HoodieInsertException("Failed to close the ExternalFileClusteringWriteHandle for path " + path, e); | |
| throw new HoodieIOException("Failed to close the ExternalFileClusteringWriteHandle for path " + path, e); |
Alternatively, HoodieClusteringException would be the most semantically correct choice here.
| public List<WriteStatus> close() { | ||
| try { | ||
| if (!hoodieTable.getStorage().exists(path)) { | ||
| throw new HoodieIOException("Output file does not exist, transformation may not have been invoked: " + path); | ||
| } | ||
|
|
||
| Map<String, Object> executionConfigs = new HashMap<>(); | ||
| executionConfigs.put(PREV_COMMIT, prevCommit); | ||
| executionConfigs.put(TIME_TAKEN, timer.endTimer()); | ||
|
|
||
| this.writeStatus = generateWriteStatus(path.toString(), partitionPath, executionConfigs); | ||
| LOG.info(String.format("ExternalFileClusteringWriteHandle for partitionPath %s fileID %s, took %d ms.", | ||
| writeStatus.getStat().getPartitionPath(), writeStatus.getStat().getFileId(), | ||
| writeStatus.getStat().getRuntimeStats().getTotalCreateTime())); | ||
|
|
||
| return Collections.singletonList(writeStatus); | ||
| } catch (IOException e) { | ||
| throw new HoodieInsertException("Failed to close the ExternalFileClusteringWriteHandle for path " + path, e); | ||
| } | ||
| } |
There was a problem hiding this comment.
All other HoodieWriteHandle implementations (BaseCreateHandle, HoodieWriteMergeHandle, etc.) call markClosed() at the end of close() to maintain the closed state flag from the parent class. Omitting it here means isClosed() will always return false for this handle, which could allow re-entrancy or confuse any future code that guards on this state.
| public List<WriteStatus> close() { | |
| try { | |
| if (!hoodieTable.getStorage().exists(path)) { | |
| throw new HoodieIOException("Output file does not exist, transformation may not have been invoked: " + path); | |
| } | |
| Map<String, Object> executionConfigs = new HashMap<>(); | |
| executionConfigs.put(PREV_COMMIT, prevCommit); | |
| executionConfigs.put(TIME_TAKEN, timer.endTimer()); | |
| this.writeStatus = generateWriteStatus(path.toString(), partitionPath, executionConfigs); | |
| LOG.info(String.format("ExternalFileClusteringWriteHandle for partitionPath %s fileID %s, took %d ms.", | |
| writeStatus.getStat().getPartitionPath(), writeStatus.getStat().getFileId(), | |
| writeStatus.getStat().getRuntimeStats().getTotalCreateTime())); | |
| return Collections.singletonList(writeStatus); | |
| } catch (IOException e) { | |
| throw new HoodieInsertException("Failed to close the ExternalFileClusteringWriteHandle for path " + path, e); | |
| } | |
| } | |
| return Collections.singletonList(writeStatus); | |
| } catch (IOException e) { | |
| throw new HoodieInsertException("Failed to close the ExternalFileClusteringWriteHandle for path " + path, e); | |
| } finally { | |
| markClosed(); | |
| } |
| */ | ||
| protected WriteStatus generateWriteStatus( | ||
| String outputFile, String partitionPath, Map<String, Object> executionConfigs) throws IOException { | ||
| return new FileMetadataWriteStatusConverter(hoodieTable, config).convert(outputFile, partitionPath, executionConfigs); |
There was a problem hiding this comment.
FileMetadataWriteStatusConverter is instantiated without its generic type parameters. This suppresses compile-time type-safety and generates an unchecked-cast warning. The generic parameters should be propagated from the enclosing class:
| return new FileMetadataWriteStatusConverter(hoodieTable, config).convert(outputFile, partitionPath, executionConfigs); | |
| return new FileMetadataWriteStatusConverter<>(hoodieTable, config).convert(outputFile, partitionPath, executionConfigs); |
| stat.setFileId(writeStatus.getFileId()); | ||
| stat.setPartitionPath(writeStatus.getPartitionPath()); | ||
| stat.setPath(new StoragePath(writeConfig.getBasePath()), parquetFilePath); | ||
| stat.setPrevCommit(String.valueOf(executionConfigs.get(PREV_COMMIT))); |
There was a problem hiding this comment.
String.valueOf(null) silently produces the literal string "null"
If executionConfigs.get(PREV_COMMIT) is null for any reason (e.g., a caller that doesn't set the key, or future refactoring), String.valueOf(null) returns the four-character string "null" rather than null or an empty string. This would corrupt the prevCommit field in the written HoodieWriteStat in a way that is very hard to diagnose.
Prefer an explicit null check:
| stat.setPrevCommit(String.valueOf(executionConfigs.get(PREV_COMMIT))); | |
| String prevCommit = (String) executionConfigs.get(PREV_COMMIT); | |
| stat.setPrevCommit(prevCommit); |
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/FileMetadataWriteStatusConverter.java`:
- Around line 63-64: FileMetadataWriteStatusConverter currently calls
ReflectionUtils.loadClass with two arguments (index implicit flag and failure
fraction) when instantiating WriteStatus; update the call to pass the third
isMetadataTable boolean for consistency with other call sites (e.g.,
HoodieWriteHandle, HoodieAppendHandle) by including the isMetadataTable flag
from this context so
ReflectionUtils.loadClass(this.writeConfig.getWriteStatusClassName(),
!this.hoodieTable.getIndex().isImplicitWithStorage(),
this.writeConfig.getWriteStatusFailureFraction()) becomes the three-argument
form that includes isMetadataTable; ensure you reference
FileMetadataWriteStatusConverter and the writeConfig/getWriteStatusClassName,
hoodieTable.getIndex().isImplicitWithStorage(), and
writeConfig.getWriteStatusFailureFraction() symbols when making the change.
In
`@hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkExternalFileClusteringExecutionStrategy.java`:
- Around line 57-63: The external clustering branch drops the
preserveHoodieMetadata flag causing callers requesting fresh Hudi metadata to be
ignored; update performClusteringForGroup and the external path so
preserveHoodieMetadata is either validated and rejected when false
(throwing/returning an explicit error) or passed through into transformFile(...)
and into SingleFileHandleCreateFactory so the external transform can honor it;
locate the external strategy code paths around performClusteringForGroup,
transformFile, and SingleFileHandleCreateFactory and either add plumbing to pass
the boolean through to transformFile(...) or add an explicit check that throws a
clear exception when preserveHoodieMetadata==false.
- Around line 78-90: The new output file isn't cleaned up if
writeHandler.close() throws; wrap the close() call so any exception from
writeHandler.close() also triggers deletion of the partial output and rethrows a
HoodieClusteringException. Specifically, keep the existing transformFile(...)
catch logic but extend it (or use a try/finally around transformFile +
writeHandler.close()) so that failures from writeHandler.close() result in
getHoodieTable().getStorage().deleteFile(writeHandler.getPath()) being called
(log/delete exceptions similarly to the existing catch) and then throw a
HoodieClusteringException that includes the original exception; ensure you
reference transformFile, writeHandler.close(),
getHoodieTable().getStorage().deleteFile(...) and HoodieClusteringException in
your change.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: af1eb28b-fcf5-4d3b-9cf5-9c504952a2e6
📒 Files selected for processing (6)
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/FileMetadataWriteStatusConverter.javahudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/ExternalFileClusteringWriteHandle.javahudi-client/hudi-client-common/src/test/java/org/apache/hudi/execution/TestFileMetadataWriteStatusConverter.javahudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkExternalFileClusteringExecutionStrategy.javahudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/ClusteringIdentityTestExecutionStrategy.javahudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/ExternalFileClusteringTestExecutionStrategy.java
| WriteStatus writeStatus = (WriteStatus) ReflectionUtils.loadClass(this.writeConfig.getWriteStatusClassName(), | ||
| !this.hoodieTable.getIndex().isImplicitWithStorage(), this.writeConfig.getWriteStatusFailureFraction()); |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
sed -n '1,180p' hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
rg -nC2 --type=java 'public\s+[A-Za-z0-9_]*WriteStatus\s*\(' hudi-client hudi-common
rg -nC2 --type=java 'getWriteStatusClassName\s*\(' hudi-clientRepository: yihua/hudi
Length of output: 19298
Pass the isMetadataTable flag for consistency.
Lines 63-64 instantiate WriteStatus with only 2 arguments, while other call sites in HoodieWriteHandle and HoodieAppendHandle pass 3 arguments including the isMetadataTable flag. All supported WriteStatus implementations (WriteStatus, BootstrapWriteStatus, FailOnFirstErrorWriteStatus, MetadataMergeWriteStatus) expose both constructors, so runtime failure won't occur, but the semantic mismatch should be addressed for consistency.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/FileMetadataWriteStatusConverter.java`
around lines 63 - 64, FileMetadataWriteStatusConverter currently calls
ReflectionUtils.loadClass with two arguments (index implicit flag and failure
fraction) when instantiating WriteStatus; update the call to pass the third
isMetadataTable boolean for consistency with other call sites (e.g.,
HoodieWriteHandle, HoodieAppendHandle) by including the isMetadataTable flag
from this context so
ReflectionUtils.loadClass(this.writeConfig.getWriteStatusClassName(),
!this.hoodieTable.getIndex().isImplicitWithStorage(),
this.writeConfig.getWriteStatusFailureFraction()) becomes the three-argument
form that includes isMetadataTable; ensure you reference
FileMetadataWriteStatusConverter and the writeConfig/getWriteStatusClassName,
hoodieTable.getIndex().isImplicitWithStorage(), and
writeConfig.getWriteStatusFailureFraction() symbols when making the change.
| protected List<WriteStatus> performClusteringForGroup(ReaderContextFactory<T> readerContextFactory, | ||
| ClusteringGroupInfo clusteringOps, | ||
| Map<String, String> strategyParams, | ||
| boolean preserveHoodieMetadata, | ||
| HoodieSchema schema, | ||
| TaskContextSupplier taskContextSupplier, | ||
| String instantTime) { |
There was a problem hiding this comment.
Propagate or explicitly reject preserveHoodieMetadata=false.
The identity clustering path still threads this flag into SingleFileHandleCreateFactory, but the new external path drops it entirely. Right now a caller can request fresh Hudi metadata and silently get whatever the external transform happened to emit instead.
♻️ Suggested fix
protected List<WriteStatus> performClusteringForGroup(ReaderContextFactory<T> readerContextFactory,
ClusteringGroupInfo clusteringOps,
Map<String, String> strategyParams,
boolean preserveHoodieMetadata,
HoodieSchema schema,
TaskContextSupplier taskContextSupplier,
String instantTime) {
+ if (!preserveHoodieMetadata) {
+ throw new HoodieClusteringException(
+ "External file clustering currently requires preserveHoodieMetadata=true: " + getClass().getName());
+ }
+
LOG.info("Starting clustering operation on input file ids.");If a concrete external strategy can truly rewrite Hudi metadata, plumb this flag into transformFile(...) instead of ignoring it.
Also applies to: 78-80
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkExternalFileClusteringExecutionStrategy.java`
around lines 57 - 63, The external clustering branch drops the
preserveHoodieMetadata flag causing callers requesting fresh Hudi metadata to be
ignored; update performClusteringForGroup and the external path so
preserveHoodieMetadata is either validated and rejected when false
(throwing/returning an explicit error) or passed through into transformFile(...)
and into SingleFileHandleCreateFactory so the external transform can honor it;
locate the external strategy code paths around performClusteringForGroup,
transformFile, and SingleFileHandleCreateFactory and either add plumbing to pass
the boolean through to transformFile(...) or add an explicit check that throws a
clear exception when preserveHoodieMetadata==false.
| try { | ||
| // Executes the file transformation. | ||
| transformFile(oldFilePath, writeHandler.getPath()); | ||
| } catch (Exception e) { | ||
| // Clean up partial output file if transformation fails. | ||
| try { | ||
| getHoodieTable().getStorage().deleteFile(writeHandler.getPath()); | ||
| } catch (Exception deleteEx) { | ||
| LOG.warn("Failed to clean up partial output file: " + writeHandler.getPath(), deleteEx); | ||
| } | ||
| throw new HoodieClusteringException("Failed to transform file: " + dataFilePathStr, e); | ||
| } | ||
| return writeHandler.close(); |
There was a problem hiding this comment.
Clean up the output file when writeHandler.close() fails too.
transformFile() is not the only failing step here. close() immediately re-opens the generated file to derive row counts and write stats, so a bad footer/path currently skips this cleanup block and leaves the new file behind.
♻️ Suggested fix
- try {
- // Executes the file transformation.
- transformFile(oldFilePath, writeHandler.getPath());
- } catch (Exception e) {
+ try {
+ // Executes the file transformation.
+ transformFile(oldFilePath, writeHandler.getPath());
+ return writeHandler.close();
+ } catch (Exception e) {
// Clean up partial output file if transformation fails.
try {
getHoodieTable().getStorage().deleteFile(writeHandler.getPath());
} catch (Exception deleteEx) {
LOG.warn("Failed to clean up partial output file: " + writeHandler.getPath(), deleteEx);
}
- throw new HoodieClusteringException("Failed to transform file: " + dataFilePathStr, e);
+ throw new HoodieClusteringException("Failed to externalize clustered file: " + dataFilePathStr, e);
}
- return writeHandler.close();🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkExternalFileClusteringExecutionStrategy.java`
around lines 78 - 90, The new output file isn't cleaned up if
writeHandler.close() throws; wrap the close() call so any exception from
writeHandler.close() also triggers deletion of the partial output and rethrows a
HoodieClusteringException. Specifically, keep the existing transformFile(...)
catch logic but extend it (or use a try/finally around transformFile +
writeHandler.close()) so that failures from writeHandler.close() result in
getHoodieTable().getStorage().deleteFile(writeHandler.getPath()) being called
(log/delete exceptions similarly to the existing catch) and then throw a
HoodieClusteringException that includes the original exception; ensure you
reference transformFile, writeHandler.close(),
getHoodieTable().getStorage().deleteFile(...) and HoodieClusteringException in
your change.
Mirror of apache#18409 for automated bot review.
Original author: @suryaprasanna
Base branch: master
Summary by CodeRabbit
New Features
Tests