diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 15838520edcc0..8866f58c59916 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -237,13 +237,13 @@ public List getEnabledPartitionTypes() { protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient, Option inflightInstantTimestamp) throws IOException { HoodieTimer timer = HoodieTimer.start(); - List partitionsToInit = new ArrayList<>(MetadataPartitionType.values().length); + List metadataPartitionsToInit = new ArrayList<>(MetadataPartitionType.values().length); try { boolean exists = metadataTableExists(dataMetaClient); if (!exists) { // FILES partition is always required - partitionsToInit.add(MetadataPartitionType.FILES); + metadataPartitionsToInit.add(MetadataPartitionType.FILES); } // check if any of the enabled partition types needs to be initialized @@ -253,10 +253,10 @@ protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient, LOG.info("Async metadata indexing disabled and following partitions already initialized: " + completedPartitions); this.enabledPartitionTypes.stream() .filter(p -> !completedPartitions.contains(p.getPartitionPath()) && !MetadataPartitionType.FILES.equals(p)) - .forEach(partitionsToInit::add); + .forEach(metadataPartitionsToInit::add); } - if (partitionsToInit.isEmpty()) { + if (metadataPartitionsToInit.isEmpty()) { // No partitions left to initialize, since all the metadata enabled partitions are either initialized before // or current in the process of initialization. 
initMetadataReader(); @@ -266,9 +266,7 @@ protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient, // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP as the instant time for initial commit // Otherwise, we use the timestamp of the latest completed action. String initializationTime = dataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP); - - // Initialize partitions for the first time using data from the files on the file system - if (!initializeFromFilesystem(initializationTime, partitionsToInit, inflightInstantTimestamp)) { + if (!initializeFromFilesystem(initializationTime, metadataPartitionsToInit, inflightInstantTimestamp)) { LOG.error("Failed to initialize MDT from filesystem"); return false; } @@ -351,6 +349,7 @@ private boolean initializeFromFilesystem(String initializationTime, List pendingDataInstants = getPendingDataInstants(dataMetaClient); // FILES partition is always required and is initialized first boolean filesPartitionAvailable = dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.FILES); if (!filesPartitionAvailable) { @@ -374,11 +373,11 @@ private boolean initializeFromFilesystem(String initializationTime, List partitionInfoList; if (filesPartitionAvailable) { - partitionInfoList = listAllPartitionsFromMDT(initializationTime); + partitionInfoList = listAllPartitionsFromMDT(initializationTime, pendingDataInstants); } else { // if auto initialization is enabled, then we need to list all partitions from the file system if (dataWriteConfig.getMetadataConfig().shouldAutoInitialize()) { - partitionInfoList = listAllPartitionsFromFilesystem(initializationTime); + partitionInfoList = listAllPartitionsFromFilesystem(initializationTime, pendingDataInstants); } else { // if auto initialization is disabled, we can return an empty list partitionInfoList = Collections.emptyList(); @@ -424,8 +423,7 @@ 
private boolean initializeFromFilesystem(String initializationTime, List> initializeColumnStatsPartition(Map> partitionToFilesMap) { + // during initialization, we need stats for base and log files. HoodieData records = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords( engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams()); @@ -553,6 +552,16 @@ private Pair> initializeFilesPartition(List getPendingDataInstants(HoodieTableMetaClient dataMetaClient) { + // Initialize excluding the pending operations on the dataset + return dataMetaClient.getActiveTimeline() + .getInstantsAsStream().filter(i -> !i.isCompleted()) + // regular writers should not be blocked due to pending indexing action + .filter(i -> !HoodieTimeline.INDEXING_ACTION.equals(i.getAction())) + .map(HoodieInstant::getTimestamp) + .collect(Collectors.toSet()); + } + private boolean anyPendingDataInstant(HoodieTableMetaClient dataMetaClient, Option inflightInstantTimestamp) { // We can only initialize if there are no pending operations on the dataset List pendingDataInstant = dataMetaClient.getActiveTimeline() @@ -590,7 +599,7 @@ private HoodieTableMetaClient initializeMetaClient() throws IOException { * @param initializationTime Files which have a timestamp after this are neglected * @return List consisting of {@code DirectoryInfo} for each partition found. 
*/ - private List listAllPartitionsFromFilesystem(String initializationTime) { + private List listAllPartitionsFromFilesystem(String initializationTime, Set pendingDataInstants) { List pathsToList = new LinkedList<>(); pathsToList.add(new SerializablePath(new CachingPath(dataWriteConfig.getBasePath()))); @@ -609,7 +618,7 @@ private List listAllPartitionsFromFilesystem(String initializatio List processedDirectories = engineContext.map(pathsToList.subList(0, numDirsToList), path -> { FileSystem fs = path.get().getFileSystem(conf.get()); String relativeDirPath = FSUtils.getRelativePartitionPath(serializableBasePath.get(), path.get()); - return new DirectoryInfo(relativeDirPath, fs.listStatus(path.get()), initializationTime); + return new DirectoryInfo(relativeDirPath, fs.listStatus(path.get()), initializationTime, pendingDataInstants); }, numDirsToList); pathsToList = new LinkedList<>(pathsToList.subList(numDirsToList, pathsToList.size())); @@ -646,13 +655,14 @@ private List listAllPartitionsFromFilesystem(String initializatio * @param initializationTime Files which have a timestamp after this are neglected * @return List consisting of {@code DirectoryInfo} for each partition found. 
*/ - private List listAllPartitionsFromMDT(String initializationTime) throws IOException { + private List listAllPartitionsFromMDT(String initializationTime, Set pendingDataInstants) throws IOException { List dirinfoList = new LinkedList<>(); - List allPartitionPaths = metadata.getAllPartitionPaths().stream() + List allAbsolutePartitionPaths = metadata.getAllPartitionPaths().stream() .map(partitionPath -> dataWriteConfig.getBasePath() + "/" + partitionPath).collect(Collectors.toList()); - Map partitionFileMap = metadata.getAllFilesInPartitions(allPartitionPaths); + Map partitionFileMap = metadata.getAllFilesInPartitions(allAbsolutePartitionPaths); for (Map.Entry entry : partitionFileMap.entrySet()) { - dirinfoList.add(new DirectoryInfo(entry.getKey(), entry.getValue(), initializationTime)); + String relativeDirPath = FSUtils.getRelativePartitionPath(new Path(dataWriteConfig.getBasePath()), new Path(entry.getKey())); + dirinfoList.add(new DirectoryInfo(relativeDirPath, entry.getValue(), initializationTime, pendingDataInstants, false)); } return dirinfoList; } @@ -783,7 +793,8 @@ private MetadataRecordsGenerationParams getRecordsGenerationParams() { dataWriteConfig.isMetadataColumnStatsIndexEnabled(), dataWriteConfig.getColumnStatsIndexParallelism(), dataWriteConfig.getColumnsEnabledForColumnStatsIndex(), - dataWriteConfig.getColumnsEnabledForBloomFilterIndex()); + dataWriteConfig.getColumnsEnabledForBloomFilterIndex(), + dataWriteConfig.getMetadataConfig().getMaxReaderBufferSize()); } /** @@ -936,7 +947,7 @@ public void update(HoodieRestoreMetadata restoreMetadata, String instantTime) { // Restore requires the existing pipelines to be shutdown. So we can safely scan the dataset to find the current // list of files in the filesystem. 
- List dirInfoList = listAllPartitionsFromFilesystem(instantTime); + List dirInfoList = listAllPartitionsFromFilesystem(instantTime, Collections.emptySet()); Map dirInfoMap = dirInfoList.stream().collect(Collectors.toMap(DirectoryInfo::getRelativePath, Function.identity())); dirInfoList.clear(); @@ -1494,29 +1505,39 @@ static class DirectoryInfo implements Serializable { // Is this a hoodie partition private boolean isHoodiePartition = false; - public DirectoryInfo(String relativePath, FileStatus[] fileStatus, String maxInstantTime) { + public DirectoryInfo(String relativePath, FileStatus[] fileStatuses, String maxInstantTime, Set pendingDataInstants) { + this(relativePath, fileStatuses, maxInstantTime, pendingDataInstants, true); + } + + /** + * When files are directly fetched from Metadata table we do not need to validate HoodiePartitions. + */ + public DirectoryInfo(String relativePath, FileStatus[] fileStatus, String maxInstantTime, Set pendingDataInstants, + boolean validateHoodiePartitions) { this.relativePath = relativePath; // Pre-allocate with the maximum length possible filenameToSizeMap = new HashMap<>(fileStatus.length); + // Presence of partition meta file implies this is a HUDI partition + // if input files are directly fetched from MDT, it may not contain the HoodiePartitionMetadata file. So, we can ignore the validation for isHoodiePartition. 
+ isHoodiePartition = !validateHoodiePartitions || Arrays.stream(fileStatus).anyMatch(status -> status.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX)); for (FileStatus status : fileStatus) { - if (status.isDirectory()) { + // Do not attempt to search for more subdirectories inside directories that are partitions + if (!isHoodiePartition && status.isDirectory()) { // Ignore .hoodie directory as there cannot be any partitions inside it if (!status.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) { this.subDirectories.add(status.getPath()); } - } else if (status.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX)) { - // Presence of partition meta file implies this is a HUDI partition - this.isHoodiePartition = true; - } else if (FSUtils.isDataFile(status.getPath())) { + } else if (isHoodiePartition && FSUtils.isDataFile(status.getPath())) { // Regular HUDI data file (base file or log file) String dataFileCommitTime = FSUtils.getCommitTime(status.getPath().getName()); - // Limit the file listings to files which were created before the maxInstant time. - if (HoodieTimeline.compareTimestamps(dataFileCommitTime, HoodieTimeline.LESSER_THAN_OR_EQUALS, maxInstantTime)) { + // Limit the file listings to files which were created by successful commits before the maxInstant time. 
+ if (!pendingDataInstants.contains(dataFileCommitTime) && HoodieTimeline.compareTimestamps(dataFileCommitTime, LESSER_THAN_OR_EQUALS, maxInstantTime)) { filenameToSizeMap.put(status.getPath().getName(), status.getLen()); } } + } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/LogFileColStatsTestUtil.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/LogFileColStatsTestUtil.java new file mode 100644 index 0000000000000..2e0baaac74940 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/LogFileColStatsTestUtil.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.testutils; + +import org.apache.hudi.common.model.HoodieColumnRangeMetadata; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieException; + +import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.expressions.GenericRow; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.collectColumnRangeMetadata; + +/** + * Util methods used in tests to fetch col stats records for a log file. 
+ */ +public class LogFileColStatsTestUtil { + + public static Option getLogFileColumnRangeMetadata(String filePath, HoodieTableMetaClient datasetMetaClient, String latestCommitTime, + List columnsToIndex, Option writerSchemaOpt, + int maxBufferSize) throws IOException { + if (writerSchemaOpt.isPresent()) { + List fieldsToIndex = writerSchemaOpt.get().getFields().stream() + .filter(field -> columnsToIndex.contains(field.name())) + .collect(Collectors.toList()); + List records = new ArrayList<>(); + HoodieUnMergedLogRecordScanner scanner = HoodieUnMergedLogRecordScanner.newBuilder() + .withFileSystem(datasetMetaClient.getFs()) + .withBasePath(datasetMetaClient.getBasePath()) + .withLogFilePaths(Collections.singletonList(filePath)) + .withBufferSize(maxBufferSize) + .withLatestInstantTime(latestCommitTime) + .withReaderSchema(writerSchemaOpt.get()) + .withLogRecordScannerCallback(records::add) + .build(); + scanner.scan(); + if (records.isEmpty()) { + return Option.empty(); + } + List indexedRecords = new ArrayList<>(); + for (HoodieRecord hoodieRecord : records) { + Option insertValue = ((HoodieRecordPayload) hoodieRecord.getData()).getInsertValue(writerSchemaOpt.get()); + if (insertValue.isPresent()) { + indexedRecords.add(insertValue.get()); + } + } + Map> columnRangeMetadataMap = + collectColumnRangeMetadata(indexedRecords, fieldsToIndex, filePath); + List> columnRangeMetadataList = new ArrayList<>(columnRangeMetadataMap.values()); + return Option.of(getColStatsEntry(filePath, columnRangeMetadataList)); + } else { + throw new HoodieException("Writer schema needs to be set"); + } + } + + private static Row getColStatsEntry(String logFilePath, List> columnRangeMetadataList) { + Collections.sort(columnRangeMetadataList, (o1, o2) -> o1.getColumnName().compareTo(o2.getColumnName())); + Object[] values = new Object[(columnRangeMetadataList.size() * 3) + 2]; + values[0] = logFilePath.substring(logFilePath.lastIndexOf("/") + 1); + values[1] = 
columnRangeMetadataList.get(0).getValueCount(); + int counter = 2; + for (HoodieColumnRangeMetadata columnRangeMetadata: columnRangeMetadataList) { + values[counter++] = columnRangeMetadata.getValueCount(); + values[counter++] = columnRangeMetadata.getMinValue(); + values[counter++] = columnRangeMetadata.getMaxValue(); + } + return new GenericRow(values); + } + + public static Option getSchemaForTable(HoodieTableMetaClient metaClient) throws Exception { + TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient); + return Option.of(schemaResolver.getTableAvroSchema()); + } +} + diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java index ac7e775017ac6..0d302912e7ac7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java @@ -211,6 +211,14 @@ public static String getFileId(String fullFileName) { return fullFileName.split("_", 2)[0]; } + /** + * @param filePath the file path; can be an absolute path or just partition path and file name + * @return the file name from the given path. + */ + public static String getFileNameFromPath(String filePath) { + return filePath.substring(filePath.lastIndexOf("/") + 1); + } + /** * Gets all partition paths assuming date partitioning (year, month, day) three levels down. 
*/ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java index f62ec0febd578..032aac8574eb1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java @@ -79,7 +79,7 @@ public void processNextRecord(HoodieRecord hoodieRecord) throws Exception @Override protected void processNextDeletedRecord(DeleteRecord deleteRecord) { - throw new IllegalStateException("Not expected to see delete records in this log-scan mode. Check Job Config"); + // no op } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java index 8b637be447f0c..8de6e4e2f4a49 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java @@ -265,32 +265,34 @@ public HoodieMetadataPayload(Option recordOpt) { } private HoodieMetadataPayload(String key, int type, Map filesystemMetadata) { - this(key, type, filesystemMetadata, null, null, null); + this(key, type, filesystemMetadata, null, null, null, false); } private HoodieMetadataPayload(String key, HoodieMetadataBloomFilter metadataBloomFilter) { - this(key, METADATA_TYPE_BLOOM_FILTER, null, metadataBloomFilter, null, null); + this(key, METADATA_TYPE_BLOOM_FILTER, null, metadataBloomFilter, null, null, metadataBloomFilter.getIsDeleted()); } private HoodieMetadataPayload(String key, HoodieMetadataColumnStats columnStats) { - this(key, METADATA_TYPE_COLUMN_STATS, null, null, columnStats, null); + this(key, METADATA_TYPE_COLUMN_STATS, null, null, columnStats, null, columnStats.getIsDeleted()); } private HoodieMetadataPayload(String key, 
HoodieRecordIndexInfo recordIndexMetadata) { - this(key, METADATA_TYPE_RECORD_INDEX, null, null, null, recordIndexMetadata); + this(key, METADATA_TYPE_RECORD_INDEX, null, null, null, recordIndexMetadata, false); } protected HoodieMetadataPayload(String key, int type, - Map filesystemMetadata, - HoodieMetadataBloomFilter metadataBloomFilter, - HoodieMetadataColumnStats columnStats, - HoodieRecordIndexInfo recordIndexMetadata) { + Map filesystemMetadata, + HoodieMetadataBloomFilter metadataBloomFilter, + HoodieMetadataColumnStats columnStats, + HoodieRecordIndexInfo recordIndexMetadata, + boolean isDeletedRecord) { this.key = key; this.type = type; this.filesystemMetadata = filesystemMetadata; this.bloomFilterMetadata = metadataBloomFilter; this.columnStatMetadata = columnStats; this.recordIndexMetadata = recordIndexMetadata; + this.isDeletedRecord = isDeletedRecord; } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index 2be2cbb94060b..36d7aa8f2cadf 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -50,6 +50,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; +import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -62,6 +63,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ParquetUtils; import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.VisibleForTesting; import 
org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.common.util.collection.Tuple3; @@ -101,6 +103,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; import java.util.UUID; import java.util.function.BiFunction; @@ -120,6 +123,7 @@ import static org.apache.hudi.common.config.HoodieCommonConfig.MAX_MEMORY_FOR_COMPACTION; import static org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE; import static org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.MILLIS_INSTANT_ID_LENGTH; +import static org.apache.hudi.common.fs.FSUtils.getFileNameFromPath; import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty; import static org.apache.hudi.common.util.ValidationUtils.checkState; import static org.apache.hudi.metadata.HoodieMetadataPayload.RECORD_INDEX_MISSING_FILEINDEX_FALLBACK; @@ -646,12 +650,8 @@ public static HoodieData convertMetadataToColumnStatsRecords(Hoodi return engineContext.parallelize(deleteFileList, parallelism) .flatMap(deleteFileInfoPair -> { String partitionPath = deleteFileInfoPair.getLeft(); - String filePath = deleteFileInfoPair.getRight(); - - if (filePath.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) { - return getColumnStatsRecords(partitionPath, filePath, dataTableMetaClient, columnsToIndex, true).iterator(); - } - return Collections.emptyListIterator(); + String fileName = deleteFileInfoPair.getRight(); + return getColumnStatsRecords(partitionPath, fileName, dataTableMetaClient, columnsToIndex, true).iterator(); }); } @@ -855,16 +855,10 @@ public static HoodieData convertFilesToColumnStatsRecords(HoodieEn // Create records MDT int parallelism = Math.max(Math.min(partitionFileFlagTupleList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1); return engineContext.parallelize(partitionFileFlagTupleList, 
parallelism).flatMap(partitionFileFlagTuple -> { - final String partitionName = partitionFileFlagTuple.f0; + final String partitionPath = partitionFileFlagTuple.f0; final String filename = partitionFileFlagTuple.f1; final boolean isDeleted = partitionFileFlagTuple.f2; - if (!FSUtils.isBaseFile(new Path(filename)) || !filename.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) { - LOG.warn(String.format("Ignoring file %s as it is not a PARQUET file", filename)); - return Stream.empty().iterator(); - } - - final String filePathWithPartition = partitionName + "/" + filename; - return getColumnStatsRecords(partitionName, filePathWithPartition, dataTableMetaClient, columnsToIndex, isDeleted).iterator(); + return getColumnStatsRecords(partitionPath, filename, dataTableMetaClient, columnsToIndex, isDeleted, recordsGenerationParams.getMaxReaderBufferSize()).iterator(); }); } @@ -1111,19 +1105,26 @@ private static Stream translateWriteStatToColumnStats(HoodieWriteS return HoodieMetadataPayload.createColumnStatsRecords(writeStat.getPartitionPath(), columnRangeMetadataList, false); } - return getColumnStatsRecords(writeStat.getPartitionPath(), writeStat.getPath(), datasetMetaClient, columnsToIndex, false); + String filePath = writeStat.getPath(); + return getColumnStatsRecords(writeStat.getPartitionPath(), getFileNameFromPath(filePath), datasetMetaClient, columnsToIndex, false); } private static Stream getColumnStatsRecords(String partitionPath, - String filePath, + String fileName, HoodieTableMetaClient datasetMetaClient, List columnsToIndex, boolean isDeleted) { - String filePartitionPath = filePath.startsWith("/") ? 
filePath.substring(1) : filePath; - String fileName = FSUtils.getFileName(filePath, partitionPath); + return getColumnStatsRecords(partitionPath, fileName, datasetMetaClient, columnsToIndex, isDeleted, -1); + } + + private static Stream getColumnStatsRecords(String partitionPath, + String fileName, + HoodieTableMetaClient datasetMetaClient, + List columnsToIndex, + boolean isDeleted, + int maxBufferSize) { if (isDeleted) { - // TODO we should delete records instead of stubbing them List> columnRangeMetadataList = columnsToIndex.stream() .map(entry -> HoodieColumnRangeMetadata.stub(fileName, entry)) .collect(Collectors.toList()); @@ -1132,33 +1133,78 @@ private static Stream getColumnStatsRecords(String partitionPath, } List> columnRangeMetadata = - readColumnRangeMetadataFrom(filePartitionPath, datasetMetaClient, columnsToIndex); + readColumnRangeMetadataFrom(partitionPath, fileName, datasetMetaClient, columnsToIndex, maxBufferSize); return HoodieMetadataPayload.createColumnStatsRecords(partitionPath, columnRangeMetadata, false); } - private static List> readColumnRangeMetadataFrom(String filePath, + private static List> readColumnRangeMetadataFrom(String partitionPath, + String fileName, HoodieTableMetaClient datasetMetaClient, - List columnsToIndex) { + List columnsToIndex, + int maxBufferSize) { + String partitionPathFileName = (partitionPath.equals(EMPTY_PARTITION_NAME) || partitionPath.equals(NON_PARTITIONED_NAME)) ? 
fileName + : partitionPath + "/" + fileName; try { - if (filePath.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) { - Path fullFilePath = new Path(datasetMetaClient.getBasePath(), filePath); - List> columnRangeMetadataList = - new ParquetUtils().readRangeFromParquetMetadata(datasetMetaClient.getHadoopConf(), fullFilePath, columnsToIndex); - - return columnRangeMetadataList; + Path fullFilePath = new Path(datasetMetaClient.getBasePathV2(), partitionPathFileName); + if (partitionPathFileName.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) { + return new ParquetUtils().readRangeFromParquetMetadata(datasetMetaClient.getHadoopConf(), fullFilePath, columnsToIndex); + } else if (FSUtils.isLogFile(fileName)) { + Option writerSchemaOpt = tryResolveSchemaForTable(datasetMetaClient); + LOG.warn("Reading log file: {}, to build column range metadata.", partitionPathFileName); + return getLogFileColumnRangeMetadata(fullFilePath.toString(), datasetMetaClient, columnsToIndex, writerSchemaOpt, maxBufferSize); } - LOG.warn("Column range index not supported for: " + filePath); + LOG.warn("Column range index not supported for: {}", partitionPathFileName); return Collections.emptyList(); } catch (Exception e) { // NOTE: In case reading column range metadata from individual file failed, // we simply fall back, in lieu of failing the whole task - LOG.error("Failed to fetch column range metadata for: " + filePath); + LOG.error("Failed to fetch column range metadata for: {}", partitionPathFileName); return Collections.emptyList(); } } + /** + * Read column range metadata from log file. 
+ */ + @VisibleForTesting + protected static List> getLogFileColumnRangeMetadata(String filePath, + HoodieTableMetaClient datasetMetaClient, + List columnsToIndex, + Option writerSchemaOpt, + int maxBufferSize) throws IOException { + if (writerSchemaOpt.isPresent()) { + List fieldsToIndex = writerSchemaOpt.get().getFields().stream() + .filter(field -> columnsToIndex.contains(field.name())) + .collect(Collectors.toList()); + // read log file records without merging + List records = new ArrayList<>(); + HoodieUnMergedLogRecordScanner scanner = HoodieUnMergedLogRecordScanner.newBuilder() + .withFileSystem(datasetMetaClient.getFs()) + .withBasePath(datasetMetaClient.getBasePath()) + .withLogFilePaths(Collections.singletonList(filePath)) + .withBufferSize(maxBufferSize) + .withLatestInstantTime(datasetMetaClient.getActiveTimeline().getCommitsTimeline().lastInstant().get().getTimestamp()) + .withReaderSchema(writerSchemaOpt.get()) + .withLogRecordScannerCallback(records::add) + .build(); + scanner.scan(); + if (records.isEmpty()) { + return Collections.emptyList(); + } + + List indexedRecords = new ArrayList<>(); + for (HoodieRecord hoodieRecord : records) { + indexedRecords.add(hoodieRecord.toIndexedRecord(writerSchemaOpt.get(), new Properties()).get().getData()); + } + Map> columnRangeMetadataMap = + collectColumnRangeMetadata(indexedRecords, fieldsToIndex, getFileNameFromPath(filePath)); + return new ArrayList<>(columnRangeMetadataMap.values()); + } + return Collections.emptyList(); + } + /** * Does an upcast for {@link BigDecimal} instance to align it with scale/precision expected by * the {@link org.apache.avro.LogicalTypes.Decimal} Avro logical type diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataRecordsGenerationParams.java b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataRecordsGenerationParams.java index 72a8bf4cd26f8..00ffb1baa397c 100644 --- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataRecordsGenerationParams.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataRecordsGenerationParams.java @@ -42,9 +42,11 @@ public class MetadataRecordsGenerationParams implements Serializable { private final int columnStatsIndexParallelism; private final List targetColumnsForColumnStatsIndex; private final List targetColumnsForBloomFilterIndex; + private final int maxReaderBufferSize; - MetadataRecordsGenerationParams(HoodieTableMetaClient dataMetaClient, List enabledPartitionTypes, String bloomFilterType, int bloomIndexParallelism, - boolean isColumnStatsIndexEnabled, int columnStatsIndexParallelism, List targetColumnsForColumnStatsIndex, List targetColumnsForBloomFilterIndex) { + MetadataRecordsGenerationParams(HoodieTableMetaClient dataMetaClient, List enabledPartitionTypes, String bloomFilterType, + int bloomIndexParallelism, boolean isColumnStatsIndexEnabled, int columnStatsIndexParallelism, List targetColumnsForColumnStatsIndex, + List targetColumnsForBloomFilterIndex, int maxReaderBufferSize) { this.dataMetaClient = dataMetaClient; this.enabledPartitionTypes = enabledPartitionTypes; this.bloomFilterType = bloomFilterType; @@ -53,6 +55,7 @@ public class MetadataRecordsGenerationParams implements Serializable { this.columnStatsIndexParallelism = columnStatsIndexParallelism; this.targetColumnsForColumnStatsIndex = targetColumnsForColumnStatsIndex; this.targetColumnsForBloomFilterIndex = targetColumnsForBloomFilterIndex; + this.maxReaderBufferSize = maxReaderBufferSize; } public HoodieTableMetaClient getDataMetaClient() { @@ -86,4 +89,8 @@ public List getTargetColumnsForColumnStatsIndex() { public List getSecondaryKeysForBloomFilterIndex() { return targetColumnsForBloomFilterIndex; } + + public int getMaxReaderBufferSize() { + return maxReaderBufferSize; + } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java 
b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java index 587fead6e1a04..4a602103c1a47 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java @@ -255,6 +255,10 @@ public void tesLogFileName() { assertEquals(1, FSUtils.getTaskPartitionIdFromLogPath(rlPath)); assertEquals(0, FSUtils.getStageIdFromLogPath(rlPath)); assertEquals(1, FSUtils.getTaskAttemptIdFromLogPath(rlPath)); + + assertEquals(logFile, FSUtils.getFileNameFromPath("/tmp/path/" + logFile)); + assertEquals(logFile, FSUtils.getFileNameFromPath("/tmp/abc/def/path/" + logFile)); + assertEquals(logFile, FSUtils.getFileNameFromPath("/tmp/" + logFile)); } @Test diff --git a/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataPayload.java b/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataPayload.java index cde9341f5cdf1..4f022d7e0dafb 100644 --- a/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataPayload.java +++ b/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataPayload.java @@ -39,11 +39,10 @@ * Tests {@link HoodieMetadataPayload}. 
*/ public class TestHoodieMetadataPayload extends HoodieCommonTestHarness { + public static final String PARTITION_NAME = "2022/10/01"; @Test public void testFileSystemMetadataPayloadMerging() { - String partitionName = "2022/10/01"; - Map firstCommitAddedFiles = createImmutableMap( Pair.of("file1.parquet", 1000L), Pair.of("file2.parquet", 2000L), @@ -51,7 +50,7 @@ public void testFileSystemMetadataPayloadMerging() { ); HoodieRecord firstPartitionFilesRecord = - HoodieMetadataPayload.createPartitionFilesRecord(partitionName, firstCommitAddedFiles, Collections.emptyList()); + HoodieMetadataPayload.createPartitionFilesRecord(PARTITION_NAME, firstCommitAddedFiles, Collections.emptyList()); Map secondCommitAddedFiles = createImmutableMap( // NOTE: This is an append @@ -63,13 +62,13 @@ public void testFileSystemMetadataPayloadMerging() { List secondCommitDeletedFiles = Collections.singletonList("file1.parquet"); HoodieRecord secondPartitionFilesRecord = - HoodieMetadataPayload.createPartitionFilesRecord(partitionName, secondCommitAddedFiles, secondCommitDeletedFiles); + HoodieMetadataPayload.createPartitionFilesRecord(PARTITION_NAME, secondCommitAddedFiles, secondCommitDeletedFiles); HoodieMetadataPayload combinedPartitionFilesRecordPayload = secondPartitionFilesRecord.getData().preCombine(firstPartitionFilesRecord.getData()); HoodieMetadataPayload expectedCombinedPartitionedFilesRecordPayload = - HoodieMetadataPayload.createPartitionFilesRecord(partitionName, + HoodieMetadataPayload.createPartitionFilesRecord(PARTITION_NAME, createImmutableMap( Pair.of("file2.parquet", 2000L), Pair.of("file3.parquet", 3333L), @@ -84,7 +83,6 @@ public void testFileSystemMetadataPayloadMerging() { @Test public void testColumnStatsPayloadMerging() throws IOException { - String partitionPath = "2022/10/01"; String fileName = "file.parquet"; String targetColName = "c1"; @@ -92,7 +90,7 @@ public void testColumnStatsPayloadMerging() throws IOException { 
HoodieColumnRangeMetadata.create(fileName, targetColName, 100, 1000, 5, 1000, 123456, 123456); HoodieRecord columnStatsRecord = - HoodieMetadataPayload.createColumnStatsRecords(partitionPath, Collections.singletonList(c1Metadata), false) + HoodieMetadataPayload.createColumnStatsRecords(PARTITION_NAME, Collections.singletonList(c1Metadata), false) .findFirst().get(); //////////////////////////////////////////////////////////////////////// @@ -105,7 +103,7 @@ public void testColumnStatsPayloadMerging() throws IOException { HoodieColumnRangeMetadata.create(fileName, targetColName, 0, 500, 0, 100, 12345, 12345); HoodieRecord updatedColumnStatsRecord = - HoodieMetadataPayload.createColumnStatsRecords(partitionPath, Collections.singletonList(c1AppendedBlockMetadata), false) + HoodieMetadataPayload.createColumnStatsRecords(PARTITION_NAME, Collections.singletonList(c1AppendedBlockMetadata), false) .findFirst().get(); HoodieMetadataPayload combinedMetadataPayload = @@ -115,7 +113,7 @@ public void testColumnStatsPayloadMerging() throws IOException { HoodieColumnRangeMetadata.create(fileName, targetColName, 0, 1000, 5, 1100, 135801, 135801); HoodieRecord expectedColumnStatsRecord = - HoodieMetadataPayload.createColumnStatsRecords(partitionPath, Collections.singletonList(expectedColumnRangeMetadata), false) + HoodieMetadataPayload.createColumnStatsRecords(PARTITION_NAME, Collections.singletonList(expectedColumnRangeMetadata), false) .findFirst().get(); // Assert combined payload @@ -135,7 +133,7 @@ public void testColumnStatsPayloadMerging() throws IOException { HoodieColumnRangeMetadata.stub(fileName, targetColName); HoodieRecord deletedColumnStatsRecord = - HoodieMetadataPayload.createColumnStatsRecords(partitionPath, Collections.singletonList(c1StubbedMetadata), true) + HoodieMetadataPayload.createColumnStatsRecords(PARTITION_NAME, Collections.singletonList(c1StubbedMetadata), true) .findFirst().get(); // NOTE: In this case, deleted (or tombstone) record will be therefore 
deleting diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/cow-bootstrap-rollback1-column-stats-index-table.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/cow-bootstrap-rollback1-column-stats-index-table.json new file mode 100644 index 0000000000000..83790766db25b --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/cow-bootstrap-rollback1-column-stats-index-table.json @@ -0,0 +1,2 @@ +{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 959sdc","c2_minValue":" 0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_nullCount":0,"c4_maxValue":"2021-11-19T20:40:55.550-08:00","c4_minValue":"2021-11-19T20:40:55.339-08:00","c4_nullCount":0,"c5_maxValue":97,"c5_minValue":1,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":40} +{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 984sdh","c2_minValue":" 0sdc","c2_nullCount":0,"c3_maxValue":10000.768,"c3_minValue":0.001,"c3_nullCount":0,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-19T20:40:55.339-08:00","c4_nullCount":0,"c5_maxValue":97,"c5_minValue":-100,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":40} \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/cow-bootstrap1-column-stats-index-table.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/cow-bootstrap1-column-stats-index-table.json new file mode 100644 index 0000000000000..75aa7ada3ad3e --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/cow-bootstrap1-column-stats-index-table.json 
@@ -0,0 +1,4 @@ +{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 959sdc","c2_minValue":" 0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_nullCount":0,"c4_maxValue":"2021-11-19T20:40:55.550-08:00","c4_minValue":"2021-11-19T20:40:55.339-08:00","c4_nullCount":0,"c5_maxValue":97,"c5_minValue":1,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":40} +{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 989sda","c2_minValue":" 0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":0.300,"c3_nullCount":1,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":1000,"c5_minValue":-1000,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":40} +{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 989sda","c2_minValue":" 0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":0.300,"c3_nullCount":1,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":1000,"c5_minValue":-100,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":39} +{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 989sda","c2_minValue":" 
0sdc","c2_nullCount":0,"c3_maxValue":10000.768,"c3_minValue":0.001,"c3_nullCount":0,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":1000,"c5_minValue":-100,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":39} \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/cow-bootstrap2-column-stats-index-table.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/cow-bootstrap2-column-stats-index-table.json new file mode 100644 index 0000000000000..9c52707a27d05 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/cow-bootstrap2-column-stats-index-table.json @@ -0,0 +1,5 @@ +{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 959sdc","c2_minValue":" 0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_nullCount":0,"c4_maxValue":"2021-11-19T20:40:55.550-08:00","c4_minValue":"2021-11-19T20:40:55.339-08:00","c4_nullCount":0,"c5_maxValue":97,"c5_minValue":1,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":40} +{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 989sda","c2_minValue":" 0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":0.300,"c3_nullCount":1,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":1000,"c5_minValue":-1000,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":40} 
+{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 989sda","c2_minValue":" 0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":0.300,"c3_nullCount":1,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":1000,"c5_minValue":-100,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":39} +{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 989sda","c2_minValue":" 0sdc","c2_nullCount":0,"c3_maxValue":10000.768,"c3_minValue":0.001,"c3_nullCount":0,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":1000,"c5_minValue":-100,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":39} +{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 989sda","c2_minValue":" 0sdc","c2_nullCount":0,"c3_maxValue":200000.000,"c3_minValue":0.100,"c3_nullCount":0,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":1000,"c5_minValue":-100,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":39} \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/cow-clean1-column-stats-index-table.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/cow-clean1-column-stats-index-table.json new file mode 100644 index 0000000000000..a08dea39c0501 --- /dev/null +++ 
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/cow-clean1-column-stats-index-table.json @@ -0,0 +1,2 @@ +{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 989sda","c2_minValue":" 0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":0.300,"c3_nullCount":1,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":1000,"c5_minValue":-1000,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":40} +{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 989sda","c2_minValue":" 0sdc","c2_nullCount":0,"c3_maxValue":10000.768,"c3_minValue":0.001,"c3_nullCount":0,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":1000,"c5_minValue":-1000,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":40} \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/delete-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/delete-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json new file mode 100644 index 0000000000000..17e8f877c50bb --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/delete-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json @@ -0,0 +1 @@ +{"c1":633,"c2":" 987sdk","c3":375.308,"c4":"2021-11-18T23:34:44.180-08:00","c5":0,"c6":"2020-01-01","c7":"NA==","c8":9} \ No newline at end of file diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-bootstrap-rollback1-column-stats-index-table.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-bootstrap-rollback1-column-stats-index-table.json new file mode 100644 index 0000000000000..dcbf49b141f91 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-bootstrap-rollback1-column-stats-index-table.json @@ -0,0 +1,2 @@ +{"c1_maxValue":562,"c1_minValue":323,"c1_nullCount":0,"c2_maxValue":" 984sdh","c2_minValue":" 980sdd","c2_nullCount":0,"c3_maxValue":10000.768,"c3_minValue":0.001,"c3_nullCount":0,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-19T23:34:44.181-08:00","c4_nullCount":0,"c5_maxValue":80,"c5_minValue":-100,"c5_nullCount":0,"c6_maxValue":"2020-10-21","c6_minValue":"2020-01-15","c6_nullCount":0,"c7_maxValue":"SA==","c7_minValue":"qw==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":5} +{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 959sdc","c2_minValue":" 0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_nullCount":0,"c4_maxValue":"2021-11-19T20:40:55.550-08:00","c4_minValue":"2021-11-19T20:40:55.339-08:00","c4_nullCount":0,"c5_maxValue":97,"c5_minValue":1,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":40} \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-bootstrap1-column-stats-index-table.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-bootstrap1-column-stats-index-table.json new file mode 100644 index 0000000000000..146097347e036 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-bootstrap1-column-stats-index-table.json @@ -0,0 +1,3 @@ 
+{"c1_maxValue":562,"c1_minValue":323,"c1_nullCount":0,"c2_maxValue":" 984sdh","c2_minValue":" 980sdd","c2_nullCount":0,"c3_maxValue":10000.768,"c3_minValue":0.001,"c3_nullCount":0,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-19T23:34:44.181-08:00","c4_nullCount":0,"c5_maxValue":80,"c5_minValue":-100,"c5_nullCount":0,"c6_maxValue":"2020-10-21","c6_minValue":"2020-01-15","c6_nullCount":0,"c7_maxValue":"SA==","c7_minValue":"qw==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":5} +{"c1_maxValue":639,"c1_minValue":323,"c1_nullCount":0,"c2_maxValue":" 989sda","c2_minValue":" 980sdd","c2_nullCount":0,"c3_maxValue":977.328,"c3_minValue":0.300,"c3_nullCount":1,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":1000,"c5_minValue":-1000,"c5_nullCount":0,"c6_maxValue":"2020-10-21","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"aQ==","c7_minValue":"qw==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":10} +{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 959sdc","c2_minValue":" 0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_nullCount":0,"c4_maxValue":"2021-11-19T20:40:55.550-08:00","c4_minValue":"2021-11-19T20:40:55.339-08:00","c4_nullCount":0,"c5_maxValue":97,"c5_minValue":1,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":40} \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-bootstrap2-column-stats-index-table.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-bootstrap2-column-stats-index-table.json new file mode 100644 index 0000000000000..6256be16c1ddf --- /dev/null +++ 
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-bootstrap2-column-stats-index-table.json @@ -0,0 +1,5 @@ +{"c1_maxValue":562,"c1_minValue":323,"c1_nullCount":0,"c2_maxValue":" 984sdh","c2_minValue":" 980sdd","c2_nullCount":0,"c3_maxValue":10000.768,"c3_minValue":0.001,"c3_nullCount":0,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-19T23:34:44.181-08:00","c4_nullCount":0,"c5_maxValue":80,"c5_minValue":-100,"c5_nullCount":0,"c6_maxValue":"2020-10-21","c6_minValue":"2020-01-15","c6_nullCount":0,"c7_maxValue":"SA==","c7_minValue":"qw==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":5} +{"c1_maxValue":562,"c1_minValue":323,"c1_nullCount":0,"c2_maxValue":" 984sdh","c2_minValue":" 980sdd","c2_nullCount":0,"c3_maxValue":200000.000,"c3_minValue":0.100,"c3_nullCount":0,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-19T23:34:44.181-08:00","c4_nullCount":0,"c5_maxValue":80,"c5_minValue":-100,"c5_nullCount":0,"c6_maxValue":"2020-10-21","c6_minValue":"2020-01-15","c6_nullCount":0,"c7_maxValue":"SA==","c7_minValue":"qQ==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":5} +{"c1_maxValue":639,"c1_minValue":323,"c1_nullCount":0,"c2_maxValue":" 989sda","c2_minValue":" 980sdd","c2_nullCount":0,"c3_maxValue":977.328,"c3_minValue":0.300,"c3_nullCount":1,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":1000,"c5_minValue":-1000,"c5_nullCount":0,"c6_maxValue":"2020-10-21","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"aQ==","c7_minValue":"qw==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":10} +{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 959sdc","c2_minValue":" 
0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_nullCount":0,"c4_maxValue":"2021-11-19T20:40:55.550-08:00","c4_minValue":"2021-11-19T20:40:55.339-08:00","c4_nullCount":0,"c5_maxValue":97,"c5_minValue":1,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":40} +{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 989sda","c2_minValue":" 0sdc","c2_nullCount":0,"c3_maxValue":200000.000,"c3_minValue":0.100,"c3_nullCount":0,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":1000,"c5_minValue":-100,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":39} \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-clean1-column-stats-index-table.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-clean1-column-stats-index-table.json new file mode 100644 index 0000000000000..8c7b1125314a4 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-clean1-column-stats-index-table.json @@ -0,0 +1,2 @@ +{"c1_maxValue":562,"c1_minValue":323,"c1_nullCount":0,"c2_maxValue":" 984sdh","c2_minValue":" 980sdd","c2_nullCount":0,"c3_maxValue":10000.768,"c3_minValue":0.001,"c3_nullCount":0,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-19T23:34:44.181-08:00","c4_nullCount":0,"c5_maxValue":80,"c5_minValue":-100,"c5_nullCount":0,"c6_maxValue":"2020-10-21","c6_minValue":"2020-01-15","c6_nullCount":0,"c7_maxValue":"SA==","c7_minValue":"qw==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":5} 
+{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 989sda","c2_minValue":" 0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":0.300,"c3_nullCount":1,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":1000,"c5_minValue":-1000,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":40} \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-delete-block1-column-stats-index-table.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-delete-block1-column-stats-index-table.json new file mode 100644 index 0000000000000..fc6c936c7871e --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-delete-block1-column-stats-index-table.json @@ -0,0 +1,3 @@ +{"c1_nullCount":0,"c2_nullCount":0,"c3_nullCount":0,"c4_nullCount":0,"c5_nullCount":0,"c6_nullCount":0,"c7_nullCount":0,"c8_nullCount":0,"valueCount":0} +{"c1_maxValue":639,"c1_minValue":323,"c1_nullCount":0,"c2_maxValue":" 989sda","c2_minValue":" 980sdd","c2_nullCount":0,"c3_maxValue":977.328,"c3_minValue":0.300,"c3_nullCount":1,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":1000,"c5_minValue":-1000,"c5_nullCount":0,"c6_maxValue":"2020-10-21","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"aQ==","c7_minValue":"qw==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":10} +{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 959sdc","c2_minValue":" 
0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_nullCount":0,"c4_maxValue":"2021-11-19T20:40:55.550-08:00","c4_minValue":"2021-11-19T20:40:55.339-08:00","c4_nullCount":0,"c5_maxValue":97,"c5_minValue":1,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":40} \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/update2-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/update2-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json new file mode 100644 index 0000000000000..35ae749ddc3fc --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/update2-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json @@ -0,0 +1,10 @@ +{"c1":323,"c2":" 980sdd","c3":null,"c4":"2021-11-19T23:34:44.201-08:00","c5":70,"c6":"2020-01-15","c7":"Ag==","c8":9} +{"c1":326,"c2":" 981sde","c3":64.768,"c4":"2021-11-19T23:34:44.201-08:00","c5":80,"c6":"2020-10-13","c7":"AA==","c8":9} +{"c1":555,"c2":" 982sdf","c3":153.431,"c4":"2021-11-19T23:34:44.186-08:00","c5":10,"c6":"2020-03-12","c7":"rw==","c8":9} +{"c1":556,"c2":" 983sdg","c3":246.427,"c4":"2021-11-19T23:34:44.186-08:00","c5":45,"c6":"2020-10-08","c7":"qw==","c8":9} +{"c1":562,"c2":" 984sdh","c3":977.328,"c4":"2021-11-19T23:34:44.181-08:00","c5":-100,"c6":"2020-10-21","c7":"SA==","c8":9} +{"c1":619,"c2":" 985sdi","c3":230.320,"c4":"2021-11-19T23:34:44.180-08:00","c5":1000,"c6":"2020-02-13","c7":"QA==","c8":9} +{"c1":624,"c2":" 986sdj","c3":580.317,"c4":"2021-11-18T23:34:44.180-08:00","c5":-1,"c6":"2020-10-10","c7":"PQ==","c8":9} +{"c1":633,"c2":" 
987sdk","c3":375.308,"c4":"2021-11-18T23:34:44.180-08:00","c5":-1000,"c6":"2020-01-01","c7":"NA==","c8":9} +{"c1":638,"c2":" 988sdl","c3":904.304,"c4":"2021-11-18T23:34:44.179-08:00","c5":20,"c6":"2020-08-25","c7":"MA==","c8":9} +{"c1":639,"c2":" 989sda","c3":0.300,"c4":"2021-11-18T23:34:44.179-08:00","c5":90,"c6":"2020-04-21","c7":"aa==","c8":9} diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/update3-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/update3-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json new file mode 100644 index 0000000000000..5e04406cf2182 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/update3-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json @@ -0,0 +1,5 @@ +{"c1":323,"c2":" 980sdd","c3":10.00,"c4":"2021-11-19T23:34:44.201-08:00","c5":70,"c6":"2020-01-15","c7":"Ag==","c8":9} +{"c1":326,"c2":" 981sde","c3":10000.768,"c4":"2021-11-19T23:34:44.201-08:00","c5":80,"c6":"2020-10-13","c7":"AA==","c8":9} +{"c1":555,"c2":" 982sdf","c3":2.431,"c4":"2021-11-19T23:34:44.186-08:00","c5":10,"c6":"2020-03-12","c7":"rw==","c8":9} +{"c1":556,"c2":" 983sdg","c3":0.001,"c4":"2021-11-19T23:34:44.186-08:00","c5":45,"c6":"2020-10-08","c7":"qw==","c8":9} +{"c1":562,"c2":" 984sdh","c3":5.328,"c4":"2021-11-19T23:34:44.181-08:00","c5":-100,"c6":"2020-10-21","c7":"SA==","c8":9} diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/update4-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/update4-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json new file mode 100644 index 0000000000000..a83a82d8b8bff --- /dev/null +++ 
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/update4-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json @@ -0,0 +1,5 @@ +{"c1":323,"c2":" 980sdd","c3":200000.00,"c4":"2021-11-19T23:34:44.201-08:00","c5":70,"c6":"2020-01-15","c7":"Aj==","c8":9} +{"c1":326,"c2":" 981sde","c3":100.768,"c4":"2021-11-19T23:34:44.201-08:00","c5":80,"c6":"2020-10-13","c7":"AB==","c8":9} +{"c1":555,"c2":" 982sdf","c3":20.431,"c4":"2021-11-19T23:34:44.186-08:00","c5":10,"c6":"2020-03-12","c7":"rx==","c8":9} +{"c1":556,"c2":" 983sdg","c3":0.1,"c4":"2021-11-19T23:34:44.186-08:00","c5":45,"c6":"2020-10-08","c7":"qf==","c8":9} +{"c1":562,"c2":" 984sdh","c3":4.328,"c4":"2021-11-19T23:34:44.181-08:00","c5":-100,"c6":"2020-10-21","c7":"SL==","c8":9} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala index 70623814a1fbd..29f7a1c1bedbb 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala @@ -18,14 +18,18 @@ package org.apache.hudi.functional -import org.apache.hadoop.fs.{LocatedFileStatus, Path} +import org.apache.avro.Schema +import org.apache.hadoop.fs.Path import org.apache.hudi.ColumnStatsIndexSupport.composeIndexSchema import org.apache.hudi.HoodieConversionUtils.toProperties +import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieStorageConfig} -import org.apache.hudi.common.model.HoodieTableType +import org.apache.hudi.common.model.{HoodieBaseFile, HoodieFileGroup, HoodieLogFile, HoodieTableType} import org.apache.hudi.common.table.HoodieTableMetaClient -import org.apache.hudi.functional.ColumnStatIndexTestBase.ColumnStatsTestCase 
-import org.apache.hudi.testutils.HoodieSparkClientTestBase +import org.apache.hudi.common.table.view.FileSystemViewManager +import org.apache.hudi.config.HoodieCompactionConfig +import org.apache.hudi.functional.ColumnStatIndexTestBase.{ColumnStatsTestCase, ColumnStatsTestParams} +import org.apache.hudi.testutils.{HoodieSparkClientTestBase, LogFileColStatsTestUtil} import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions} import org.apache.hudi.AvroConversionUtils import org.apache.spark.sql._ @@ -37,6 +41,9 @@ import org.junit.jupiter.params.provider.Arguments import java.math.BigInteger import java.sql.{Date, Timestamp} +import java.util +import java.util.List +import java.util.stream.Collectors import scala.collection.JavaConverters._ import scala.util.Random @@ -76,42 +83,39 @@ class ColumnStatIndexTestBase extends HoodieSparkClientTestBase { cleanupSparkContexts() } - protected def doWriteAndValidateColumnStats(testCase: ColumnStatsTestCase, - metadataOpts: Map[String, String], - hudiOpts: Map[String, String], - dataSourcePath: String, - expectedColStatsSourcePath: String, - operation: String, - saveMode: SaveMode, - shouldValidate: Boolean = true): Unit = { - val sourceJSONTablePath = getClass.getClassLoader.getResource(dataSourcePath).toString + protected def doWriteAndValidateColumnStats(params: ColumnStatsTestParams): Unit = { + + val sourceJSONTablePath = getClass.getClassLoader.getResource(params.dataSourcePath).toString // NOTE: Schema here is provided for validation that the input date is in the appropriate format val inputDF = spark.read.schema(sourceTableSchema).json(sourceJSONTablePath) + val writeOptions: Map[String, String] = params.hudiOpts ++ params.metadataOpts + inputDF .sort("c1") - .repartition(4, new Column("c1")) + .repartition(params.numPartitions, new Column("c1")) .write .format("hudi") - .options(hudiOpts) - .option(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key, 10 * 1024) - .option(DataSourceWriteOptions.OPERATION.key, 
operation) - .mode(saveMode) + .options(writeOptions) + .option(DataSourceWriteOptions.OPERATION.key, params.operation) + .option(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), String.valueOf(params.parquetMaxFileSize)) + .option(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), String.valueOf(params.smallFileLimit)) + .mode(params.saveMode) .save(basePath) dfList = dfList :+ inputDF metaClient = HoodieTableMetaClient.reload(metaClient) - if (shouldValidate) { + if (params.shouldValidate) { // Currently, routine manually validating the column stats (by actually reading every column of every file) // only supports parquet files. Therefore we skip such validation when delta-log files are present, and only // validate in following cases: (1) COW: all operations; (2) MOR: insert only. - val shouldValidateColumnStatsManually = testCase.tableType == HoodieTableType.COPY_ON_WRITE || - operation.equals(DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + val shouldValidateColumnStatsManually = params.testCase.tableType == HoodieTableType.COPY_ON_WRITE || + params.operation.equals(DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) validateColumnStatsIndex( - testCase, metadataOpts, expectedColStatsSourcePath, shouldValidateColumnStatsManually) + params.testCase, params.metadataOpts, params.expectedColStatsSourcePath, shouldValidateColumnStatsManually, params.latestCompletedCommit) } } @@ -119,20 +123,19 @@ class ColumnStatIndexTestBase extends HoodieSparkClientTestBase { includedCols: Seq[String], indexedCols: Seq[String], indexSchema: StructType): DataFrame = { - val files = { - val it = fs.listFiles(new Path(tablePath), true) - var seq = Seq[LocatedFileStatus]() - while (it.hasNext) { - seq = seq :+ it.next() - } - seq.filter(fs => fs.getPath.getName.endsWith(".parquet")) - } - - spark.createDataFrame( - files.flatMap(file => { - val df = spark.read.schema(sourceTableSchema).parquet(file.getPath.toString) + val metaClient = 
HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(tablePath).build() + val fsv = FileSystemViewManager.createInMemoryFileSystemView(new HoodieSparkEngineContext(jsc), metaClient, HoodieMetadataConfig.newBuilder().enable(false).build()) + fsv.loadAllPartitions() + val filegroupList = fsv.getAllFileGroups.collect(Collectors.toList[HoodieFileGroup]) + val baseFiles = filegroupList.asScala + .flatMap(fileGroup => fileGroup.getAllBaseFiles.iterator().asScala) + .map(baseFile => new Path(baseFile.getPath)) + + val baseFilesDf = spark.createDataFrame( + baseFiles.flatMap(file => { + val df = spark.read.schema(sourceTableSchema).parquet(file.toString) val exprs: Seq[String] = - s"'${typedLit(file.getPath.getName)}' AS file" +: + s"'${typedLit(file.getName)}' AS file" +: s"sum(1) AS valueCount" +: df.columns .filter(col => includedCols.contains(col)) @@ -160,12 +163,62 @@ class ColumnStatIndexTestBase extends HoodieSparkClientTestBase { }).asJava, indexSchema ) + + if (metaClient.getTableConfig.getTableType == HoodieTableType.COPY_ON_WRITE) { + baseFilesDf // COW table + } else { + val allLogFiles = filegroupList.asScala + .flatMap(fileGroup => fileGroup.getAllFileSlices.iterator().asScala) + .flatMap(fileSlice => fileSlice.getLogFiles.iterator().asScala) + .toList.asJava + if (allLogFiles.isEmpty) { + baseFilesDf // MOR table, but no log files. 
+ } else { + val colsToGenerateStats = indexedCols // check for included cols + val writerSchemaOpt = LogFileColStatsTestUtil.getSchemaForTable(metaClient) + val latestCompletedCommit = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant().get().getTimestamp + baseFilesDf.union(getColStatsFromLogFiles(allLogFiles, latestCompletedCommit, + colsToGenerateStats.asJava, + metaClient, + writerSchemaOpt: org.apache.hudi.common.util.Option[Schema], + HoodieMetadataConfig.MAX_READER_BUFFER_SIZE_PROP.defaultValue(), + indexSchema)) + } + } + } + + protected def getColStatsFromLogFiles(logFiles: List[HoodieLogFile], latestCommit: String, columnsToIndex: util.List[String], + datasetMetaClient: HoodieTableMetaClient, + writerSchemaOpt: org.apache.hudi.common.util.Option[Schema], + maxBufferSize: Integer, + indexSchema: StructType): DataFrame = { + val colStatsEntries = logFiles.asScala.map(logFile => { + try { + getColStatsFromLogFile(logFile.getPath.toString, latestCommit, columnsToIndex, datasetMetaClient, writerSchemaOpt, maxBufferSize) + } catch { + case e: Exception => + throw e + } + }).filter(rowOpt => rowOpt.isPresent).map(rowOpt => rowOpt.get()).toList.asJava + spark.createDataFrame(colStatsEntries, indexSchema) + } + + protected def getColStatsFromLogFile(logFilePath: String, + latestCommit: String, + columnsToIndex: util.List[String], + datasetMetaClient: HoodieTableMetaClient, + writerSchemaOpt: org.apache.hudi.common.util.Option[Schema], + maxBufferSize: Integer + ): org.apache.hudi.common.util.Option[Row] = { + LogFileColStatsTestUtil.getLogFileColumnRangeMetadata(logFilePath, datasetMetaClient, latestCommit, + columnsToIndex, writerSchemaOpt, maxBufferSize) } protected def validateColumnStatsIndex(testCase: ColumnStatsTestCase, - metadataOpts: Map[String, String], - expectedColStatsSourcePath: String, - validateColumnStatsManually: Boolean): Unit = { + metadataOpts: Map[String, String], + expectedColStatsSourcePath: String, + 
validateColumnStatsManually: Boolean, + latestCompletedCommit: String): Unit = { val metadataConfig = HoodieMetadataConfig.newBuilder() .fromProperties(toProperties(metadataOpts)) .build() @@ -181,7 +234,8 @@ class ColumnStatIndexTestBase extends HoodieSparkClientTestBase { } } val (expectedColStatsSchema, _) = composeIndexSchema(sourceTableSchema.fieldNames, indexedColumns, sourceTableSchema) - val validationSortColumns = Seq("c1_maxValue", "c1_minValue", "c2_maxValue", "c2_minValue") + val validationSortColumns = Seq("c1_maxValue", "c1_minValue", "c2_maxValue", "c2_minValue", "c3_maxValue", + "c3_minValue", "c5_maxValue", "c5_minValue") columnStatsIndex.loadTransposed(sourceTableSchema.fieldNames, testCase.shouldReadInMemory) { transposedColStatsDF => // Match against expected column stats table @@ -273,14 +327,41 @@ object ColumnStatIndexTestBase { def testMetadataColumnStatsIndexParams: java.util.stream.Stream[Arguments] = { java.util.stream.Stream.of(HoodieTableType.values().toStream.flatMap(tableType => Seq(Arguments.arguments(ColumnStatsTestCase(tableType, shouldReadInMemory = true)), - Arguments.arguments(ColumnStatsTestCase(tableType, shouldReadInMemory = false))) + Arguments.arguments(ColumnStatsTestCase(tableType, shouldReadInMemory = false)) + ) ): _*) } def testMetadataColumnStatsIndexParamsForMOR: java.util.stream.Stream[Arguments] = { java.util.stream.Stream.of( Seq(Arguments.arguments(ColumnStatsTestCase(HoodieTableType.MERGE_ON_READ, shouldReadInMemory = true)), - Arguments.arguments(ColumnStatsTestCase(HoodieTableType.MERGE_ON_READ, shouldReadInMemory = false))) - : _*) + Arguments.arguments(ColumnStatsTestCase(HoodieTableType.MERGE_ON_READ, shouldReadInMemory = false)) + ) + : _*) } + + def testTableTypePartitionTypeParams: java.util.stream.Stream[Arguments] = { + java.util.stream.Stream.of( + Seq( + Arguments.arguments(HoodieTableType.COPY_ON_WRITE, "c8"), + // empty partition col represents non-partitioned table. 
+ Arguments.arguments(HoodieTableType.COPY_ON_WRITE, ""), + Arguments.arguments(HoodieTableType.MERGE_ON_READ, "c8"), + Arguments.arguments(HoodieTableType.MERGE_ON_READ, "") + ) + : _*) + } + + case class ColumnStatsTestParams(testCase: ColumnStatsTestCase, + metadataOpts: Map[String, String], + hudiOpts: Map[String, String], + dataSourcePath: String, + expectedColStatsSourcePath: String, + operation: String, + saveMode: SaveMode, + shouldValidate: Boolean = true, + latestCompletedCommit: String = null, + numPartitions: Integer = 4, + parquetMaxFileSize: Integer = 10 * 1024, + smallFileLimit: Integer = 100 * 1024 * 1024) } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala index 7daa2dc69b9a6..c1754d8c6c1b5 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala @@ -19,17 +19,24 @@ package org.apache.hudi.functional import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hudi.{AvroConversionUtils, ColumnStatsIndexSupport, DataSourceWriteOptions} import org.apache.hudi.ColumnStatsIndexSupport.composeIndexSchema import org.apache.hudi.DataSourceWriteOptions.{PRECOMBINE_FIELD, RECORDKEY_FIELD} import org.apache.hudi.HoodieConversionUtils.toProperties import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig, HoodieStorageConfig} -import org.apache.hudi.common.model.HoodieTableType +import org.apache.hudi.common.model.{FileSlice, HoodieTableType} import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} import org.apache.hudi.common.util.ParquetUtils -import org.apache.hudi.config.HoodieWriteConfig +import 
org.apache.hudi.config.{HoodieCleanConfig, HoodieCompactionConfig, HoodieWriteConfig} import org.apache.hudi.functional.ColumnStatIndexTestBase.ColumnStatsTestCase +import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions} +import org.apache.hudi.client.common.HoodieSparkEngineContext +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.table.view.FileSystemViewManager +import org.apache.hudi.common.util.StringUtils +import org.apache.hudi.DataSourceWriteOptions.PARTITIONPATH_FIELD +import org.apache.hudi.functional.ColumnStatIndexTestBase.ColumnStatsTestParams import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, GreaterThan, Literal, Or} @@ -63,17 +70,17 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { HoodieTableConfig.POPULATE_META_FIELDS.key -> "true" ) ++ metadataOpts - doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts, + doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts, commonOpts, dataSourcePath = "index/colstats/input-table-json", expectedColStatsSourcePath = "index/colstats/column-stats-index-table.json", operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, - saveMode = SaveMode.Overwrite) + saveMode = SaveMode.Overwrite)) - doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts, + doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts, commonOpts, dataSourcePath = "index/colstats/another-input-table-json", expectedColStatsSourcePath = "index/colstats/updated-column-stats-index-table.json", operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, - saveMode = SaveMode.Append) + saveMode = SaveMode.Append)) // NOTE: MOR and COW have different fixtures since MOR is bearing delta-log files (holding // deferred updates), diverging from COW @@ -83,13 +90,441 @@ class TestColumnStatsIndex extends 
ColumnStatIndexTestBase { "index/colstats/mor-updated2-column-stats-index-table.json" } - doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts, + doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts, commonOpts, dataSourcePath = "index/colstats/update-input-table-json", expectedColStatsSourcePath = expectedColStatsSourcePath, operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, - saveMode = SaveMode.Append) + saveMode = SaveMode.Append)) } + @ParameterizedTest + @MethodSource(Array("testTableTypePartitionTypeParams")) + def testMetadataColumnStatsIndexInitializationWithUpserts(tableType: HoodieTableType, partitionCol : String): Unit = { + val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true) + val metadataOpts = Map( + HoodieMetadataConfig.ENABLE.key -> "true" + ) + + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "1", + "hoodie.upsert.shuffle.parallelism" -> "1", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString, + RECORDKEY_FIELD.key -> "c1", + PRECOMBINE_FIELD.key -> "c1", + PARTITIONPATH_FIELD.key() -> partitionCol, + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", + HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "5" + ) ++ metadataOpts + + // inserts + doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts, commonOpts, + dataSourcePath = "index/colstats/input-table-json", + expectedColStatsSourcePath = null, + operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Overwrite, + false, + numPartitions = 1, + parquetMaxFileSize = 100 * 1024 * 1024, + smallFileLimit = 0)) + + // updates + doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts, commonOpts, + dataSourcePath = "index/colstats/update2-input-table-json/", + expectedColStatsSourcePath = null, + operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, + saveMode = 
SaveMode.Append, + false, + numPartitions = 1, + parquetMaxFileSize = 100 * 1024 * 1024, + smallFileLimit = 0)) + + // delete a subset of recs. this will add a delete log block for MOR table. + doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts, commonOpts, + dataSourcePath = "index/colstats/delete-input-table-json/", + expectedColStatsSourcePath = null, + operation = DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL, + saveMode = SaveMode.Append, + false, + numPartitions = 1, + parquetMaxFileSize = 100 * 1024 * 1024, + smallFileLimit = 0)) + + val metadataOpts1 = Map( + HoodieMetadataConfig.ENABLE.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true" + ) + + // NOTE: MOR and COW have different fixtures since MOR is bearing delta-log files (holding + // deferred updates), diverging from COW + + val expectedColStatsSourcePath = if (testCase.tableType == HoodieTableType.COPY_ON_WRITE) { + "index/colstats/cow-bootstrap1-column-stats-index-table.json" + } else { + "index/colstats/mor-bootstrap1-column-stats-index-table.json" + } + + metaClient = HoodieTableMetaClient.reload(metaClient) + val latestCompletedCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp + + // lets validate that we have log files generated in case of MOR table + if (tableType == HoodieTableType.MERGE_ON_READ) { + val metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build() + val fsv = FileSystemViewManager.createInMemoryFileSystemView(new HoodieSparkEngineContext(jsc), metaClient, HoodieMetadataConfig.newBuilder().enable(false).build()) + fsv.loadAllPartitions() + val basePath2 = new Path(basePath) + val allPartitionPaths = fsv.getPartitionPaths + allPartitionPaths.asScala.foreach(partitionPath => { + val pPath = FSUtils.getRelativePartitionPath(basePath2, partitionPath) + assertTrue 
(fsv.getLatestFileSlices(pPath).iterator().asScala.count(fileSlice => fileSlice.getLogFiles.findAny().isPresent) > 0) + }) + fsv.close() + } + + // updates a subset which are not deleted and enable col stats and validate bootstrap + doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts1, commonOpts, + dataSourcePath = "index/colstats/update3-input-table-json", + expectedColStatsSourcePath = expectedColStatsSourcePath, + operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Append, + true, + latestCompletedCommit, + numPartitions = 1, + parquetMaxFileSize = 100 * 1024 * 1024, + smallFileLimit = 0)) + + // trigger one more upsert and compaction (w/ MOR table) and validate. + val expectedColStatsSourcePath1 = if (testCase.tableType == HoodieTableType.COPY_ON_WRITE) { + "index/colstats/cow-bootstrap2-column-stats-index-table.json" + } else { + "index/colstats/mor-bootstrap2-column-stats-index-table.json" + } + + doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts1, commonOpts, + dataSourcePath = "index/colstats/update4-input-table-json", + expectedColStatsSourcePath = expectedColStatsSourcePath1, + operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Append, + true, + latestCompletedCommit, + numPartitions = 1, + parquetMaxFileSize = 100 * 1024 * 1024, + smallFileLimit = 0)) + } + + @ParameterizedTest + @MethodSource(Array("testTableTypePartitionTypeParams")) + def testMetadataColumnStatsIndexInitializationWithRollbacks(tableType: HoodieTableType, partitionCol : String): Unit = { + val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true) + val metadataOpts = Map( + HoodieMetadataConfig.ENABLE.key -> "true" + ) + + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "1", + "hoodie.upsert.shuffle.parallelism" -> "1", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString, + 
RECORDKEY_FIELD.key -> "c1", + PRECOMBINE_FIELD.key -> "c1", + PARTITIONPATH_FIELD.key() -> partitionCol, + "hoodie.write.markers.type" -> "DIRECT", + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true" + ) ++ metadataOpts + + // inserts + doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts, commonOpts, + dataSourcePath = "index/colstats/input-table-json", + expectedColStatsSourcePath = null, + operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Overwrite, + false, + numPartitions = 1, + parquetMaxFileSize = 100 * 1024 * 1024, + smallFileLimit = 0)) + + // updates + doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts, commonOpts, + dataSourcePath = "index/colstats/update2-input-table-json/", + expectedColStatsSourcePath = null, + operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Append, + false, + numPartitions = 1, + parquetMaxFileSize = 100 * 1024 * 1024, + smallFileLimit = 0)) + + simulateFailureForLatestCommit(tableType, partitionCol) + + val metadataOpts1 = Map( + HoodieMetadataConfig.ENABLE.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true" + ) + + // NOTE: MOR and COW have different fixtures since MOR is bearing delta-log files (holding + // deferred updates), diverging from COW + + val expectedColStatsSourcePath = if (testCase.tableType == HoodieTableType.COPY_ON_WRITE) { + "index/colstats/cow-bootstrap-rollback1-column-stats-index-table.json" + } else { + "index/colstats/mor-bootstrap-rollback1-column-stats-index-table.json" + } + + metaClient = HoodieTableMetaClient.reload(metaClient) + val latestCompletedCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp + + // updates a subset which are not deleted and enable col stats and validate bootstrap + doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts1, commonOpts, + dataSourcePath = 
"index/colstats/update3-input-table-json", + expectedColStatsSourcePath = expectedColStatsSourcePath, + operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Append, + true, + latestCompletedCommit, + numPartitions = 1, + parquetMaxFileSize = 100 * 1024 * 1024, + smallFileLimit = 0)) + + metaClient = HoodieTableMetaClient.reload(metaClient) + assertTrue(metaClient.getActiveTimeline.getRollbackTimeline.countInstants() > 0) + } + + def simulateFailureForLatestCommit(tableType: HoodieTableType, partitionCol: String) : Unit = { + // simulate failure for latest commit. + metaClient = HoodieTableMetaClient.reload(metaClient) + var baseFileName : String = null + var logFileName : String = null + val lastCompletedCommit = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant().get() + if (tableType == HoodieTableType.MERGE_ON_READ) { + val dataFiles = if (StringUtils.isNullOrEmpty(partitionCol)) { + fs.listStatus(new Path(metaClient.getBasePath)).toSeq + } else { + fs.listStatus(new Path(metaClient.getBasePath + "/9")).toSeq + } + val logFileFileStatus = dataFiles.filter(fileStatus => fileStatus.getPath.getName.contains(".log")).head + logFileName = logFileFileStatus.getPath.getName + } else { + val dataFiles = if (StringUtils.isNullOrEmpty(partitionCol)) { + fs.listStatus(new Path(metaClient.getBasePath)).toSeq + } else { + fs.listStatus(new Path(metaClient.getBasePath + "/9")).toSeq + } + val baseFileFileStatus = dataFiles.filter(fileStatus => fileStatus.getPath.getName.contains(lastCompletedCommit.getTimestamp)).head + baseFileName = baseFileFileStatus.getPath.getName + } + + val latestCompletedFileName = lastCompletedCommit.getFileName + fs.delete(new Path(metaClient.getBasePath + "/.hoodie/" + latestCompletedFileName), false) + + // re-create marker for the deleted file. 
+ if (tableType == HoodieTableType.MERGE_ON_READ) { + if (StringUtils.isNullOrEmpty(partitionCol)) { + { fs.create(new Path(metaClient.getBasePath + "/.hoodie/.temp/" + lastCompletedCommit.getTimestamp + "/" + logFileName + ".marker.APPEND")).close() } + } else { + { fs.create(new Path(metaClient.getBasePath + "/.hoodie/.temp/" + lastCompletedCommit.getTimestamp + "/9/" + logFileName + ".marker.APPEND")).close() } + } + } else { + if (StringUtils.isNullOrEmpty(partitionCol)) { + { fs.create(new Path(metaClient.getBasePath + "/.hoodie/.temp/" + lastCompletedCommit.getTimestamp + "/" + baseFileName + ".marker.MERGE")).close() } + } else { + { fs.create(new Path(metaClient.getBasePath + "/.hoodie/.temp/" + lastCompletedCommit.getTimestamp + "/9/" + baseFileName + ".marker.MERGE")).close() } + } + } + } + + @Test + def testMORDeleteBlocks(): Unit = { + val tableType: HoodieTableType = HoodieTableType.MERGE_ON_READ + val partitionCol = "c8" + val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true) + val metadataOpts = Map( + HoodieMetadataConfig.ENABLE.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true" + ) + + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "1", + "hoodie.upsert.shuffle.parallelism" -> "1", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString, + RECORDKEY_FIELD.key -> "c1", + PRECOMBINE_FIELD.key -> "c1", + PARTITIONPATH_FIELD.key() -> partitionCol, + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", + HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "5" + ) ++ metadataOpts + + // inserts + doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts, commonOpts, + dataSourcePath = "index/colstats/input-table-json", + expectedColStatsSourcePath = null, + operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Overwrite, + false, + numPartitions = 1, + 
parquetMaxFileSize = 100 * 1024 * 1024, + smallFileLimit = 0)) + + // updates + doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts, commonOpts, + dataSourcePath = "index/colstats/update2-input-table-json/", + expectedColStatsSourcePath = null, + operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Append, + false, + numPartitions = 1, + parquetMaxFileSize = 100 * 1024 * 1024, + smallFileLimit = 0)) + + val expectedColStatsSourcePath = "index/colstats/mor-delete-block1-column-stats-index-table.json" + + // delete a subset of recs. this will add a delete log block for MOR table. + doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts, commonOpts, + dataSourcePath = "index/colstats/delete-input-table-json/", + expectedColStatsSourcePath = expectedColStatsSourcePath, + operation = DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL, + saveMode = SaveMode.Append, + true, + numPartitions = 1, + parquetMaxFileSize = 100 * 1024 * 1024, + smallFileLimit = 0)) + } + + @ParameterizedTest + @ValueSource(strings = Array("", "c8")) + def testColStatsWithCleanCOW(partitionCol: String): Unit = { + val tableType: HoodieTableType = HoodieTableType.COPY_ON_WRITE + val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true) + val metadataOpts = Map( + HoodieMetadataConfig.ENABLE.key -> "true" + ) + + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "1", + "hoodie.upsert.shuffle.parallelism" -> "1", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString, + RECORDKEY_FIELD.key -> "c1", + PRECOMBINE_FIELD.key -> "c1", + PARTITIONPATH_FIELD.key() -> partitionCol, + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", + HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> "1" + ) ++ metadataOpts + + // inserts + doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts, commonOpts, + dataSourcePath = 
"index/colstats/input-table-json", + expectedColStatsSourcePath = null, + operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Overwrite, + false, + numPartitions = 1, + parquetMaxFileSize = 100 * 1024 * 1024, + smallFileLimit = 0)) + + val metadataOpts1 = Map( + HoodieMetadataConfig.ENABLE.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true" + ) + + // updates 1 + doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts1, commonOpts, + dataSourcePath = "index/colstats/update2-input-table-json/", + expectedColStatsSourcePath = null, + operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Append, + false, + numPartitions = 1, + parquetMaxFileSize = 100 * 1024 * 1024, + smallFileLimit = 0)) + + val expectedColStatsSourcePath = if (testCase.tableType == HoodieTableType.COPY_ON_WRITE) { + "index/colstats/cow-clean1-column-stats-index-table.json" + } else { + "index/colstats/mor-bootstrap-rollback1-column-stats-index-table.json" + } + + // updates 2 + doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts1, commonOpts, + dataSourcePath = "index/colstats/update3-input-table-json/", + expectedColStatsSourcePath = expectedColStatsSourcePath, + operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Append, + true, + numPartitions = 1, + parquetMaxFileSize = 100 * 1024 * 1024, + smallFileLimit = 0)) + } + + @ParameterizedTest + @ValueSource(strings = Array("", "c8")) + def testColStatsWithCleanMOR(partitionCol: String): Unit = { + val tableType: HoodieTableType = HoodieTableType.MERGE_ON_READ + val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true) + val metadataOpts = Map( + HoodieMetadataConfig.ENABLE.key -> "true" + ) + + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "1", + "hoodie.upsert.shuffle.parallelism" -> "1", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + 
DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString, + RECORDKEY_FIELD.key -> "c1", + PRECOMBINE_FIELD.key -> "c1", + PARTITIONPATH_FIELD.key() -> partitionCol, + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", + HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> "1", + HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "2" + ) ++ metadataOpts + + // inserts + doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts, commonOpts, + dataSourcePath = "index/colstats/input-table-json", + expectedColStatsSourcePath = null, + operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Overwrite, + false, + numPartitions = 1, + parquetMaxFileSize = 100 * 1024 * 1024, + smallFileLimit = 0)) + + val metadataOpts1 = Map( + HoodieMetadataConfig.ENABLE.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true" + ) + + // updates 1 + doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts1, commonOpts, + dataSourcePath = "index/colstats/update2-input-table-json/", + expectedColStatsSourcePath = null, + operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Append, + false, + numPartitions = 1, + parquetMaxFileSize = 100 * 1024 * 1024, + smallFileLimit = 0)) + + val expectedColStatsSourcePath = if (testCase.tableType == HoodieTableType.COPY_ON_WRITE) { + "index/colstats/cow-clean1-column-stats-index-table.json" + } else { + "index/colstats/mor-clean1-column-stats-index-table.json" + } + + // updates 2 + doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts1, commonOpts, + dataSourcePath = "index/colstats/update3-input-table-json/", + expectedColStatsSourcePath = expectedColStatsSourcePath, + operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Append, + true, + numPartitions = 1, + parquetMaxFileSize = 100 * 1024 * 1024, + smallFileLimit = 0)) + + metaClient = 
HoodieTableMetaClient.reload(metaClient) + assertTrue(metaClient.getActiveTimeline.getCleanerTimeline.countInstants() > 0) + } @ParameterizedTest @EnumSource(classOf[HoodieTableType]) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala index 82f0342f729ec..52bc915a08f81 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala @@ -29,7 +29,7 @@ import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieTableType, Writ import org.apache.hudi.common.table.HoodieTableConfig import org.apache.hudi.common.table.timeline.HoodieInstant import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig} -import org.apache.hudi.functional.ColumnStatIndexTestBase.ColumnStatsTestCase +import org.apache.hudi.functional.ColumnStatIndexTestBase.{ColumnStatsTestCase, ColumnStatsTestParams} import org.apache.hudi.index.HoodieIndex.IndexType.INMEMORY import org.apache.hudi.metadata.HoodieMetadataFileSystemView import org.apache.hudi.util.JavaConversions @@ -90,12 +90,12 @@ class TestColumnStatsIndexWithSQL extends ColumnStatIndexTestBase { HoodieIndexConfig.INDEX_TYPE.key() -> INMEMORY.name() ) ++ metadataOpts - doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts, + doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts, commonOpts, dataSourcePath = "index/colstats/input-table-json", expectedColStatsSourcePath = "index/colstats/column-stats-index-table.json", operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, saveMode = SaveMode.Overwrite, - shouldValidate = false) + shouldValidate = false)) assertEquals(4, getLatestDataFilesCount(commonOpts)) 
assertEquals(0, getLatestDataFilesCount(commonOpts, includeLogFiles = false)) @@ -135,12 +135,12 @@ class TestColumnStatsIndexWithSQL extends ColumnStatIndexTestBase { verifyFileIndexAndSQLQueries(commonOpts, isTableDataSameAsAfterSecondInstant = true) // Add the last df back and verify the queries - doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts, + doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts, commonOpts, dataSourcePath = "index/colstats/update-input-table-json", expectedColStatsSourcePath = "", operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, saveMode = SaveMode.Append, - shouldValidate = false) + shouldValidate = false)) verifyFileIndexAndSQLQueries(commonOpts, verifyFileCount = false) } @@ -197,12 +197,12 @@ class TestColumnStatsIndexWithSQL extends ColumnStatIndexTestBase { writeClient.scheduleCompaction(org.apache.hudi.common.util.Option.empty()) writeClient.close() - doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts, + doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts, commonOpts, dataSourcePath = "index/colstats/update-input-table-json", expectedColStatsSourcePath = "", operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, saveMode = SaveMode.Append, - shouldValidate = false) + shouldValidate = false)) verifyFileIndexAndSQLQueries(commonOpts) } @@ -215,17 +215,17 @@ class TestColumnStatsIndexWithSQL extends ColumnStatIndexTestBase { } else { "" } - doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts, + doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts, commonOpts, dataSourcePath = "index/colstats/input-table-json", expectedColStatsSourcePath = s"index/colstats/column-stats-index-table${filePostfix}.json", operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, - saveMode = SaveMode.Overwrite) + saveMode = SaveMode.Overwrite)) - doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts, + 
doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts, commonOpts, dataSourcePath = "index/colstats/another-input-table-json", expectedColStatsSourcePath = s"index/colstats/updated-column-stats-index-table${filePostfix}.json", operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, - saveMode = SaveMode.Append) + saveMode = SaveMode.Append)) // NOTE: MOR and COW have different fixtures since MOR is bearing delta-log files (holding // deferred updates), diverging from COW @@ -235,12 +235,12 @@ class TestColumnStatsIndexWithSQL extends ColumnStatIndexTestBase { "index/colstats/mor-updated2-column-stats-index-table.json" } - doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts, + doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts, commonOpts, dataSourcePath = "index/colstats/update-input-table-json", expectedColStatsSourcePath = expectedColStatsSourcePath, operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, saveMode = SaveMode.Append, - shouldValidate) + shouldValidate)) } def verifyFileIndexAndSQLQueries(opts: Map[String, String], isTableDataSameAsAfterSecondInstant: Boolean = false, verifyFileCount: Boolean = true): Unit = {