From 3a114c73e8fb13325a30c48211b5d0974f92c437 Mon Sep 17 00:00:00 2001 From: Surya Prasanna Kumar Yalla Date: Fri, 27 Mar 2026 23:09:14 -0700 Subject: [PATCH 1/4] feat(spark): add parquet tools clustering strategy --- ...ParquetFileMetaToWriteStatusConvertor.java | 110 +++++++++++ .../apache/hudi/io/HoodieFileWriteHandle.java | 119 ++++++++++++ ...ParquetFileMetaToWriteStatusConvertor.java | 173 ++++++++++++++++++ .../ParquetToolsExecutionStrategy.java | 90 +++++++++ ...usteringIdentityTestExecutionStrategy.java | 65 +++++++ .../ParquetToolsTestExecutionStrategy.java | 54 ++++++ 6 files changed, 611 insertions(+) create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/ParquetFileMetaToWriteStatusConvertor.java create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieFileWriteHandle.java create mode 100644 hudi-client/hudi-client-common/src/test/java/org/apache/hudi/execution/TestParquetFileMetaToWriteStatusConvertor.java create mode 100644 hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/ParquetToolsExecutionStrategy.java create mode 100644 hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/ClusteringIdentityTestExecutionStrategy.java create mode 100644 hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/ParquetToolsTestExecutionStrategy.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/ParquetFileMetaToWriteStatusConvertor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/ParquetFileMetaToWriteStatusConvertor.java new file mode 100644 index 0000000000000..761eb6fab95a0 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/ParquetFileMetaToWriteStatusConvertor.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.execution; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; +import org.apache.parquet.format.converter.ParquetMetadataConverter; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * This class is mainly used by the ParquetToolsExecutionStrategy to generate WriteStatus classes. 
+ */ +public class ParquetFileMetaToWriteStatusConvertor { + + private static final Logger LOG = LoggerFactory.getLogger(ParquetFileMetaToWriteStatusConvertor.class); + private final HoodieTable hoodieTable; + private final HoodieWriteConfig writeConfig; + private final FileSystem fs; + public static final String PREV_COMMIT = "prevCommit"; + public static final String TIME_TAKEN = "timeTaken"; + + public ParquetFileMetaToWriteStatusConvertor(HoodieTable hoodieTable, HoodieWriteConfig writeConfig) { + this.hoodieTable = hoodieTable; + this.writeConfig = writeConfig; + this.fs = this.hoodieTable.getMetaClient().getFs(); + } + + /** + * This method generates writeStatus object from parquet file. + */ + public WriteStatus convert(String parquetFile, String partitionPath, + Map executionConfigs) throws IOException { + LOG.info("Creating write status for parquet file " + parquetFile); + WriteStatus writeStatus = (WriteStatus) ReflectionUtils.loadClass(this.writeConfig.getWriteStatusClassName(), + !this.hoodieTable.getIndex().isImplicitWithStorage(), this.writeConfig.getWriteStatusFailureFraction()); + Path parquetFilePath = new Path(parquetFile); + writeStatus.setFileId(FSUtils.getFileId(parquetFilePath.getName())); + writeStatus.setPartitionPath(partitionPath); + generateHoodieWriteStat(writeStatus, parquetFilePath, executionConfigs); + return writeStatus; + } + + /** + * This method generates HoodieWriteStat object and set it as part of WriteStatus object. 
+ */ + private void generateHoodieWriteStat( + WriteStatus writeStatus, Path parquetFilePath, Map executionConfigs) throws IOException { + HoodieWriteStat stat = new HoodieWriteStat(); + + // Set row count + ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(this.fs.getConf(), parquetFilePath, + ParquetMetadataConverter.NO_FILTER); + List blockMetaDataList = parquetMetadata.getBlocks(); + long rowCount = blockMetaDataList.stream().mapToLong(BlockMetaData::getRowCount).sum(); + stat.setNumWrites(rowCount); + stat.setNumInserts(rowCount); + + // Set runtime stats + HoodieWriteStat.RuntimeStats runtimeStats = new HoodieWriteStat.RuntimeStats(); + runtimeStats.setTotalCreateTime((long) executionConfigs.get(TIME_TAKEN)); + stat.setRuntimeStats(runtimeStats); + + // File size + FileStatus parquetFileStatus = this.fs.getFileStatus(parquetFilePath); + long fileSize = parquetFileStatus.getLen(); + stat.setFileSizeInBytes(fileSize); + stat.setTotalWriteBytes(fileSize); + + stat.setFileId(writeStatus.getFileId()); + stat.setPartitionPath(writeStatus.getPartitionPath()); + stat.setPath(new Path(writeConfig.getBasePath()), parquetFilePath); + stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords()); + stat.setPrevCommit(String.valueOf(executionConfigs.get(PREV_COMMIT))); + + writeStatus.setStat(stat); + } + +} \ No newline at end of file diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieFileWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieFileWriteHandle.java new file mode 100644 index 0000000000000..4ef0f4b9f0888 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieFileWriteHandle.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieInsertException;
+import org.apache.hudi.execution.ParquetFileMetaToWriteStatusConvertor;
+import org.apache.hudi.table.HoodieTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hudi.execution.ParquetFileMetaToWriteStatusConvertor.PREV_COMMIT;
+import static org.apache.hudi.execution.ParquetFileMetaToWriteStatusConvertor.TIME_TAKEN;
+
+/**
+ * Write handle that is used to work on top of files rather than on individual records.
+ */
+public class HoodieFileWriteHandle extends HoodieWriteHandle {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieFileWriteHandle.class);
+  private final Path path;
+  private String prevCommit;
+
+  public HoodieFileWriteHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable,
+                               String partitionPath, String fileId, TaskContextSupplier taskContextSupplier,
+                               Path oldFilePath) {
+    super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier);
+
+    // Output file path.
+    this.path = makeNewPath(partitionPath);
+    // Take the prev commit from the old file's name via the shared FSUtils parser instead of ad-hoc split() indexing.
+    this.prevCommit = FSUtils.getCommitTime(oldFilePath.getName());
+
+    // Create inProgress marker file
+    createMarkerFile(partitionPath, FSUtils.makeBaseFileName(this.instantTime, this.writeToken, this.fileId, hoodieTable.getBaseFileExtension()));
+    // TODO: HUDI-6416 Create inprogress marker here and remove above marker file creation, once the marker PR is landed.
+    // createInProgressMarkerFile(partitionPath,FSUtils.makeDataFileName(this.instantTime, this.writeToken, this.fileId, hoodieTable.getBaseFileExtension()));
+    LOG.info("New HoodieFileWriteHandle for partition :" + partitionPath + " with fileId " + fileId);
+  }
+
+  /**
+   * Completes the write by building the WriteStatus for the output file; completed-marker creation is still a TODO (HUDI-6416).
+   * @return WriteStatuses, ideally it will be only one object.
+   */
+  @Override
+  public List close() {
+    LOG.info("Closing the file " + this.fileId + " as we are done with the file.");
+    try {
+      Map executionConfigs = new HashMap<>();
+      executionConfigs.put(PREV_COMMIT, prevCommit);
+      executionConfigs.put(TIME_TAKEN, timer.endTimer());
+
+      this.writeStatus = generateWriteStatus(path.toString(), partitionPath, executionConfigs);
+
+      // TODO: HUDI-6416 Create completed marker file here once the marker PR is landed.
+      // createCompleteMarkerFile throws hoodieException, if marker directory is not present.
+      // createCompletedMarkerFile(partitionPath);
+      LOG.info(String.format("HoodieFileWriteHandle for partitionPath %s fileID %s, took %d ms.",
+          writeStatus.getStat().getPartitionPath(), writeStatus.getStat().getFileId(),
+          writeStatus.getStat().getRuntimeStats().getTotalCreateTime()));
+
+      return Collections.singletonList(writeStatus);
+    } catch (IOException e) {
+      throw new HoodieInsertException("Failed to close the HoodieFileWriteHandle for path " + path, e);
+    }
+  }
+
+  /**
+   * Generates the WriteStatus object for a given parquet file.
+   * @param outputFile parquet file
+   * @param partitionPath partition information of the parquet file
+   * @param executionConfigs Some configs collected during execution.
+   * @return WriteStatus object.
+   * @throws IOException if the conversion fails while reading the parquet file.
+   */
+  protected WriteStatus generateWriteStatus(
+      String outputFile, String partitionPath, Map executionConfigs) throws IOException {
+    ParquetFileMetaToWriteStatusConvertor convertor =
+        new ParquetFileMetaToWriteStatusConvertor(hoodieTable, config);
+    return convertor.convert(outputFile, partitionPath, executionConfigs);
+  }
+
+  @Override
+  public IOType getIOType() {
+    return IOType.CREATE;
+  }
+
+  public Path getPath() {
+    return path;
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/execution/TestParquetFileMetaToWriteStatusConvertor.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/execution/TestParquetFileMetaToWriteStatusConvertor.java
new file mode 100644
index 0000000000000..e0939b10b5a98
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/execution/TestParquetFileMetaToWriteStatusConvertor.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.execution; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.config.HoodieStorageConfig; +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.fs.ConsistencyGuardConfig; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieAvroRecord; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; +import org.apache.hudi.common.table.view.FileSystemViewStorageType; +import org.apache.hudi.common.testutils.HoodieCommonTestHarness; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.testutils.RawTripTestPayload; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieCompactionConfig; +import org.apache.hudi.config.HoodieIndexConfig; +import 
org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.io.storage.HoodieAvroParquetWriter; +import org.apache.hudi.io.storage.HoodieFileWriterFactory; +import org.apache.hudi.table.HoodieTable; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestParquetFileMetaToWriteStatusConvertor extends HoodieCommonTestHarness { + + private TaskContextSupplier mockedContextSupplier; + private HoodieTable mockedHoodieTable; + private HoodieIndex mockedIndex; + + @BeforeEach + public void init() throws IOException, NoSuchFieldException { + initPath(); + initTestDataGenerator(); + initMetaClient(); + + mockedContextSupplier = mock(TaskContextSupplier.class); + when(mockedContextSupplier.getAttemptIdSupplier()).thenReturn(() -> 1L); + when(mockedContextSupplier.getStageIdSupplier()).thenReturn(() -> 1); + when(mockedContextSupplier.getPartitionIdSupplier()).thenReturn(() -> 1); + + mockedIndex = mock(HoodieIndex.class); + when(mockedIndex.isImplicitWithStorage()).thenReturn(true); + + Configuration configuration = metaClient.getHadoopConf(); + mockedHoodieTable = mock(HoodieTable.class); + when(mockedHoodieTable.getHadoopConf()).thenReturn(configuration); + when(mockedHoodieTable.getMetaClient()).thenReturn(metaClient); + when(mockedHoodieTable.getIndex()).thenReturn(mockedIndex); + } + + @Test + public void testWriteStatusConvertion() throws IOException { + HoodieWriteConfig writeConfig = + getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, HoodieIndex.IndexType.BLOOM) + .build(); + String fileId = UUID.randomUUID().toString(); + String prevCommitTime = HoodieActiveTimeline.createNewInstantTime(); + String newCommitTime = 
HoodieActiveTimeline.createNewInstantTime(); + String writeToken = FSUtils.makeWriteToken(mockedContextSupplier.getPartitionIdSupplier().get(), + mockedContextSupplier.getStageIdSupplier().get(), mockedContextSupplier.getAttemptIdSupplier().get()); + String srcFileName = fileId + "_" + writeToken + "_" + newCommitTime + ".parquet"; + String partitionPath = "2021/09/21"; + String srcPath = basePath + "/" + partitionPath + "/" + srcFileName; + + Map executionConfigs = new HashMap<>(); + executionConfigs.put("timeTaken", 1000L); + executionConfigs.put("prevCommit", prevCommitTime); + writeParquetFile(writeConfig, srcPath, partitionPath, newCommitTime, 50); + ParquetFileMetaToWriteStatusConvertor convertor = + new ParquetFileMetaToWriteStatusConvertor(mockedHoodieTable, writeConfig); + WriteStatus writeStatus = convertor.convert(srcPath, partitionPath, executionConfigs); + Assertions.assertEquals(writeStatus.getFileId(), fileId); + Assertions.assertEquals(writeStatus.getPartitionPath(), partitionPath); + HoodieWriteStat writeStat = writeStatus.getStat(); + Assertions.assertNotNull(writeStat); + Assertions.assertEquals(50, writeStat.getNumWrites()); + Assertions.assertEquals(50, writeStat.getNumInserts()); + Assertions.assertEquals(prevCommitTime, writeStat.getPrevCommit()); + + HoodieWriteStat.RuntimeStats runtimeStats = writeStat.getRuntimeStats(); + Assertions.assertNotNull(runtimeStats); + Assertions.assertEquals(1000, runtimeStats.getTotalCreateTime()); + + } + + /** + * This method creates HoodieWriteConfig with default configuration. 
+ */ + public HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, HoodieIndex.IndexType indexType) { + return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr) + .withParallelism(2, 2) + .withBulkInsertParallelism(2) + .withFinalizeWriteParallelism(2) + .withDeleteParallelism(2) + .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION) + .withWriteStatusClass(WriteStatus.class) + .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()) + .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build()) + .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024) + .parquetMaxFileSize(1024 * 1024) + .build()) + .forTable("test-trip-table") + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build()) + .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withEnableBackupForRemoteFileSystemView(false) // Fail test if problem connecting to timeline-server + .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build()); + } + + /** + * This method writes a parquet file for testing. 
+ */ + public void writeParquetFile(HoodieWriteConfig writeConfig, String fileName, String partitionPath, + String instantTime, int recordsCount) throws IOException { + Schema originalSchema = new Schema.Parser().parse(writeConfig.getSchema()); + Schema hoodieSchemaWithMetadataFields = HoodieAvroUtils.addMetadataFields(originalSchema); + HoodieAvroParquetWriter fileWriter = + (HoodieAvroParquetWriter) HoodieFileWriterFactory.getFileWriter(instantTime, new Path(fileName), + metaClient.getHadoopConf(), writeConfig, hoodieSchemaWithMetadataFields, mockedContextSupplier, + writeConfig.getRecordMerger().getRecordType()); + + for (int i = 0; i < recordsCount; i++) { + String recordKey = UUID.randomUUID().toString(); + HoodieKey key = new HoodieKey(recordKey, partitionPath.replaceAll("/", "-")); + HoodieRecord record = + new HoodieAvroRecord<>(key, dataGen.generateRandomValue(key, instantTime)); + Option indexedRecord = record.getData().getInsertValue(originalSchema); + IndexedRecord recordWithMetadataInSchema = + HoodieAvroUtils.rewriteRecord((GenericRecord) indexedRecord.get(), hoodieSchemaWithMetadataFields); + fileWriter.writeAvroWithMetadata(record.getKey(), recordWithMetadataInSchema); + } + fileWriter.close(); + } + +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/ParquetToolsExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/ParquetToolsExecutionStrategy.java new file mode 100644 index 0000000000000..414642335b7c6 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/ParquetToolsExecutionStrategy.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.clustering.run.strategy;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.ReaderContextFactory;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.ClusteringGroupInfo;
+import org.apache.hudi.common.model.ClusteringOperation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieClusteringException;
+import org.apache.hudi.io.HoodieFileWriteHandle;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Skeleton implementation for the set of clustering execution strategies
+ * that use parquet-tools commands.
+ */
+public abstract class ParquetToolsExecutionStrategy>
+    extends SingleSparkJobExecutionStrategy {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ParquetToolsExecutionStrategy.class);
+
+  public ParquetToolsExecutionStrategy(
+      HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
+    super(table, engineContext, writeConfig);
+  }
+
+  @Override
+  protected List performClusteringForGroup(ReaderContextFactory readerContextFactory,
+                                           ClusteringGroupInfo clusteringOps,
+                                           Map strategyParams,
+                                           boolean preserveHoodieMetadata,
+                                           HoodieSchema schema,
+                                           TaskContextSupplier taskContextSupplier,
+                                           String instantTime) {
+    LOG.info("Starting clustering operation on input file ids.");
+    List clusteringOperations = clusteringOps.getOperations();
+    if (clusteringOperations.size() != 1) { // also rejects an empty group, which get(0) below would fail on
+      throw new HoodieClusteringException("Expect only one clustering operation during rewrite: " + getClass().getName());
+    }
+
+    ClusteringOperation clusteringOperation = clusteringOperations.get(0);
+    String fileId = clusteringOperation.getFileId();
+    String partitionPath = clusteringOperation.getPartitionPath();
+    String dataFilePathStr = clusteringOperation.getDataFilePath();
+    Path oldFilePath = new Path(dataFilePathStr);
+    HoodieFileWriteHandle writeHandler = new HoodieFileWriteHandle(getWriteConfig(), instantTime, getHoodieTable(),
+        partitionPath, fileId, taskContextSupplier, oldFilePath);
+
+    // Run the subclass-provided tool from the old file to the write handle's new path, then close the handle.
+    executeTools(oldFilePath, writeHandler.getPath());
+    return writeHandler.close();
+  }
+
+  /**
+   * This method needs to be overridden by the child classes.
+   * The parquet-tools command can be created and executed in this method.
+   * Assuming the parquet-tools command operates on a per-file basis, this hook is invoked once per file.
+   */
+  protected abstract void executeTools(Path oldFilePath, Path newFilePath);
+
+}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/ClusteringIdentityTestExecutionStrategy.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/ClusteringIdentityTestExecutionStrategy.java
new file mode 100644
index 0000000000000..d526024f67cdf
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/ClusteringIdentityTestExecutionStrategy.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hudi.client; + +import org.apache.avro.Schema; +import org.apache.hudi.client.clustering.run.strategy.SingleSparkJobExecutionStrategy; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieClusteringException; +import org.apache.hudi.execution.SparkLazyInsertIterable; +import org.apache.hudi.io.SingleFileHandleCreateFactory; +import org.apache.hudi.table.HoodieTable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +public class ClusteringIdentityTestExecutionStrategy> + extends SingleSparkJobExecutionStrategy { + + private static final Logger LOG = LoggerFactory.getLogger(ClusteringIdentityTestExecutionStrategy.class); + + public ClusteringIdentityTestExecutionStrategy(HoodieTable table, + HoodieEngineContext engineContext, + HoodieWriteConfig writeConfig) { + super(table, engineContext, writeConfig); + } + + @Override + public Iterator> performClusteringWithRecordsIterator(final Iterator> recordItr, final int numOutputGroups, + final String instantTime, + final Map strategyParams, final Schema schema, + final List inputFileIds, final boolean preserveHoodieMetadata, + final TaskContextSupplier taskContextSupplier) { + if (inputFileIds.size() != 1) { + throw new HoodieClusteringException("Expect only one partition and one fileId in clustering group for identity strategy: " + getClass().getName()); + } + + String fileId = inputFileIds.get(0).getFileId(); + return new SparkLazyInsertIterable(recordItr, true, getWriteConfig(), instantTime, getHoodieTable(), + fileId, taskContextSupplier, new SingleFileHandleCreateFactory(fileId, 
preserveHoodieMetadata)); + } +} diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/ParquetToolsTestExecutionStrategy.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/ParquetToolsTestExecutionStrategy.java new file mode 100644 index 0000000000000..3e3e98b96dfdd --- /dev/null +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/ParquetToolsTestExecutionStrategy.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.client.clustering.run.strategy.ParquetToolsExecutionStrategy; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.table.HoodieTable; + +import java.io.IOException; + +/** + * Test execution strategy for testing the skeleton of the ParquetToolsExecutionStrategy. + * It creates a copy of the original file with a different commit timestamp. 
+ */ +public class ParquetToolsTestExecutionStrategy> + extends ParquetToolsExecutionStrategy { + + public ParquetToolsTestExecutionStrategy( + HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) { + super(table, engineContext, writeConfig); + } + + @Override + protected void executeTools(Path oldFilePath, Path newFilePath) { + FileSystem fs = getHoodieTable().getMetaClient().getFs(); + try { + FileUtil.copy(fs, oldFilePath, fs, newFilePath, false, false, fs.getConf()); + } catch (IOException e) { + throw new HoodieIOException("Exception in copying files.", e); + } + } +} From 0df6e53ca3f0f8bace27ce990f9f78331e85e09a Mon Sep 17 00:00:00 2001 From: Surya Prasanna Kumar Yalla Date: Sat, 28 Mar 2026 00:19:08 -0700 Subject: [PATCH 2/4] Refactor --- ...ParquetFileMetaToWriteStatusConvertor.java | 35 ++++++++---------- .../apache/hudi/io/HoodieFileWriteHandle.java | 22 +++++------- ...ParquetFileMetaToWriteStatusConvertor.java | 31 ++++++++-------- .../ParquetToolsExecutionStrategy.java | 9 ++--- ...usteringIdentityTestExecutionStrategy.java | 36 ++++++++++++------- .../ParquetToolsTestExecutionStrategy.java | 4 ++- 6 files changed, 67 insertions(+), 70 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/ParquetFileMetaToWriteStatusConvertor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/ParquetFileMetaToWriteStatusConvertor.java index 761eb6fab95a0..6fe689e5c2a12 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/ParquetFileMetaToWriteStatusConvertor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/ParquetFileMetaToWriteStatusConvertor.java @@ -18,25 +18,20 @@ package org.apache.hudi.execution; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.fs.FSUtils; 
import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.util.ParquetUtils; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.StoragePath; import org.apache.hudi.table.HoodieTable; -import org.apache.parquet.format.converter.ParquetMetadataConverter; -import org.apache.parquet.hadoop.ParquetFileReader; -import org.apache.parquet.hadoop.metadata.BlockMetaData; -import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.List; import java.util.Map; /** @@ -47,14 +42,16 @@ public class ParquetFileMetaToWriteStatusConvertor hoodieTable; private final HoodieWriteConfig writeConfig; - private final FileSystem fs; + private final HoodieStorage storage; + private final ParquetUtils parquetUtils; public static final String PREV_COMMIT = "prevCommit"; public static final String TIME_TAKEN = "timeTaken"; public ParquetFileMetaToWriteStatusConvertor(HoodieTable hoodieTable, HoodieWriteConfig writeConfig) { this.hoodieTable = hoodieTable; this.writeConfig = writeConfig; - this.fs = this.hoodieTable.getMetaClient().getFs(); + this.storage = this.hoodieTable.getStorage(); + this.parquetUtils = new ParquetUtils(); } /** @@ -65,7 +62,7 @@ public WriteStatus convert(String parquetFile, String partitionPath, LOG.info("Creating write status for parquet file " + parquetFile); WriteStatus writeStatus = (WriteStatus) ReflectionUtils.loadClass(this.writeConfig.getWriteStatusClassName(), !this.hoodieTable.getIndex().isImplicitWithStorage(), this.writeConfig.getWriteStatusFailureFraction()); - Path parquetFilePath = new Path(parquetFile); + StoragePath parquetFilePath = new StoragePath(parquetFile); writeStatus.setFileId(FSUtils.getFileId(parquetFilePath.getName())); 
writeStatus.setPartitionPath(partitionPath); generateHoodieWriteStat(writeStatus, parquetFilePath, executionConfigs); @@ -76,35 +73,31 @@ public WriteStatus convert(String parquetFile, String partitionPath, * This method generates HoodieWriteStat object and set it as part of WriteStatus object. */ private void generateHoodieWriteStat( - WriteStatus writeStatus, Path parquetFilePath, Map executionConfigs) throws IOException { + WriteStatus writeStatus, StoragePath parquetFilePath, Map executionConfigs) throws IOException { HoodieWriteStat stat = new HoodieWriteStat(); // Set row count - ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(this.fs.getConf(), parquetFilePath, - ParquetMetadataConverter.NO_FILTER); - List blockMetaDataList = parquetMetadata.getBlocks(); - long rowCount = blockMetaDataList.stream().mapToLong(BlockMetaData::getRowCount).sum(); + long rowCount = parquetUtils.getRowCount(storage, parquetFilePath); stat.setNumWrites(rowCount); stat.setNumInserts(rowCount); // Set runtime stats HoodieWriteStat.RuntimeStats runtimeStats = new HoodieWriteStat.RuntimeStats(); - runtimeStats.setTotalCreateTime((long) executionConfigs.get(TIME_TAKEN)); + runtimeStats.setTotalCreateTime(((Number) executionConfigs.get(TIME_TAKEN)).longValue()); stat.setRuntimeStats(runtimeStats); // File size - FileStatus parquetFileStatus = this.fs.getFileStatus(parquetFilePath); - long fileSize = parquetFileStatus.getLen(); + long fileSize = storage.getPathInfo(parquetFilePath).getLength(); stat.setFileSizeInBytes(fileSize); stat.setTotalWriteBytes(fileSize); stat.setFileId(writeStatus.getFileId()); stat.setPartitionPath(writeStatus.getPartitionPath()); - stat.setPath(new Path(writeConfig.getBasePath()), parquetFilePath); + stat.setPath(new StoragePath(writeConfig.getBasePath()), parquetFilePath); stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords()); stat.setPrevCommit(String.valueOf(executionConfigs.get(PREV_COMMIT))); writeStatus.setStat(stat); } -} \ No 
newline at end of file +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieFileWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieFileWriteHandle.java index 4ef0f4b9f0888..84de87c84ed12 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieFileWriteHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieFileWriteHandle.java @@ -18,7 +18,6 @@ package org.apache.hudi.io; -import org.apache.hadoop.fs.Path; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.fs.FSUtils; @@ -27,6 +26,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieInsertException; import org.apache.hudi.execution.ParquetFileMetaToWriteStatusConvertor; +import org.apache.hudi.storage.StoragePath; import org.apache.hudi.table.HoodieTable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,23 +46,21 @@ public class HoodieFileWriteHandle extends HoodieWriteHandle { private static final Logger LOG = LoggerFactory.getLogger(HoodieFileWriteHandle.class); - private final Path path; - private String prevCommit; + private final StoragePath path; + private final String prevCommit; public HoodieFileWriteHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, String partitionPath, String fileId, TaskContextSupplier taskContextSupplier, - Path oldFilePath) { - super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier); + StoragePath oldFilePath) { + super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier, true); // Output file path. this.path = makeNewPath(partitionPath); // Get the prev commit from existing or old file. 
- this.prevCommit = oldFilePath.getName().split("_")[2].split("\\.")[0]; + this.prevCommit = FSUtils.getCommitTime(oldFilePath.getName()); // Create inProgress marker file - createMarkerFile(partitionPath, FSUtils.makeBaseFileName(this.instantTime, this.writeToken, this.fileId, hoodieTable.getBaseFileExtension())); - // TODO: HUDI-6416 Create inprogress marker here and remove above marker file creation, once the marker PR is landed. - // createInProgressMarkerFile(partitionPath,FSUtils.makeDataFileName(this.instantTime, this.writeToken, this.fileId, hoodieTable.getBaseFileExtension())); + createMarkerFile(partitionPath, path.getName()); LOG.info("New HoodieFileWriteHandle for partition :" + partitionPath + " with fileId " + fileId); } @@ -79,10 +77,6 @@ public List close() { executionConfigs.put(TIME_TAKEN, timer.endTimer()); this.writeStatus = generateWriteStatus(path.toString(), partitionPath, executionConfigs); - - // TODO: HUDI-6416 Create completed marker file here once the marker PR is landed. - // createCompleteMarkerFile throws hoodieException, if marker directory is not present. 
- // createCompletedMarkerFile(partitionPath); LOG.info(String.format("HoodieFileWriteHandle for partitionPath %s fileID %s, took %d ms.", writeStatus.getStat().getPartitionPath(), writeStatus.getStat().getFileId(), writeStatus.getStat().getRuntimeStats().getTotalCreateTime())); @@ -113,7 +107,7 @@ public IOType getIOType() { return IOType.CREATE; } - public Path getPath() { + public StoragePath getPath() { return path; } } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/execution/TestParquetFileMetaToWriteStatusConvertor.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/execution/TestParquetFileMetaToWriteStatusConvertor.java index e0939b10b5a98..5b4baebad4155 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/execution/TestParquetFileMetaToWriteStatusConvertor.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/execution/TestParquetFileMetaToWriteStatusConvertor.java @@ -22,31 +22,31 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.config.HoodieStorageConfig; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.fs.ConsistencyGuardConfig; import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.HoodieAvroRecord; import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaUtils; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; 
import org.apache.hudi.common.table.view.FileSystemViewStorageType; import org.apache.hudi.common.testutils.HoodieCommonTestHarness; +import org.apache.hudi.common.testutils.InProcessTimeGenerator; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; -import org.apache.hudi.common.testutils.RawTripTestPayload; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.index.HoodieIndex; -import org.apache.hudi.io.storage.HoodieAvroParquetWriter; import org.apache.hudi.io.storage.HoodieFileWriterFactory; +import org.apache.hudi.io.storage.hadoop.HoodieAvroParquetWriter; +import org.apache.hudi.storage.StoragePath; import org.apache.hudi.table.HoodieTable; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -80,11 +80,11 @@ public void init() throws IOException, NoSuchFieldException { mockedIndex = mock(HoodieIndex.class); when(mockedIndex.isImplicitWithStorage()).thenReturn(true); - Configuration configuration = metaClient.getHadoopConf(); + Configuration configuration = metaClient.getStorageConf().unwrapAs(Configuration.class); mockedHoodieTable = mock(HoodieTable.class); - when(mockedHoodieTable.getHadoopConf()).thenReturn(configuration); when(mockedHoodieTable.getMetaClient()).thenReturn(metaClient); when(mockedHoodieTable.getIndex()).thenReturn(mockedIndex); + when(mockedHoodieTable.getStorage()).thenReturn(metaClient.getStorage()); } @Test @@ -93,8 +93,8 @@ public void testWriteStatusConvertion() throws IOException { getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, HoodieIndex.IndexType.BLOOM) .build(); String fileId = UUID.randomUUID().toString(); - String prevCommitTime = HoodieActiveTimeline.createNewInstantTime(); - String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); + String prevCommitTime = 
InProcessTimeGenerator.createNewInstantTime(); + String newCommitTime = InProcessTimeGenerator.createNewInstantTime(); String writeToken = FSUtils.makeWriteToken(mockedContextSupplier.getPartitionIdSupplier().get(), mockedContextSupplier.getStageIdSupplier().get(), mockedContextSupplier.getAttemptIdSupplier().get()); String srcFileName = fileId + "_" + writeToken + "_" + newCommitTime + ".parquet"; @@ -151,21 +151,18 @@ public HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, HoodieIndex. public void writeParquetFile(HoodieWriteConfig writeConfig, String fileName, String partitionPath, String instantTime, int recordsCount) throws IOException { Schema originalSchema = new Schema.Parser().parse(writeConfig.getSchema()); - Schema hoodieSchemaWithMetadataFields = HoodieAvroUtils.addMetadataFields(originalSchema); + HoodieSchema hoodieSchemaWithMetadataFields = HoodieSchemaUtils.addMetadataFields(HoodieSchema.fromAvroSchema(originalSchema)); HoodieAvroParquetWriter fileWriter = - (HoodieAvroParquetWriter) HoodieFileWriterFactory.getFileWriter(instantTime, new Path(fileName), - metaClient.getHadoopConf(), writeConfig, hoodieSchemaWithMetadataFields, mockedContextSupplier, + (HoodieAvroParquetWriter) HoodieFileWriterFactory.getFileWriter(instantTime, new StoragePath(fileName), + metaClient.getStorage(), writeConfig, hoodieSchemaWithMetadataFields, mockedContextSupplier, writeConfig.getRecordMerger().getRecordType()); for (int i = 0; i < recordsCount; i++) { String recordKey = UUID.randomUUID().toString(); HoodieKey key = new HoodieKey(recordKey, partitionPath.replaceAll("/", "-")); - HoodieRecord record = - new HoodieAvroRecord<>(key, dataGen.generateRandomValue(key, instantTime)); - Option indexedRecord = record.getData().getInsertValue(originalSchema); IndexedRecord recordWithMetadataInSchema = - HoodieAvroUtils.rewriteRecord((GenericRecord) indexedRecord.get(), hoodieSchemaWithMetadataFields); - fileWriter.writeAvroWithMetadata(record.getKey(), 
recordWithMetadataInSchema); + HoodieAvroUtils.rewriteRecord((GenericRecord) dataGen.generateRandomValue(key, instantTime), hoodieSchemaWithMetadataFields.toAvroSchema()); + fileWriter.writeAvroWithMetadata(key, recordWithMetadataInSchema); } fileWriter.close(); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/ParquetToolsExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/ParquetToolsExecutionStrategy.java index 414642335b7c6..c5f4715cab1ed 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/ParquetToolsExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/ParquetToolsExecutionStrategy.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.engine.ReaderContextFactory; import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.ClusteringGroupInfo; import org.apache.hudi.common.model.ClusteringOperation; import org.apache.hudi.common.model.HoodieRecordPayload; @@ -29,9 +30,9 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieClusteringException; import org.apache.hudi.io.HoodieFileWriteHandle; +import org.apache.hudi.storage.StoragePath; import org.apache.hudi.table.HoodieTable; -import org.apache.avro.Schema; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,15 +69,15 @@ protected List performClusteringForGroup(ReaderContextFactory re } ClusteringOperation clusteringOperation = clusteringOperations.get(0); - String fileId = clusteringOperation.getFileId(); + String fileId = FSUtils.createNewFileIdPfx(); String partitionPath = clusteringOperation.getPartitionPath(); String dataFilePathStr = 
clusteringOperation.getDataFilePath(); - Path oldFilePath = new Path(dataFilePathStr); + StoragePath oldFilePath = new StoragePath(dataFilePathStr); HoodieFileWriteHandle writeHandler = new HoodieFileWriteHandle(getWriteConfig(), instantTime, getHoodieTable(), partitionPath, fileId, taskContextSupplier, oldFilePath); // Executes the parquet-tools command. - executeTools(oldFilePath, writeHandler.getPath()); + executeTools(new Path(oldFilePath.toUri()), new Path(writeHandler.getPath().toUri())); return writeHandler.close(); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/ClusteringIdentityTestExecutionStrategy.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/ClusteringIdentityTestExecutionStrategy.java index d526024f67cdf..4bbac3cc32578 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/ClusteringIdentityTestExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/ClusteringIdentityTestExecutionStrategy.java @@ -18,22 +18,24 @@ package org.apache.hudi.client; -import org.apache.avro.Schema; import org.apache.hudi.client.clustering.run.strategy.SingleSparkJobExecutionStrategy; import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.engine.ReaderContextFactory; import org.apache.hudi.common.engine.TaskContextSupplier; -import org.apache.hudi.common.model.HoodieFileGroupId; -import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.ClusteringGroupInfo; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieClusteringException; import org.apache.hudi.execution.SparkLazyInsertIterable; +import org.apache.hudi.io.IOUtils; import org.apache.hudi.io.SingleFileHandleCreateFactory; 
import org.apache.hudi.table.HoodieTable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Iterator; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -49,17 +51,25 @@ public ClusteringIdentityTestExecutionStrategy(HoodieTable table, } @Override - public Iterator> performClusteringWithRecordsIterator(final Iterator> recordItr, final int numOutputGroups, - final String instantTime, - final Map strategyParams, final Schema schema, - final List inputFileIds, final boolean preserveHoodieMetadata, - final TaskContextSupplier taskContextSupplier) { - if (inputFileIds.size() != 1) { + protected List performClusteringForGroup(ReaderContextFactory readerContextFactory, + ClusteringGroupInfo clusteringGroup, + Map strategyParams, + boolean preserveHoodieMetadata, + HoodieSchema schema, + TaskContextSupplier taskContextSupplier, + String instantTime) { + if (clusteringGroup.getOperations().size() != 1) { throw new HoodieClusteringException("Expect only one partition and one fileId in clustering group for identity strategy: " + getClass().getName()); } - String fileId = inputFileIds.get(0).getFileId(); - return new SparkLazyInsertIterable(recordItr, true, getWriteConfig(), instantTime, getHoodieTable(), - fileId, taskContextSupplier, new SingleFileHandleCreateFactory(fileId, preserveHoodieMetadata)); + long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(taskContextSupplier, getWriteConfig()); + String fileId = clusteringGroup.getOperations().get(0).getFileId(); + try (ClosableIterator recordItr = + getRecordIterator(readerContextFactory, clusteringGroup.getOperations().get(0), instantTime, maxMemoryPerCompaction)) { + SparkLazyInsertIterable insertIterable = + new SparkLazyInsertIterable<>(recordItr, true, getWriteConfig(), instantTime, getHoodieTable(), + fileId, taskContextSupplier, new SingleFileHandleCreateFactory(fileId, preserveHoodieMetadata)); + return insertIterable.hasNext() ? 
insertIterable.next() : Collections.emptyList(); + } } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/ParquetToolsTestExecutionStrategy.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/ParquetToolsTestExecutionStrategy.java index 3e3e98b96dfdd..e535a8bfdf1a1 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/ParquetToolsTestExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/ParquetToolsTestExecutionStrategy.java @@ -18,6 +18,7 @@ package org.apache.hudi.client; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; @@ -44,8 +45,9 @@ public ParquetToolsTestExecutionStrategy( @Override protected void executeTools(Path oldFilePath, Path newFilePath) { - FileSystem fs = getHoodieTable().getMetaClient().getFs(); try { + Configuration hadoopConf = getHoodieTable().getStorageConf().unwrapAs(Configuration.class); + FileSystem fs = oldFilePath.getFileSystem(hadoopConf); FileUtil.copy(fs, oldFilePath, fs, newFilePath, false, false, fs.getConf()); } catch (IOException e) { throw new HoodieIOException("Exception in copying files.", e); From e0c634361d884ded0fc71bf51995439d1b0e354a Mon Sep 17 00:00:00 2001 From: sivabalan Date: Mon, 13 Apr 2026 17:38:39 -0700 Subject: [PATCH 3/4] Addressing feedback --- ... 
=> FileMetadataWriteStatusConverter.java} | 11 +++--- ...=> ExternalFileClusteringWriteHandle.java} | 30 ++++++++-------- ...TestFileMetadataWriteStatusConverter.java} | 14 ++++---- ...ernalFileClusteringExecutionStrategy.java} | 35 ++++++++++++------- ...lFileClusteringTestExecutionStrategy.java} | 18 +++++----- 5 files changed, 59 insertions(+), 49 deletions(-) rename hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/{ParquetFileMetaToWriteStatusConvertor.java => FileMetadataWriteStatusConverter.java} (87%) rename hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/{HoodieFileWriteHandle.java => ExternalFileClusteringWriteHandle.java} (72%) rename hudi-client/hudi-client-common/src/test/java/org/apache/hudi/execution/{TestParquetFileMetaToWriteStatusConvertor.java => TestFileMetadataWriteStatusConverter.java} (93%) rename hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/{ParquetToolsExecutionStrategy.java => SparkExternalFileClusteringExecutionStrategy.java} (70%) rename hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/{ParquetToolsTestExecutionStrategy.java => ExternalFileClusteringTestExecutionStrategy.java} (70%) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/ParquetFileMetaToWriteStatusConvertor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/FileMetadataWriteStatusConverter.java similarity index 87% rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/ParquetFileMetaToWriteStatusConvertor.java rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/FileMetadataWriteStatusConverter.java index 6fe689e5c2a12..26de170a7a16f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/ParquetFileMetaToWriteStatusConvertor.java +++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/FileMetadataWriteStatusConverter.java @@ -35,19 +35,19 @@ import java.util.Map; /** - * This class is mainly used by the ParquetToolsExecutionStrategy to generate WriteStatus classes. + * This class is mainly used by the SparkExternalFileClusteringExecutionStrategy to generate WriteStatus classes. */ -public class ParquetFileMetaToWriteStatusConvertor { +public class FileMetadataWriteStatusConverter { - private static final Logger LOG = LoggerFactory.getLogger(ParquetFileMetaToWriteStatusConvertor.class); + private static final Logger LOG = LoggerFactory.getLogger(FileMetadataWriteStatusConverter.class); private final HoodieTable hoodieTable; private final HoodieWriteConfig writeConfig; private final HoodieStorage storage; private final ParquetUtils parquetUtils; public static final String PREV_COMMIT = "prevCommit"; - public static final String TIME_TAKEN = "timeTaken"; + public static final String TIME_TAKEN = "totalCreateTime"; - public ParquetFileMetaToWriteStatusConvertor(HoodieTable hoodieTable, HoodieWriteConfig writeConfig) { + public FileMetadataWriteStatusConverter(HoodieTable hoodieTable, HoodieWriteConfig writeConfig) { this.hoodieTable = hoodieTable; this.writeConfig = writeConfig; this.storage = this.hoodieTable.getStorage(); @@ -94,7 +94,6 @@ private void generateHoodieWriteStat( stat.setFileId(writeStatus.getFileId()); stat.setPartitionPath(writeStatus.getPartitionPath()); stat.setPath(new StoragePath(writeConfig.getBasePath()), parquetFilePath); - stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords()); stat.setPrevCommit(String.valueOf(executionConfigs.get(PREV_COMMIT))); writeStatus.setStat(stat); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieFileWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/ExternalFileClusteringWriteHandle.java similarity index 72% rename from 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieFileWriteHandle.java rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/ExternalFileClusteringWriteHandle.java index 84de87c84ed12..39f705fddaaf1 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieFileWriteHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/ExternalFileClusteringWriteHandle.java @@ -24,8 +24,9 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.IOType; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieInsertException; -import org.apache.hudi.execution.ParquetFileMetaToWriteStatusConvertor; +import org.apache.hudi.execution.FileMetadataWriteStatusConverter; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.table.HoodieTable; import org.slf4j.Logger; @@ -37,19 +38,19 @@ import java.util.List; import java.util.Map; -import static org.apache.hudi.execution.ParquetFileMetaToWriteStatusConvertor.PREV_COMMIT; -import static org.apache.hudi.execution.ParquetFileMetaToWriteStatusConvertor.TIME_TAKEN; +import static org.apache.hudi.execution.FileMetadataWriteStatusConverter.PREV_COMMIT; +import static org.apache.hudi.execution.FileMetadataWriteStatusConverter.TIME_TAKEN; /** * Write handle that is used to work on top of files rather than on individual records. 
*/ -public class HoodieFileWriteHandle extends HoodieWriteHandle { +public class ExternalFileClusteringWriteHandle extends HoodieWriteHandle { - private static final Logger LOG = LoggerFactory.getLogger(HoodieFileWriteHandle.class); + private static final Logger LOG = LoggerFactory.getLogger(ExternalFileClusteringWriteHandle.class); private final StoragePath path; private final String prevCommit; - public HoodieFileWriteHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, + public ExternalFileClusteringWriteHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, String partitionPath, String fileId, TaskContextSupplier taskContextSupplier, StoragePath oldFilePath) { super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier, true); @@ -61,29 +62,32 @@ public HoodieFileWriteHandle(HoodieWriteConfig config, String instantTime, Hoodi // Create inProgress marker file createMarkerFile(partitionPath, path.getName()); - LOG.info("New HoodieFileWriteHandle for partition :" + partitionPath + " with fileId " + fileId); + LOG.info("New ExternalFileClusteringWriteHandle for partition :" + partitionPath + " with fileId " + fileId); } /** - * Complete writing of the file by creating the success marker file. + * Complete writing of the file. * @return WriteStatuses, ideally it will be only one object. 
*/ @Override public List close() { - LOG.info("Closing the file " + this.fileId + " as we are done with the file."); try { + if (!hoodieTable.getStorage().exists(path)) { + throw new HoodieIOException("Output file does not exist, transformation may not have been invoked: " + path); + } + Map executionConfigs = new HashMap<>(); executionConfigs.put(PREV_COMMIT, prevCommit); executionConfigs.put(TIME_TAKEN, timer.endTimer()); this.writeStatus = generateWriteStatus(path.toString(), partitionPath, executionConfigs); - LOG.info(String.format("HoodieFileWriteHandle for partitionPath %s fileID %s, took %d ms.", + LOG.info(String.format("ExternalFileClusteringWriteHandle for partitionPath %s fileID %s, took %d ms.", writeStatus.getStat().getPartitionPath(), writeStatus.getStat().getFileId(), writeStatus.getStat().getRuntimeStats().getTotalCreateTime())); return Collections.singletonList(writeStatus); } catch (IOException e) { - throw new HoodieInsertException("Failed to close the HoodieFileWriteHandle for path " + path, e); + throw new HoodieInsertException("Failed to close the ExternalFileClusteringWriteHandle for path " + path, e); } } @@ -97,9 +101,7 @@ public List close() { */ protected WriteStatus generateWriteStatus( String outputFile, String partitionPath, Map executionConfigs) throws IOException { - ParquetFileMetaToWriteStatusConvertor convertor = - new ParquetFileMetaToWriteStatusConvertor(hoodieTable, config); - return convertor.convert(outputFile, partitionPath, executionConfigs); + return new FileMetadataWriteStatusConverter(hoodieTable, config).convert(outputFile, partitionPath, executionConfigs); } @Override diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/execution/TestParquetFileMetaToWriteStatusConvertor.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/execution/TestFileMetadataWriteStatusConverter.java similarity index 93% rename from 
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/execution/TestParquetFileMetaToWriteStatusConvertor.java rename to hudi-client/hudi-client-common/src/test/java/org/apache/hudi/execution/TestFileMetadataWriteStatusConverter.java index 5b4baebad4155..6aa3740f259f6 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/execution/TestParquetFileMetaToWriteStatusConvertor.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/execution/TestFileMetadataWriteStatusConverter.java @@ -32,14 +32,12 @@ import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.schema.HoodieSchemaUtils; -import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.table.view.FileSystemViewStorageType; import org.apache.hudi.common.testutils.HoodieCommonTestHarness; import org.apache.hudi.common.testutils.InProcessTimeGenerator; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; -import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; @@ -60,7 +58,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class TestParquetFileMetaToWriteStatusConvertor extends HoodieCommonTestHarness { +public class TestFileMetadataWriteStatusConverter extends HoodieCommonTestHarness { private TaskContextSupplier mockedContextSupplier; private HoodieTable mockedHoodieTable; @@ -88,7 +86,7 @@ public void init() throws IOException, NoSuchFieldException { } @Test - public void testWriteStatusConvertion() throws IOException { + public void testWriteStatusConversion() throws IOException { HoodieWriteConfig writeConfig = 
getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, HoodieIndex.IndexType.BLOOM) .build(); @@ -102,12 +100,12 @@ public void testWriteStatusConvertion() throws IOException { String srcPath = basePath + "/" + partitionPath + "/" + srcFileName; Map executionConfigs = new HashMap<>(); - executionConfigs.put("timeTaken", 1000L); + executionConfigs.put("totalCreateTime", 1000L); executionConfigs.put("prevCommit", prevCommitTime); writeParquetFile(writeConfig, srcPath, partitionPath, newCommitTime, 50); - ParquetFileMetaToWriteStatusConvertor convertor = - new ParquetFileMetaToWriteStatusConvertor(mockedHoodieTable, writeConfig); - WriteStatus writeStatus = convertor.convert(srcPath, partitionPath, executionConfigs); + FileMetadataWriteStatusConverter converter = + new FileMetadataWriteStatusConverter(mockedHoodieTable, writeConfig); + WriteStatus writeStatus = converter.convert(srcPath, partitionPath, executionConfigs); Assertions.assertEquals(writeStatus.getFileId(), fileId); Assertions.assertEquals(writeStatus.getPartitionPath(), partitionPath); HoodieWriteStat writeStat = writeStatus.getStat(); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/ParquetToolsExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkExternalFileClusteringExecutionStrategy.java similarity index 70% rename from hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/ParquetToolsExecutionStrategy.java rename to hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkExternalFileClusteringExecutionStrategy.java index c5f4715cab1ed..0700c05664519 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/ParquetToolsExecutionStrategy.java +++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkExternalFileClusteringExecutionStrategy.java @@ -29,11 +29,10 @@ import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieClusteringException; -import org.apache.hudi.io.HoodieFileWriteHandle; +import org.apache.hudi.io.ExternalFileClusteringWriteHandle; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.table.HoodieTable; -import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,14 +41,14 @@ /** * This class gives skeleton implementation for set of clustering execution strategy - * that use parquet-tools commands. + * that use external file transformation commands. */ -public abstract class ParquetToolsExecutionStrategy> +public abstract class SparkExternalFileClusteringExecutionStrategy> extends SingleSparkJobExecutionStrategy { - private static final Logger LOG = LoggerFactory.getLogger(ParquetToolsExecutionStrategy.class); + private static final Logger LOG = LoggerFactory.getLogger(SparkExternalFileClusteringExecutionStrategy.class); - public ParquetToolsExecutionStrategy( + public SparkExternalFileClusteringExecutionStrategy( HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) { super(table, engineContext, writeConfig); } @@ -64,7 +63,7 @@ protected List performClusteringForGroup(ReaderContextFactory re String instantTime) { LOG.info("Starting clustering operation on input file ids."); List clusteringOperations = clusteringOps.getOperations(); - if (clusteringOperations.size() > 1) { + if (clusteringOperations.size() != 1) { throw new HoodieClusteringException("Expect only one clustering operation during rewrite: " + getClass().getName()); } @@ -73,19 +72,29 @@ protected List performClusteringForGroup(ReaderContextFactory re String partitionPath = clusteringOperation.getPartitionPath(); String 
dataFilePathStr = clusteringOperation.getDataFilePath(); StoragePath oldFilePath = new StoragePath(dataFilePathStr); - HoodieFileWriteHandle writeHandler = new HoodieFileWriteHandle(getWriteConfig(), instantTime, getHoodieTable(), + ExternalFileClusteringWriteHandle writeHandler = new ExternalFileClusteringWriteHandle(getWriteConfig(), instantTime, getHoodieTable(), partitionPath, fileId, taskContextSupplier, oldFilePath); - // Executes the parquet-tools command. - executeTools(new Path(oldFilePath.toUri()), new Path(writeHandler.getPath().toUri())); + try { + // Executes the file transformation. + transformFile(oldFilePath, writeHandler.getPath()); + } catch (Exception e) { + // Clean up partial output file if transformation fails. + try { + getHoodieTable().getStorage().deleteFile(writeHandler.getPath()); + } catch (Exception deleteEx) { + LOG.warn("Failed to clean up partial output file: " + writeHandler.getPath(), deleteEx); + } + throw new HoodieClusteringException("Failed to transform file: " + dataFilePathStr, e); + } return writeHandler.close(); } /** * This method needs to be overridden by the child classes. - * In this method parquet-tools command can be created and executed. - * Assuming that the parquet-tools command operate per file basis this interface allows command to run once per file. + * In this method external file transformation can be created and executed. + * Assuming that the transformation operates per file basis, this interface allows the command to run once per file. 
*/ - protected abstract void executeTools(Path oldFilePath, Path newFilePath); + protected abstract void transformFile(StoragePath oldFilePath, StoragePath newFilePath); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/ParquetToolsTestExecutionStrategy.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/ExternalFileClusteringTestExecutionStrategy.java similarity index 70% rename from hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/ParquetToolsTestExecutionStrategy.java rename to hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/ExternalFileClusteringTestExecutionStrategy.java index e535a8bfdf1a1..2210ee6fc673e 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/ParquetToolsTestExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/ExternalFileClusteringTestExecutionStrategy.java @@ -22,33 +22,35 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; -import org.apache.hudi.client.clustering.run.strategy.ParquetToolsExecutionStrategy; +import org.apache.hudi.client.clustering.run.strategy.SparkExternalFileClusteringExecutionStrategy; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.storage.StoragePath; import org.apache.hudi.table.HoodieTable; import java.io.IOException; /** - * Test execution strategy for testing the skeleton of the ParquetToolsExecutionStrategy. + * Test execution strategy for testing the skeleton of the SparkExternalFileClusteringExecutionStrategy. * It creates a copy of the original file with a different commit timestamp. 
*/ -public class ParquetToolsTestExecutionStrategy> - extends ParquetToolsExecutionStrategy { +public class ExternalFileClusteringTestExecutionStrategy> + extends SparkExternalFileClusteringExecutionStrategy { - public ParquetToolsTestExecutionStrategy( + public ExternalFileClusteringTestExecutionStrategy( HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) { super(table, engineContext, writeConfig); } @Override - protected void executeTools(Path oldFilePath, Path newFilePath) { + protected void transformFile(StoragePath oldFilePath, StoragePath newFilePath) { try { Configuration hadoopConf = getHoodieTable().getStorageConf().unwrapAs(Configuration.class); - FileSystem fs = oldFilePath.getFileSystem(hadoopConf); - FileUtil.copy(fs, oldFilePath, fs, newFilePath, false, false, fs.getConf()); + Path srcPath = new Path(oldFilePath.toUri()); + FileSystem fs = srcPath.getFileSystem(hadoopConf); + FileUtil.copy(fs, srcPath, fs, new Path(newFilePath.toUri()), false, false, fs.getConf()); } catch (IOException e) { throw new HoodieIOException("Exception in copying files.", e); } From 6d5c8b493431ae9f35aca5f977ba255100717ca5 Mon Sep 17 00:00:00 2001 From: sivabalan Date: Wed, 15 Apr 2026 14:53:22 -0700 Subject: [PATCH 4/4] Fixing last few feedback --- .../hudi/execution/FileMetadataWriteStatusConverter.java | 7 +++++-- .../apache/hudi/io/ExternalFileClusteringWriteHandle.java | 8 +++++--- .../SparkExternalFileClusteringExecutionStrategy.java | 4 ++++ 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/FileMetadataWriteStatusConverter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/FileMetadataWriteStatusConverter.java index 26de170a7a16f..51ac55cd5dc4d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/FileMetadataWriteStatusConverter.java +++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/FileMetadataWriteStatusConverter.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.util.ParquetUtils; import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.StoragePath; @@ -61,7 +62,7 @@ public WriteStatus convert(String parquetFile, String partitionPath, Map executionConfigs) throws IOException { LOG.info("Creating write status for parquet file " + parquetFile); WriteStatus writeStatus = (WriteStatus) ReflectionUtils.loadClass(this.writeConfig.getWriteStatusClassName(), - !this.hoodieTable.getIndex().isImplicitWithStorage(), this.writeConfig.getWriteStatusFailureFraction()); + this.hoodieTable.shouldTrackSuccessRecords(), this.writeConfig.getWriteStatusFailureFraction(), this.hoodieTable.isMetadataTable()); StoragePath parquetFilePath = new StoragePath(parquetFile); writeStatus.setFileId(FSUtils.getFileId(parquetFilePath.getName())); writeStatus.setPartitionPath(partitionPath); @@ -94,7 +95,9 @@ private void generateHoodieWriteStat( stat.setFileId(writeStatus.getFileId()); stat.setPartitionPath(writeStatus.getPartitionPath()); stat.setPath(new StoragePath(writeConfig.getBasePath()), parquetFilePath); - stat.setPrevCommit(String.valueOf(executionConfigs.get(PREV_COMMIT))); + Object prevCommit = executionConfigs.get(PREV_COMMIT); + ValidationUtils.checkArgument(prevCommit != null, "prevCommit must be set in executionConfigs"); + stat.setPrevCommit(prevCommit.toString()); writeStatus.setStat(stat); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/ExternalFileClusteringWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/ExternalFileClusteringWriteHandle.java index 39f705fddaaf1..9c9a5a3f0ba1b 100644 --- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/ExternalFileClusteringWriteHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/ExternalFileClusteringWriteHandle.java @@ -24,7 +24,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.IOType; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.HoodieClusteringException; import org.apache.hudi.exception.HoodieInsertException; import org.apache.hudi.execution.FileMetadataWriteStatusConverter; import org.apache.hudi.storage.StoragePath; @@ -73,7 +73,7 @@ public ExternalFileClusteringWriteHandle(HoodieWriteConfig config, String instan public List close() { try { if (!hoodieTable.getStorage().exists(path)) { - throw new HoodieIOException("Output file does not exist, transformation may not have been invoked: " + path); + throw new HoodieClusteringException("Output file does not exist, transformation may not have been invoked: " + path); } Map executionConfigs = new HashMap<>(); @@ -88,6 +88,8 @@ public List close() { return Collections.singletonList(writeStatus); } catch (IOException e) { throw new HoodieInsertException("Failed to close the ExternalFileClusteringWriteHandle for path " + path, e); + } finally { + markClosed(); } } @@ -101,7 +103,7 @@ public List close() { */ protected WriteStatus generateWriteStatus( String outputFile, String partitionPath, Map executionConfigs) throws IOException { - return new FileMetadataWriteStatusConverter(hoodieTable, config).convert(outputFile, partitionPath, executionConfigs); + return new FileMetadataWriteStatusConverter<>(hoodieTable, config).convert(outputFile, partitionPath, executionConfigs); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkExternalFileClusteringExecutionStrategy.java 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkExternalFileClusteringExecutionStrategy.java index 0700c05664519..be5c8aaa01ef5 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkExternalFileClusteringExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkExternalFileClusteringExecutionStrategy.java @@ -62,6 +62,10 @@ protected List performClusteringForGroup(ReaderContextFactory re TaskContextSupplier taskContextSupplier, String instantTime) { LOG.info("Starting clustering operation on input file ids."); + if (!preserveHoodieMetadata) { + throw new HoodieClusteringException( + "External file clustering strategy cannot rewrite Hudi metadata fields. preserveHoodieMetadata must be true."); + } List clusteringOperations = clusteringOps.getOperations(); if (clusteringOperations.size() != 1) { throw new HoodieClusteringException("Expect only one clustering operation during rewrite: " + getClass().getName());