diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java index fce57a9d066cd..c1028ff7b9159 100644 --- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java +++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java @@ -584,7 +584,7 @@ public void testMetadataTableCompactionWithPendingInstants() throws Exception { doWriteOperation(testTable, metaClient.createNewInstantTime()); HoodieTableMetadata tableMetadata = metadata(writeConfig, context); // verify that compaction of metadata table does not kick in. - assertFalse(tableMetadata.getLatestCompactionTime().isPresent()); + assertTrue(tableMetadata.getLatestCompactionTime().isPresent()); // write some commits to trigger the MDT compaction doWriteOperation(testTable, metaClient.createNewInstantTime(), INSERT); @@ -1723,7 +1723,7 @@ public void testMetadataMultiWriter() throws Exception { // Ensure all commits were synced to the Metadata Table HoodieTableMetaClient metadataMetaClient = createMetaClientForMetadataTable(); - assertEquals(metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().countInstants(), 5); + assertEquals(6, metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().countInstants()); assertTrue(metadataMetaClient.getActiveTimeline().containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "0000002"))); assertTrue(metadataMetaClient.getActiveTimeline().containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "0000003"))); assertTrue(metadataMetaClient.getActiveTimeline().containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "0000004"))); @@ -1775,7 +1775,7 @@ public void testMultiWriterForDoubleLocking() throws Exception { LOG.warn("total commits in metadata table " + metadataMetaClient.getActiveTimeline().getCommitsTimeline().countInstants()); // 6 commits and 2 cleaner commits. - assertEquals(metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().countInstants(), 8); + assertEquals(9, metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().countInstants()); assertTrue(metadataMetaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().countInstants() <= 1); // Validation validateMetadata(writeClient); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index 50b5f9a26bfab..09dff2db1caf0 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -881,7 +881,7 @@ public void testVirtualKeysInBaseFiles() throws Exception { */ @Test public void testMetadataTableCompactionWithPendingInstants() throws Exception { - init(COPY_ON_WRITE, false); + init(COPY_ON_WRITE, true); writeConfig = getWriteConfigBuilder(true, true, false) .withMetadataConfig(HoodieMetadataConfig.newBuilder() .enable(true) @@ -902,7 +902,7 @@ public void testMetadataTableCompactionWithPendingInstants() throws Exception { doWriteOperation(testTable, metaClient.createNewInstantTime()); HoodieTableMetadata tableMetadata = metadata(writeConfig, context); // verify that compaction of metadata table does not kick in. - assertFalse(tableMetadata.getLatestCompactionTime().isPresent()); + assertTrue(tableMetadata.getLatestCompactionTime().isPresent()); // write some commits to trigger the MDT compaction doWriteOperation(testTable, metaClient.createNewInstantTime(), INSERT); @@ -1112,7 +1112,7 @@ public void testMetadataRollbackWithCompaction() throws Exception { @Test public void testMetadataRollbackDuringInit() throws Exception { HoodieTableType tableType = COPY_ON_WRITE; - init(tableType, false); + init(tableType, true); writeConfig = getWriteConfigBuilder(false, true, false) .withMetadataConfig(HoodieMetadataConfig.newBuilder() .enable(true) @@ -1163,9 +1163,9 @@ private void revertTableToInflightState(HoodieWriteConfig writeConfig) throws IO HoodieActiveTimeline mdtTimeline = mdtMetaClient.getActiveTimeline(); assertEquals(1, timeline.countInstants()); assertEquals(1, timeline.getCommitsTimeline().filterCompletedInstants().countInstants()); - assertEquals(3, mdtTimeline.countInstants()); - assertEquals(3, mdtTimeline.getCommitsTimeline().filterCompletedInstants().countInstants()); - String mdtInitCommit2 = mdtTimeline.getCommitsTimeline().filterCompletedInstants().getInstants().get(1).getTimestamp(); + assertEquals(4, mdtTimeline.countInstants()); + assertEquals(4, mdtTimeline.getCommitsTimeline().filterCompletedInstants().countInstants()); + String mdtInitCommit2 = mdtTimeline.getCommitsTimeline().filterCompletedInstants().getInstants().get(2).getTimestamp(); Pair lastCommitMetadataWithValidData = mdtTimeline.getLastCommitMetadataWithValidData().get(); String commit = lastCommitMetadataWithValidData.getLeft().getTimestamp(); @@ -2215,7 +2215,7 @@ public void testMetadataMultiWriter() throws Exception { // Ensure all commits were synced to the Metadata Table HoodieTableMetaClient metadataMetaClient = createMetaClient(metadataTableBasePath); - assertEquals(metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().countInstants(), 5); + assertEquals(6, metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().countInstants()); assertTrue(metadataMetaClient.getActiveTimeline().containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "0000002"))); assertTrue(metadataMetaClient.getActiveTimeline().containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "0000003"))); assertTrue(metadataMetaClient.getActiveTimeline().containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "0000004"))); @@ -2267,8 +2267,8 @@ public void testMultiWriterForDoubleLocking() throws Exception { HoodieTableMetaClient metadataMetaClient = createMetaClient(metadataTableBasePath); LOG.warn("total commits in metadata table " + metadataMetaClient.getActiveTimeline().getCommitsTimeline().countInstants()); - // 6 commits and 2 cleaner commits. - assertEquals(metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().countInstants(), 8); + // 7 commits and 2 cleaner commits. + assertEquals(9, metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().countInstants()); assertTrue(metadataMetaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().countInstants() <= 1); // Validation validateMetadata(writeClient); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java index 435b5543f1c92..778932cbef926 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java @@ -243,6 +243,7 @@ private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata, .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() .withRemoteServerPort(timelineServicePort).build()) .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadata) + .withMetadataIndexPartitionStats(false) .withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsMetadataTable).build()) .withWriteConcurrencyMode(writeConcurrencyMode) .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class) @@ -1645,6 +1646,7 @@ public void testArchivalAndCompactionInMetadataTable() throws Exception { .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() .withRemoteServerPort(timelineServicePort).build()) .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true) + .withMetadataIndexPartitionStats(false) .withMaxNumDeltaCommitsBeforeCompaction(8) .build()) .forTable("test-trip-table").build(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java index 736b21e847aa8..7372da96eeb39 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java @@ -339,7 +339,7 @@ public final class HoodieMetadataConfig extends HoodieConfig { public static final ConfigProperty ENABLE_METADATA_INDEX_PARTITION_STATS = ConfigProperty .key(METADATA_PREFIX + ".index.partition.stats.enable") - .defaultValue(false) + .defaultValue(true) .sinceVersion("1.0.0") .withDocumentation("Enable aggregating stats for each column at the storage partition level."); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java index c87686772eb28..dd911d0624abc 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java @@ -196,8 +196,8 @@ private List getPartitionPathWithPathPrefixUsingFilterExpression(String } else if (!path.getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) { return Pair.of(Option.empty(), Option.of(path)); } - } else if (path.getName() - .startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX)) { + } else if (path.getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX) + || MetadataPartitionType.isValidPartitionFilePath(path.getName())) { String partitionName = FSUtils.getRelativePartitionPath(dataBasePath, path.getParent()); @@ -212,7 +212,7 @@ private List getPartitionPathWithPathPrefixUsingFilterExpression(String || (Boolean) fullBoundExpr.eval( extractPartitionValues(partitionFields, relativePartitionPath, urlEncodePartitioningEnabled))) - .collect(Collectors.toList())); + .collect(Collectors.toSet())); Expression partialBoundExpr; // If partitionPaths is nonEmpty, we're already at the last path level, and all paths diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index 562504e56bbe5..9ce4974c5c9fc 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -2174,21 +2174,14 @@ public static HoodieMetadataColumnStats mergeColumnStatsRecords(HoodieMetadataCo return newColumnStats; } - Comparable minValue = - (Comparable) Stream.of( - (Comparable) unwrapAvroValueWrapper(prevColumnStats.getMinValue()), - (Comparable) unwrapAvroValueWrapper(newColumnStats.getMinValue())) - .filter(Objects::nonNull) - .min(Comparator.naturalOrder()) - .orElse(null); - - Comparable maxValue = - (Comparable) Stream.of( - (Comparable) unwrapAvroValueWrapper(prevColumnStats.getMaxValue()), - (Comparable) unwrapAvroValueWrapper(newColumnStats.getMaxValue())) - .filter(Objects::nonNull) - .max(Comparator.naturalOrder()) - .orElse(null); + Comparable minValue = castAndCompare( + unwrapAvroValueWrapper(newColumnStats.getMinValue()), + unwrapAvroValueWrapper(prevColumnStats.getMinValue()), + true); + Comparable maxValue = castAndCompare( + unwrapAvroValueWrapper(prevColumnStats.getMaxValue()), + unwrapAvroValueWrapper(newColumnStats.getMaxValue()), + false); return HoodieMetadataColumnStats.newBuilder(HoodieMetadataPayload.METADATA_COLUMN_STATS_BUILDER_STUB.get()) .setFileName(newColumnStats.getFileName()) @@ -2203,6 +2196,30 @@ public static HoodieMetadataColumnStats mergeColumnStatsRecords(HoodieMetadataCo .build(); } + public static Comparable castAndCompare(Comparable newVal, Comparable prevVal, boolean isMin) { + if (newVal == null) { + return prevVal; + } else if (prevVal == null) { + return newVal; + } + + Comparable picked; + // Same type. + if (newVal.getClass() == prevVal.getClass()) { + picked = newVal.compareTo(prevVal) < 0 ? newVal : prevVal; + } else if (newVal instanceof Number && prevVal instanceof Number) { + picked = Double.compare(((Number) newVal).doubleValue(), ((Number) prevVal).doubleValue()) < 0 ? newVal : prevVal; + } else { + picked = newVal.toString().compareTo(prevVal.toString()) < 0 ? newVal : prevVal; + } + + if (isMin) { + return picked; + } else { + return picked == newVal ? prevVal : newVal; + } + } + public static Map combineFileSystemMetadata(HoodieMetadataPayload older, HoodieMetadataPayload newer) { Map combinedFileInfo = new HashMap<>(); // First, add all files listed in the previous record diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java index c539971162d2b..c81febabf45f7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java @@ -297,8 +297,8 @@ private static void constructColumnStatsMetadataPayload(HoodieMetadataPayload pa String.format("Valid %s record expected for type: %s", SCHEMA_FIELD_ID_COLUMN_STATS, MetadataPartitionType.COLUMN_STATS.getRecordType())); } else { payload.columnStatMetadata = HoodieMetadataColumnStats.newBuilder(METADATA_COLUMN_STATS_BUILDER_STUB.get()) - .setFileName((String) columnStatsRecord.get(COLUMN_STATS_FIELD_FILE_NAME)) - .setColumnName((String) columnStatsRecord.get(COLUMN_STATS_FIELD_COLUMN_NAME)) + .setFileName((String)columnStatsRecord.get(COLUMN_STATS_FIELD_FILE_NAME)) + .setColumnName((String)columnStatsRecord.get(COLUMN_STATS_FIELD_COLUMN_NAME)) // AVRO-2377 1.9.2 Modified the type of org.apache.avro.Schema#FIELD_RESERVED to Collections.unmodifiableSet. // This causes Kryo to fail when deserializing a GenericRecord, See HUDI-5484. // We should avoid using GenericRecord and convert GenericRecord into a serializable type. @@ -434,6 +434,16 @@ public static MetadataPartitionType fromPartitionPath(String partitionPath) { throw new IllegalArgumentException("No MetadataPartitionType for partition path: " + partitionPath); } + public static boolean isValidPartitionFilePath(String partitionFilePath) { + partitionFilePath = partitionFilePath.startsWith(".") ? partitionFilePath.substring(1) : partitionFilePath; + for (MetadataPartitionType partitionType : getValidValues()) { + if (partitionFilePath.startsWith(partitionType.fileIdPrefix)) { + return true; + } + } + return false; + } + @Override public String toString() { return "Metadata partition {" diff --git a/hudi-common/src/test/java/org/apache/hudi/metadata/TestMetadataPartitionType.java b/hudi-common/src/test/java/org/apache/hudi/metadata/TestMetadataPartitionType.java index 2501807981c3c..787d6be07beda 100644 --- a/hudi-common/src/test/java/org/apache/hudi/metadata/TestMetadataPartitionType.java +++ b/hudi-common/src/test/java/org/apache/hudi/metadata/TestMetadataPartitionType.java @@ -65,19 +65,19 @@ public void testPartitionEnabledByConfigOnly(MetadataPartitionType partitionType case FUNCTIONAL_INDEX: case SECONDARY_INDEX: metadataConfigBuilder.enable(true); - expectedEnabledPartitions = 1; + expectedEnabledPartitions = 2; break; case COLUMN_STATS: metadataConfigBuilder.enable(true).withMetadataIndexColumnStats(true); - expectedEnabledPartitions = 2; + expectedEnabledPartitions = 3; break; case BLOOM_FILTERS: metadataConfigBuilder.enable(true).withMetadataIndexBloomFilter(true); - expectedEnabledPartitions = 2; + expectedEnabledPartitions = 3; break; case RECORD_INDEX: metadataConfigBuilder.enable(true).withEnableRecordIndex(true); - expectedEnabledPartitions = 2; + expectedEnabledPartitions = 3; break; case PARTITION_STATS: metadataConfigBuilder.enable(true).withMetadataIndexPartitionStats(true).withColumnStatsIndexForColumns("partitionCol"); @@ -91,7 +91,7 @@ public void testPartitionEnabledByConfigOnly(MetadataPartitionType partitionType // Verify partition type is enabled due to config if (partitionType == MetadataPartitionType.FUNCTIONAL_INDEX || partitionType == MetadataPartitionType.SECONDARY_INDEX) { - assertEquals(1, enabledPartitions.size(), "FUNCTIONAL_INDEX or SECONDARY_INDEX should be enabled by SQL, only FILES is enabled in this case."); + assertEquals(2, enabledPartitions.size(), "FUNCTIONAL_INDEX or SECONDARY_INDEX should be enabled by SQL, only FILES is enabled in this case."); assertTrue(enabledPartitions.contains(MetadataPartitionType.FILES)); } else { assertEquals(expectedEnabledPartitions, enabledPartitions.size()); @@ -114,7 +114,7 @@ public void testPartitionAvailableByMetaClientOnly() { List enabledPartitions = MetadataPartitionType.getEnabledPartitions(metadataConfig.getProps(), metaClient); // Verify RECORD_INDEX and FILES is enabled due to availability - assertEquals(2, enabledPartitions.size(), "RECORD_INDEX and FILES should be available"); + assertEquals(3, enabledPartitions.size(), "RECORD_INDEX, FILES, and PARTITION_STATS should be available; "); assertTrue(enabledPartitions.contains(MetadataPartitionType.FILES), "FILES should be enabled by availability"); assertTrue(enabledPartitions.contains(MetadataPartitionType.RECORD_INDEX), "RECORD_INDEX should be enabled by availability"); } @@ -153,7 +153,7 @@ public void testFunctionalIndexPartitionEnabled() { List enabledPartitions = MetadataPartitionType.getEnabledPartitions(metadataConfig.getProps(), metaClient); // Verify FUNCTIONAL_INDEX and FILES is enabled due to availability - assertEquals(2, enabledPartitions.size(), "FUNCTIONAL_INDEX and FILES should be available"); + assertEquals(3, enabledPartitions.size(), "FUNCTIONAL_INDEX, FILES and PARTITION_STATS should be available"); assertTrue(enabledPartitions.contains(MetadataPartitionType.FILES), "FILES should be enabled by availability"); assertTrue(enabledPartitions.contains(MetadataPartitionType.FUNCTIONAL_INDEX), "FUNCTIONAL_INDEX should be enabled by availability"); } diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java index aad5f3b09e46f..36219e909bd86 100644 --- a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java @@ -447,7 +447,7 @@ private static Comparable convertToNativeJavaType(PrimitiveType primitiveType synchronized (primitiveType.stringifier()) { // Date logical type is implemented as a signed INT32 // REF: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md - return java.sql.Date.valueOf( + return java.time.LocalDate.parse( primitiveType.stringifier().stringify((Integer) val) ); } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala index 736926cd6979f..88b58cc2d6d91 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala @@ -22,7 +22,7 @@ import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions} import org.apache.hudi.avro.HoodieAvroUtils import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.common.config.HoodieMetadataConfig -import org.apache.hudi.common.model.HoodieColumnRangeMetadata +import org.apache.hudi.common.model.{ActionType, HoodieColumnRangeMetadata} import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.timeline.HoodieTimeline import org.apache.hudi.common.table.view.{FileSystemViewManager, HoodieTableFileSystemView} @@ -37,11 +37,10 @@ import org.apache.hudi.storage.hadoop.HoodieHadoopStorage import org.apache.hudi.testutils.SparkClientFunctionalTestHarness import org.apache.hudi.testutils.SparkClientFunctionalTestHarness.getSparkSqlConf import org.apache.hudi.util.JavaScalaConverters.convertJavaListToScalaSeq - import org.apache.spark.SparkConf import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions.{col, explode} -import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} import org.junit.jupiter.api.{Tag, Test} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.CsvSource @@ -49,7 +48,6 @@ import org.junit.jupiter.params.provider.CsvSource import java.util import java.util.Collections import java.util.stream.Collectors - import scala.collection.JavaConverters._ @Tag("functional") @@ -239,10 +237,13 @@ class TestMetadataTableWithSparkDataSource extends SparkClientFunctionalTestHarn val metadataOpts: Map[String, String] = Map( HoodieMetadataConfig.ENABLE.key -> "true", HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true", + // Enable partition stats explicitly, though its default value is true. + HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key() -> "true", DataSourceWriteOptions.TABLE_TYPE.key -> "MERGE_ON_READ", - HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "5" + HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "5", + HoodieWriteConfig.TABLE_SERVICES_ENABLED.key() -> "false" ) - val combinedOpts: Map[String, String] = partitionedCommonOpts ++ metadataOpts + var combinedOpts: Map[String, String] = partitionedCommonOpts ++ metadataOpts // Insert T0 val newRecords = dataGen.generateInserts("000", 100) @@ -259,7 +260,7 @@ class TestMetadataTableWithSparkDataSource extends SparkClientFunctionalTestHarn .setBasePath(s"$basePath/.hoodie/metadata") .build val timelineT0 = metaClient.getActiveTimeline - assertEquals(3, timelineT0.countInstants()) + assertEquals(4, timelineT0.countInstants()) assertEquals(HoodieTimeline.DELTA_COMMIT_ACTION, timelineT0.lastInstant().get().getAction) val t0 = timelineT0.lastInstant().get().getTimestamp @@ -285,7 +286,7 @@ class TestMetadataTableWithSparkDataSource extends SparkClientFunctionalTestHarn //Validate T1 val timelineT1 = metaClient.reloadActiveTimeline() - assertEquals(4, timelineT1.countInstants()) + assertEquals(5, timelineT1.countInstants()) assertEquals(HoodieTimeline.DELTA_COMMIT_ACTION, timelineT1.lastInstant().get().getAction) val t1 = timelineT1.lastInstant().get().getTimestamp @@ -312,8 +313,8 @@ class TestMetadataTableWithSparkDataSource extends SparkClientFunctionalTestHarn //Validate T2 val timelineT2 = metaClient.reloadActiveTimeline() - assertEquals(5, timelineT2.countInstants()) - assertEquals(HoodieTimeline.DELTA_COMMIT_ACTION, timelineT2.lastInstant().get().getAction) + assertEquals(6, timelineT2.getDeltaCommitTimeline.countInstants()) + assertEquals(HoodieTimeline.DELTA_COMMIT_ACTION, timelineT2.getDeltaCommitTimeline.lastInstant().get().getAction) val t2 = timelineT2.lastInstant().get().getTimestamp val filesT2 = getFiles(basePath) @@ -335,6 +336,8 @@ class TestMetadataTableWithSparkDataSource extends SparkClientFunctionalTestHarn //Update T3 val updatedRecords3 = dataGen.generateUpdates("003", updatedRecords2) val updatedRecords3DF = parseRecords(recordsToStrings(updatedRecords3).asScala.toSeq) + // Trigger compaction to demonstrate that table services on MDT work well. + combinedOpts += (HoodieWriteConfig.TABLE_SERVICES_ENABLED.key() -> "true") updatedRecords3DF.write.format(hudi) .options(combinedOpts) .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) @@ -343,9 +346,9 @@ class TestMetadataTableWithSparkDataSource extends SparkClientFunctionalTestHarn //Validate T3 val timelineT3 = metaClient.reloadActiveTimeline() - assertEquals(7, timelineT3.countInstants()) - assertEquals(HoodieTimeline.DELTA_COMMIT_ACTION, timelineT3.getInstants.get(5).getAction) - assertEquals(HoodieTimeline.COMMIT_ACTION, timelineT3.lastInstant().get().getAction) + assertEquals(7, timelineT3.getDeltaCommitTimeline.countInstants()) + val actionSet = Set(ActionType.commit.name()).asJava + assertTrue(timelineT3.getTimelineOfActions(actionSet).countInstants() > 0) val filesT3 = getFiles(basePath) assertEquals(12, filesT3.size) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndexWithSQL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndexWithSQL.scala index 3507c10d04680..4fd77d7c8e94e 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndexWithSQL.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndexWithSQL.scala @@ -87,9 +87,9 @@ class TestRecordLevelIndexWithSQL extends RecordLevelIndexTestBase { assertEquals(0, fileIndex.listFiles(Seq.empty, Seq(dataFilter)).flatMap(s => s.files).size) // not supported GreaterThan query - val reckey = latestSnapshotDf.limit(2).collect().map(row => row.getAs(colName).toString) + val reckey = latestSnapshotDf.collect().map(row => row.getAs(colName).toString).sorted dataFilter = GreaterThan(attribute(colName), Literal(reckey(0))) - assertTrue(fileIndex.listFiles(Seq.empty, Seq(dataFilter)).flatMap(s => s.files).size >= 3) + assertTrue(fileIndex.listFiles(Seq.empty, Seq(dataFilter)).flatMap(s => s.files).size >= 2) // not supported OR query dataFilter = Or(EqualTo(attribute(colName), Literal(reckey(0))), GreaterThanOrEqual(attribute("timestamp"), Literal(0))) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestHoodieTableValuedFunction.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestHoodieTableValuedFunction.scala index 1cef283eaa2d8..89e41da605aa8 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestHoodieTableValuedFunction.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestHoodieTableValuedFunction.scala @@ -586,7 +586,8 @@ class TestHoodieTableValuedFunction extends HoodieSparkSqlTestBase { | hoodie.datasource.write.recordkey.field = 'id', | hoodie.metadata.record.index.enable = 'true', | hoodie.metadata.index.column.stats.enable = 'true', - | hoodie.metadata.index.column.stats.column.list = 'price' + | hoodie.metadata.index.column.stats.column.list = 'price', + | hoodie.metadata.index.partition.stats.enable = 'true' |) |location '${tmp.getCanonicalPath}/$tableName' |""".stripMargin @@ -628,7 +629,7 @@ class TestHoodieTableValuedFunction extends HoodieSparkSqlTestBase { val result7DF = spark.sql( s"select type, key, ColumnStatsMetadata from hudi_metadata('$identifier') where type=${MetadataPartitionType.PARTITION_STATS.getRecordType}" ) - assert(result7DF.count() == 0) + assert(result7DF.count() == 3) } } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCommitsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCommitsProcedure.scala index d99002f5642ec..d15cc09bc4302 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCommitsProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCommitsProcedure.scala @@ -56,14 +56,14 @@ class TestCommitsProcedure extends HoodieSparkProcedureTestBase { // collect active commits for table val commits = spark.sql(s"""call show_commits(table => '$tableName', limit => 10)""").collect() - assertResult(4) { + assertResult(5) { commits.length } // collect archived commits for table val endTs = commits(0).get(0).toString val archivedCommits = spark.sql(s"""call show_archived_commits(table => '$tableName', end_ts => '$endTs')""").collect() - assertResult(3) { + assertResult(2) { archivedCommits.length } } @@ -106,14 +106,14 @@ class TestCommitsProcedure extends HoodieSparkProcedureTestBase { // collect active commits for table val commits = spark.sql(s"""call show_commits(table => '$tableName', limit => 10)""").collect() - assertResult(4) { + assertResult(5) { commits.length } // collect archived commits for table val endTs = commits(0).get(0).toString val archivedCommits = spark.sql(s"""call show_archived_commits_metadata(table => '$tableName', end_ts => '$endTs')""").collect() - assertResult(3) { + assertResult(2) { archivedCommits.length } }