diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/FileMetadataWriteStatusConverter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/FileMetadataWriteStatusConverter.java new file mode 100644 index 0000000000000..51ac55cd5dc4d --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/FileMetadataWriteStatusConverter.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.execution; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.util.ParquetUtils; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.table.HoodieTable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; + +/** + * This class is mainly used by the SparkExternalFileClusteringExecutionStrategy to generate WriteStatus classes. + */ +public class FileMetadataWriteStatusConverter { + + private static final Logger LOG = LoggerFactory.getLogger(FileMetadataWriteStatusConverter.class); + private final HoodieTable hoodieTable; + private final HoodieWriteConfig writeConfig; + private final HoodieStorage storage; + private final ParquetUtils parquetUtils; + public static final String PREV_COMMIT = "prevCommit"; + public static final String TIME_TAKEN = "totalCreateTime"; + + public FileMetadataWriteStatusConverter(HoodieTable hoodieTable, HoodieWriteConfig writeConfig) { + this.hoodieTable = hoodieTable; + this.writeConfig = writeConfig; + this.storage = this.hoodieTable.getStorage(); + this.parquetUtils = new ParquetUtils(); + } + + /** + * This method generates writeStatus object from parquet file. + */ + public WriteStatus convert(String parquetFile, String partitionPath, + Map executionConfigs) throws IOException { + LOG.info("Creating write status for parquet file " + parquetFile); + WriteStatus writeStatus = (WriteStatus) ReflectionUtils.loadClass(this.writeConfig.getWriteStatusClassName(), + this.hoodieTable.shouldTrackSuccessRecords(), this.writeConfig.getWriteStatusFailureFraction(), this.hoodieTable.isMetadataTable()); + StoragePath parquetFilePath = new StoragePath(parquetFile); + writeStatus.setFileId(FSUtils.getFileId(parquetFilePath.getName())); + writeStatus.setPartitionPath(partitionPath); + generateHoodieWriteStat(writeStatus, parquetFilePath, executionConfigs); + return writeStatus; + } + + /** + * This method generates HoodieWriteStat object and set it as part of WriteStatus object. + */ + private void generateHoodieWriteStat( + WriteStatus writeStatus, StoragePath parquetFilePath, Map executionConfigs) throws IOException { + HoodieWriteStat stat = new HoodieWriteStat(); + + // Set row count + long rowCount = parquetUtils.getRowCount(storage, parquetFilePath); + stat.setNumWrites(rowCount); + stat.setNumInserts(rowCount); + + // Set runtime stats + HoodieWriteStat.RuntimeStats runtimeStats = new HoodieWriteStat.RuntimeStats(); + runtimeStats.setTotalCreateTime(((Number) executionConfigs.get(TIME_TAKEN)).longValue()); + stat.setRuntimeStats(runtimeStats); + + // File size + long fileSize = storage.getPathInfo(parquetFilePath).getLength(); + stat.setFileSizeInBytes(fileSize); + stat.setTotalWriteBytes(fileSize); + + stat.setFileId(writeStatus.getFileId()); + stat.setPartitionPath(writeStatus.getPartitionPath()); + stat.setPath(new StoragePath(writeConfig.getBasePath()), parquetFilePath); + Object prevCommit = executionConfigs.get(PREV_COMMIT); + ValidationUtils.checkArgument(prevCommit != null, "prevCommit must be set in executionConfigs"); + stat.setPrevCommit(prevCommit.toString()); + + writeStatus.setStat(stat); + } + +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/ExternalFileClusteringWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/ExternalFileClusteringWriteHandle.java new file mode 100644 index 0000000000000..9c9a5a3f0ba1b --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/ExternalFileClusteringWriteHandle.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.IOType; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieClusteringException; +import org.apache.hudi.exception.HoodieInsertException; +import org.apache.hudi.execution.FileMetadataWriteStatusConverter; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.table.HoodieTable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.hudi.execution.FileMetadataWriteStatusConverter.PREV_COMMIT; +import static org.apache.hudi.execution.FileMetadataWriteStatusConverter.TIME_TAKEN; + +/** + * Write handle that is used to work on top of files rather than on individual records. + */ +public class ExternalFileClusteringWriteHandle extends HoodieWriteHandle { + + private static final Logger LOG = LoggerFactory.getLogger(ExternalFileClusteringWriteHandle.class); + private final StoragePath path; + private final String prevCommit; + + public ExternalFileClusteringWriteHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, + String partitionPath, String fileId, TaskContextSupplier taskContextSupplier, + StoragePath oldFilePath) { + super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier, true); + + // Output file path. + this.path = makeNewPath(partitionPath); + // Get the prev commit from existing or old file. + this.prevCommit = FSUtils.getCommitTime(oldFilePath.getName()); + + // Create inProgress marker file + createMarkerFile(partitionPath, path.getName()); + LOG.info("New ExternalFileClusteringWriteHandle for partition :" + partitionPath + " with fileId " + fileId); + } + + /** + * Complete writing of the file. + * @return WriteStatuses, ideally it will be only one object. + */ + @Override + public List close() { + try { + if (!hoodieTable.getStorage().exists(path)) { + throw new HoodieClusteringException("Output file does not exist, transformation may not have been invoked: " + path); + } + + Map executionConfigs = new HashMap<>(); + executionConfigs.put(PREV_COMMIT, prevCommit); + executionConfigs.put(TIME_TAKEN, timer.endTimer()); + + this.writeStatus = generateWriteStatus(path.toString(), partitionPath, executionConfigs); + LOG.info(String.format("ExternalFileClusteringWriteHandle for partitionPath %s fileID %s, took %d ms.", + writeStatus.getStat().getPartitionPath(), writeStatus.getStat().getFileId(), + writeStatus.getStat().getRuntimeStats().getTotalCreateTime())); + + return Collections.singletonList(writeStatus); + } catch (IOException e) { + throw new HoodieInsertException("Failed to close the ExternalFileClusteringWriteHandle for path " + path, e); + } finally { + markClosed(); + } + } + + /** + * Given a parquet file it generates WriteStatus object for a parquet file. + * @param outputFile parquet file + * @param partitionPath partition information of the parquet file + * @param executionConfigs Some configs collected during execution. + * @return WriteStatus object. + * @throws IOException + */ + protected WriteStatus generateWriteStatus( + String outputFile, String partitionPath, Map executionConfigs) throws IOException { + return new FileMetadataWriteStatusConverter<>(hoodieTable, config).convert(outputFile, partitionPath, executionConfigs); + } + + @Override + public IOType getIOType() { + return IOType.CREATE; + } + + public StoragePath getPath() { + return path; + } +} diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/execution/TestFileMetadataWriteStatusConverter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/execution/TestFileMetadataWriteStatusConverter.java new file mode 100644 index 0000000000000..6aa3740f259f6 --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/execution/TestFileMetadataWriteStatusConverter.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.execution; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.config.HoodieStorageConfig; +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.fs.ConsistencyGuardConfig; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaUtils; +import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; +import org.apache.hudi.common.table.view.FileSystemViewStorageType; +import org.apache.hudi.common.testutils.HoodieCommonTestHarness; +import org.apache.hudi.common.testutils.InProcessTimeGenerator; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.config.HoodieCompactionConfig; +import org.apache.hudi.config.HoodieIndexConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.io.storage.HoodieFileWriterFactory; +import org.apache.hudi.io.storage.hadoop.HoodieAvroParquetWriter; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.table.HoodieTable; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestFileMetadataWriteStatusConverter extends HoodieCommonTestHarness { + + private TaskContextSupplier mockedContextSupplier; + private HoodieTable mockedHoodieTable; + private HoodieIndex mockedIndex; + + @BeforeEach + public void init() throws IOException, NoSuchFieldException { + initPath(); + initTestDataGenerator(); + initMetaClient(); + + mockedContextSupplier = mock(TaskContextSupplier.class); + when(mockedContextSupplier.getAttemptIdSupplier()).thenReturn(() -> 1L); + when(mockedContextSupplier.getStageIdSupplier()).thenReturn(() -> 1); + when(mockedContextSupplier.getPartitionIdSupplier()).thenReturn(() -> 1); + + mockedIndex = mock(HoodieIndex.class); + when(mockedIndex.isImplicitWithStorage()).thenReturn(true); + + Configuration configuration = metaClient.getStorageConf().unwrapAs(Configuration.class); + mockedHoodieTable = mock(HoodieTable.class); + when(mockedHoodieTable.getMetaClient()).thenReturn(metaClient); + when(mockedHoodieTable.getIndex()).thenReturn(mockedIndex); + when(mockedHoodieTable.getStorage()).thenReturn(metaClient.getStorage()); + } + + @Test + public void testWriteStatusConversion() throws IOException { + HoodieWriteConfig writeConfig = + getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, HoodieIndex.IndexType.BLOOM) + .build(); + String fileId = UUID.randomUUID().toString(); + String prevCommitTime = InProcessTimeGenerator.createNewInstantTime(); + String newCommitTime = InProcessTimeGenerator.createNewInstantTime(); + String writeToken = FSUtils.makeWriteToken(mockedContextSupplier.getPartitionIdSupplier().get(), + mockedContextSupplier.getStageIdSupplier().get(), mockedContextSupplier.getAttemptIdSupplier().get()); + String srcFileName = fileId + "_" + writeToken + "_" + newCommitTime + ".parquet"; + String partitionPath = "2021/09/21"; + String srcPath = basePath + "/" + partitionPath + "/" + srcFileName; + + Map executionConfigs = new HashMap<>(); + executionConfigs.put("totalCreateTime", 1000L); + executionConfigs.put("prevCommit", prevCommitTime); + writeParquetFile(writeConfig, srcPath, partitionPath, newCommitTime, 50); + FileMetadataWriteStatusConverter converter = + new FileMetadataWriteStatusConverter(mockedHoodieTable, writeConfig); + WriteStatus writeStatus = converter.convert(srcPath, partitionPath, executionConfigs); + Assertions.assertEquals(writeStatus.getFileId(), fileId); + Assertions.assertEquals(writeStatus.getPartitionPath(), partitionPath); + HoodieWriteStat writeStat = writeStatus.getStat(); + Assertions.assertNotNull(writeStat); + Assertions.assertEquals(50, writeStat.getNumWrites()); + Assertions.assertEquals(50, writeStat.getNumInserts()); + Assertions.assertEquals(prevCommitTime, writeStat.getPrevCommit()); + + HoodieWriteStat.RuntimeStats runtimeStats = writeStat.getRuntimeStats(); + Assertions.assertNotNull(runtimeStats); + Assertions.assertEquals(1000, runtimeStats.getTotalCreateTime()); + + } + + /** + * This method creates HoodieWriteConfig with default configuration. + */ + public HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, HoodieIndex.IndexType indexType) { + return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr) + .withParallelism(2, 2) + .withBulkInsertParallelism(2) + .withFinalizeWriteParallelism(2) + .withDeleteParallelism(2) + .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION) + .withWriteStatusClass(WriteStatus.class) + .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()) + .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build()) + .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024) + .parquetMaxFileSize(1024 * 1024) + .build()) + .forTable("test-trip-table") + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build()) + .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withEnableBackupForRemoteFileSystemView(false) // Fail test if problem connecting to timeline-server + .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build()); + } + + /** + * This method writes a parquet file for testing. + */ + public void writeParquetFile(HoodieWriteConfig writeConfig, String fileName, String partitionPath, + String instantTime, int recordsCount) throws IOException { + Schema originalSchema = new Schema.Parser().parse(writeConfig.getSchema()); + HoodieSchema hoodieSchemaWithMetadataFields = HoodieSchemaUtils.addMetadataFields(HoodieSchema.fromAvroSchema(originalSchema)); + HoodieAvroParquetWriter fileWriter = + (HoodieAvroParquetWriter) HoodieFileWriterFactory.getFileWriter(instantTime, new StoragePath(fileName), + metaClient.getStorage(), writeConfig, hoodieSchemaWithMetadataFields, mockedContextSupplier, + writeConfig.getRecordMerger().getRecordType()); + + for (int i = 0; i < recordsCount; i++) { + String recordKey = UUID.randomUUID().toString(); + HoodieKey key = new HoodieKey(recordKey, partitionPath.replaceAll("/", "-")); + IndexedRecord recordWithMetadataInSchema = + HoodieAvroUtils.rewriteRecord((GenericRecord) dataGen.generateRandomValue(key, instantTime), hoodieSchemaWithMetadataFields.toAvroSchema()); + fileWriter.writeAvroWithMetadata(key, recordWithMetadataInSchema); + } + fileWriter.close(); + } + +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkExternalFileClusteringExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkExternalFileClusteringExecutionStrategy.java new file mode 100644 index 0000000000000..be5c8aaa01ef5 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkExternalFileClusteringExecutionStrategy.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.clustering.run.strategy; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.engine.ReaderContextFactory; +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.ClusteringGroupInfo; +import org.apache.hudi.common.model.ClusteringOperation; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieClusteringException; +import org.apache.hudi.io.ExternalFileClusteringWriteHandle; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.table.HoodieTable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; + +/** + * This class gives skeleton implementation for set of clustering execution strategy + * that use external file transformation commands. + */ +public abstract class SparkExternalFileClusteringExecutionStrategy> + extends SingleSparkJobExecutionStrategy { + + private static final Logger LOG = LoggerFactory.getLogger(SparkExternalFileClusteringExecutionStrategy.class); + + public SparkExternalFileClusteringExecutionStrategy( + HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) { + super(table, engineContext, writeConfig); + } + + @Override + protected List performClusteringForGroup(ReaderContextFactory readerContextFactory, + ClusteringGroupInfo clusteringOps, + Map strategyParams, + boolean preserveHoodieMetadata, + HoodieSchema schema, + TaskContextSupplier taskContextSupplier, + String instantTime) { + LOG.info("Starting clustering operation on input file ids."); + if (!preserveHoodieMetadata) { + throw new HoodieClusteringException( + "External file clustering strategy cannot rewrite Hudi metadata fields. preserveHoodieMetadata must be true."); + } + List clusteringOperations = clusteringOps.getOperations(); + if (clusteringOperations.size() != 1) { + throw new HoodieClusteringException("Expect only one clustering operation during rewrite: " + getClass().getName()); + } + + ClusteringOperation clusteringOperation = clusteringOperations.get(0); + String fileId = FSUtils.createNewFileIdPfx(); + String partitionPath = clusteringOperation.getPartitionPath(); + String dataFilePathStr = clusteringOperation.getDataFilePath(); + StoragePath oldFilePath = new StoragePath(dataFilePathStr); + ExternalFileClusteringWriteHandle writeHandler = new ExternalFileClusteringWriteHandle(getWriteConfig(), instantTime, getHoodieTable(), + partitionPath, fileId, taskContextSupplier, oldFilePath); + + try { + // Executes the file transformation. + transformFile(oldFilePath, writeHandler.getPath()); + } catch (Exception e) { + // Clean up partial output file if transformation fails. + try { + getHoodieTable().getStorage().deleteFile(writeHandler.getPath()); + } catch (Exception deleteEx) { + LOG.warn("Failed to clean up partial output file: " + writeHandler.getPath(), deleteEx); + } + throw new HoodieClusteringException("Failed to transform file: " + dataFilePathStr, e); + } + return writeHandler.close(); + } + + /** + * This method needs to be overridden by the child classes. + * In this method external file transformation can be created and executed. + * Assuming that the transformation operates per file basis, this interface allows the command to run once per file. + */ + protected abstract void transformFile(StoragePath oldFilePath, StoragePath newFilePath); + +} diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/ClusteringIdentityTestExecutionStrategy.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/ClusteringIdentityTestExecutionStrategy.java new file mode 100644 index 0000000000000..4bbac3cc32578 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/ClusteringIdentityTestExecutionStrategy.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client; + +import org.apache.hudi.client.clustering.run.strategy.SingleSparkJobExecutionStrategy; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.engine.ReaderContextFactory; +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.model.ClusteringGroupInfo; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieClusteringException; +import org.apache.hudi.execution.SparkLazyInsertIterable; +import org.apache.hudi.io.IOUtils; +import org.apache.hudi.io.SingleFileHandleCreateFactory; +import org.apache.hudi.table.HoodieTable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class ClusteringIdentityTestExecutionStrategy> + extends SingleSparkJobExecutionStrategy { + + private static final Logger LOG = LoggerFactory.getLogger(ClusteringIdentityTestExecutionStrategy.class); + + public ClusteringIdentityTestExecutionStrategy(HoodieTable table, + HoodieEngineContext engineContext, + HoodieWriteConfig writeConfig) { + super(table, engineContext, writeConfig); + } + + @Override + protected List performClusteringForGroup(ReaderContextFactory readerContextFactory, + ClusteringGroupInfo clusteringGroup, + Map strategyParams, + boolean preserveHoodieMetadata, + HoodieSchema schema, + TaskContextSupplier taskContextSupplier, + String instantTime) { + if (clusteringGroup.getOperations().size() != 1) { + throw new HoodieClusteringException("Expect only one partition and one fileId in clustering group for identity strategy: " + getClass().getName()); + } + + long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(taskContextSupplier, getWriteConfig()); + String fileId = clusteringGroup.getOperations().get(0).getFileId(); + try (ClosableIterator recordItr = + getRecordIterator(readerContextFactory, clusteringGroup.getOperations().get(0), instantTime, maxMemoryPerCompaction)) { + SparkLazyInsertIterable insertIterable = + new SparkLazyInsertIterable<>(recordItr, true, getWriteConfig(), instantTime, getHoodieTable(), + fileId, taskContextSupplier, new SingleFileHandleCreateFactory(fileId, preserveHoodieMetadata)); + return insertIterable.hasNext() ? insertIterable.next() : Collections.emptyList(); + } + } +} diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/ExternalFileClusteringTestExecutionStrategy.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/ExternalFileClusteringTestExecutionStrategy.java new file mode 100644 index 0000000000000..2210ee6fc673e --- /dev/null +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/ExternalFileClusteringTestExecutionStrategy.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.client.clustering.run.strategy.SparkExternalFileClusteringExecutionStrategy; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.table.HoodieTable; + +import java.io.IOException; + +/** + * Test execution strategy for testing the skeleton of the SparkExternalFileClusteringExecutionStrategy. + * It creates a copy of the original file with a different commit timestamp. + */ +public class ExternalFileClusteringTestExecutionStrategy> + extends SparkExternalFileClusteringExecutionStrategy { + + public ExternalFileClusteringTestExecutionStrategy( + HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) { + super(table, engineContext, writeConfig); + } + + @Override + protected void transformFile(StoragePath oldFilePath, StoragePath newFilePath) { + try { + Configuration hadoopConf = getHoodieTable().getStorageConf().unwrapAs(Configuration.class); + Path srcPath = new Path(oldFilePath.toUri()); + FileSystem fs = srcPath.getFileSystem(hadoopConf); + FileUtil.copy(fs, srcPath, fs, new Path(newFilePath.toUri()), false, false, fs.getConf()); + } catch (IOException e) { + throw new HoodieIOException("Exception in copying files.", e); + } + } +}