Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,7 @@ public void testMetadataTableCompactionWithPendingInstants() throws Exception {
doWriteOperation(testTable, metaClient.createNewInstantTime());
HoodieTableMetadata tableMetadata = metadata(writeConfig, context);
// verify that compaction of metadata table does not kick in.
assertFalse(tableMetadata.getLatestCompactionTime().isPresent());
assertTrue(tableMetadata.getLatestCompactionTime().isPresent());

// write some commits to trigger the MDT compaction
doWriteOperation(testTable, metaClient.createNewInstantTime(), INSERT);
Expand Down Expand Up @@ -1723,7 +1723,7 @@ public void testMetadataMultiWriter() throws Exception {

// Ensure all commits were synced to the Metadata Table
HoodieTableMetaClient metadataMetaClient = createMetaClientForMetadataTable();
assertEquals(metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().countInstants(), 5);
assertEquals(6, metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().countInstants());
assertTrue(metadataMetaClient.getActiveTimeline().containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "0000002")));
assertTrue(metadataMetaClient.getActiveTimeline().containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "0000003")));
assertTrue(metadataMetaClient.getActiveTimeline().containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "0000004")));
Expand Down Expand Up @@ -1775,7 +1775,7 @@ public void testMultiWriterForDoubleLocking() throws Exception {
LOG.warn("total commits in metadata table " + metadataMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());

// 7 commits and 2 cleaner commits.
assertEquals(metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().countInstants(), 8);
assertEquals(9, metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().countInstants());
assertTrue(metadataMetaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().countInstants() <= 1);
// Validation
validateMetadata(writeClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -881,7 +881,7 @@ public void testVirtualKeysInBaseFiles() throws Exception {
*/
@Test
public void testMetadataTableCompactionWithPendingInstants() throws Exception {
init(COPY_ON_WRITE, false);
init(COPY_ON_WRITE, true);
writeConfig = getWriteConfigBuilder(true, true, false)
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
.enable(true)
Expand All @@ -902,7 +902,7 @@ public void testMetadataTableCompactionWithPendingInstants() throws Exception {
doWriteOperation(testTable, metaClient.createNewInstantTime());
HoodieTableMetadata tableMetadata = metadata(writeConfig, context);
// verify that compaction of metadata table does not kick in.
assertFalse(tableMetadata.getLatestCompactionTime().isPresent());
assertTrue(tableMetadata.getLatestCompactionTime().isPresent());

// write some commits to trigger the MDT compaction
doWriteOperation(testTable, metaClient.createNewInstantTime(), INSERT);
Expand Down Expand Up @@ -1112,7 +1112,7 @@ public void testMetadataRollbackWithCompaction() throws Exception {
@Test
public void testMetadataRollbackDuringInit() throws Exception {
HoodieTableType tableType = COPY_ON_WRITE;
init(tableType, false);
init(tableType, true);
writeConfig = getWriteConfigBuilder(false, true, false)
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
.enable(true)
Expand Down Expand Up @@ -1163,9 +1163,9 @@ private void revertTableToInflightState(HoodieWriteConfig writeConfig) throws IO
HoodieActiveTimeline mdtTimeline = mdtMetaClient.getActiveTimeline();
assertEquals(1, timeline.countInstants());
assertEquals(1, timeline.getCommitsTimeline().filterCompletedInstants().countInstants());
assertEquals(3, mdtTimeline.countInstants());
assertEquals(3, mdtTimeline.getCommitsTimeline().filterCompletedInstants().countInstants());
String mdtInitCommit2 = mdtTimeline.getCommitsTimeline().filterCompletedInstants().getInstants().get(1).getTimestamp();
assertEquals(4, mdtTimeline.countInstants());
assertEquals(4, mdtTimeline.getCommitsTimeline().filterCompletedInstants().countInstants());
String mdtInitCommit2 = mdtTimeline.getCommitsTimeline().filterCompletedInstants().getInstants().get(2).getTimestamp();
Pair<HoodieInstant, HoodieCommitMetadata> lastCommitMetadataWithValidData =
mdtTimeline.getLastCommitMetadataWithValidData().get();
String commit = lastCommitMetadataWithValidData.getLeft().getTimestamp();
Expand Down Expand Up @@ -2215,7 +2215,7 @@ public void testMetadataMultiWriter() throws Exception {

// Ensure all commits were synced to the Metadata Table
HoodieTableMetaClient metadataMetaClient = createMetaClient(metadataTableBasePath);
assertEquals(metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().countInstants(), 5);
assertEquals(6, metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().countInstants());
assertTrue(metadataMetaClient.getActiveTimeline().containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "0000002")));
assertTrue(metadataMetaClient.getActiveTimeline().containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "0000003")));
assertTrue(metadataMetaClient.getActiveTimeline().containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "0000004")));
Expand Down Expand Up @@ -2267,8 +2267,8 @@ public void testMultiWriterForDoubleLocking() throws Exception {
HoodieTableMetaClient metadataMetaClient = createMetaClient(metadataTableBasePath);
LOG.warn("total commits in metadata table " + metadataMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());

// 6 commits and 2 cleaner commits.
assertEquals(metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().countInstants(), 8);
// 7 commits and 2 cleaner commits.
assertEquals(9, metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().countInstants());
assertTrue(metadataMetaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().countInstants() <= 1);
// Validation
validateMetadata(writeClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata,
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withRemoteServerPort(timelineServicePort).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadata)
.withMetadataIndexPartitionStats(false)
.withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsMetadataTable).build())
.withWriteConcurrencyMode(writeConcurrencyMode)
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class)
Expand Down Expand Up @@ -1645,6 +1646,7 @@ public void testArchivalAndCompactionInMetadataTable() throws Exception {
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withRemoteServerPort(timelineServicePort).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true)
.withMetadataIndexPartitionStats(false)
.withMaxNumDeltaCommitsBeforeCompaction(8)
.build())
.forTable("test-trip-table").build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ public final class HoodieMetadataConfig extends HoodieConfig {

public static final ConfigProperty<Boolean> ENABLE_METADATA_INDEX_PARTITION_STATS = ConfigProperty
.key(METADATA_PREFIX + ".index.partition.stats.enable")
.defaultValue(false)
.defaultValue(true)
.sinceVersion("1.0.0")
.withDocumentation("Enable aggregating stats for each column at the storage partition level.");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,8 @@ private List<String> getPartitionPathWithPathPrefixUsingFilterExpression(String
} else if (!path.getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) {
return Pair.of(Option.empty(), Option.of(path));
}
} else if (path.getName()
.startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX)) {
} else if (path.getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX)
|| MetadataPartitionType.isValidPartitionFilePath(path.getName())) {
String partitionName =
FSUtils.getRelativePartitionPath(dataBasePath,
path.getParent());
Expand All @@ -212,7 +212,7 @@ private List<String> getPartitionPathWithPathPrefixUsingFilterExpression(String
|| (Boolean) fullBoundExpr.eval(
extractPartitionValues(partitionFields, relativePartitionPath,
urlEncodePartitioningEnabled)))
.collect(Collectors.toList()));
.collect(Collectors.toSet()));

Expression partialBoundExpr;
// If partitionPaths is nonEmpty, we're already at the last path level, and all paths
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2174,21 +2174,14 @@ public static HoodieMetadataColumnStats mergeColumnStatsRecords(HoodieMetadataCo
return newColumnStats;
}

Comparable minValue =
(Comparable) Stream.of(
(Comparable) unwrapAvroValueWrapper(prevColumnStats.getMinValue()),
(Comparable) unwrapAvroValueWrapper(newColumnStats.getMinValue()))
.filter(Objects::nonNull)
.min(Comparator.naturalOrder())
.orElse(null);

Comparable maxValue =
(Comparable) Stream.of(
(Comparable) unwrapAvroValueWrapper(prevColumnStats.getMaxValue()),
(Comparable) unwrapAvroValueWrapper(newColumnStats.getMaxValue()))
.filter(Objects::nonNull)
.max(Comparator.naturalOrder())
.orElse(null);
Comparable minValue = castAndCompare(
unwrapAvroValueWrapper(newColumnStats.getMinValue()),
unwrapAvroValueWrapper(prevColumnStats.getMinValue()),
true);
Comparable maxValue = castAndCompare(
unwrapAvroValueWrapper(prevColumnStats.getMaxValue()),
unwrapAvroValueWrapper(newColumnStats.getMaxValue()),
false);

return HoodieMetadataColumnStats.newBuilder(HoodieMetadataPayload.METADATA_COLUMN_STATS_BUILDER_STUB.get())
.setFileName(newColumnStats.getFileName())
Expand All @@ -2203,6 +2196,30 @@ public static HoodieMetadataColumnStats mergeColumnStatsRecords(HoodieMetadataCo
.build();
}

/**
 * Picks either the smaller or the larger of two possibly-null values whose runtime
 * classes are not guaranteed to match.
 *
 * <p>Ordering between the two values is decided as follows:
 * <ul>
 *   <li>identical runtime class: {@link Comparable#compareTo(Object)};</li>
 *   <li>both are {@link Number}s of different classes: comparison of their
 *       {@code doubleValue()}s;</li>
 *   <li>anything else: lexicographic comparison of their {@code toString()} forms.</li>
 * </ul>
 *
 * <p>When the two values compare as equal, {@code prevVal} is returned for a minimum
 * and {@code newVal} is returned for a maximum.
 *
 * @param newVal  first candidate, may be {@code null}
 * @param prevVal second candidate, may be {@code null}
 * @param isMin   {@code true} to return the smaller value, {@code false} for the larger
 * @return the selected value; the non-null argument when exactly one is {@code null};
 *         {@code null} when both are {@code null}
 */
public static Comparable castAndCompare(Comparable newVal, Comparable prevVal, boolean isMin) {
  // A missing side never wins: hand back whatever the other side holds.
  if (newVal == null) {
    return prevVal;
  }
  if (prevVal == null) {
    return newVal;
  }

  final int order;
  if (newVal.getClass() == prevVal.getClass()) {
    // Same runtime class, so the natural ordering applies directly.
    order = newVal.compareTo(prevVal);
  } else if (newVal instanceof Number && prevVal instanceof Number) {
    // Mixed numeric classes are compared through their double representation.
    double newAsDouble = ((Number) newVal).doubleValue();
    double prevAsDouble = ((Number) prevVal).doubleValue();
    order = Double.compare(newAsDouble, prevAsDouble);
  } else {
    // Unrelated classes fall back to comparing their string forms.
    order = newVal.toString().compareTo(prevVal.toString());
  }

  final boolean newIsSmaller = order < 0;
  if (isMin) {
    return newIsSmaller ? newVal : prevVal;
  }
  return newIsSmaller ? prevVal : newVal;
}

public static Map<String, HoodieMetadataFileInfo> combineFileSystemMetadata(HoodieMetadataPayload older, HoodieMetadataPayload newer) {
Map<String, HoodieMetadataFileInfo> combinedFileInfo = new HashMap<>();
// First, add all files listed in the previous record
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,8 @@ private static void constructColumnStatsMetadataPayload(HoodieMetadataPayload pa
String.format("Valid %s record expected for type: %s", SCHEMA_FIELD_ID_COLUMN_STATS, MetadataPartitionType.COLUMN_STATS.getRecordType()));
} else {
payload.columnStatMetadata = HoodieMetadataColumnStats.newBuilder(METADATA_COLUMN_STATS_BUILDER_STUB.get())
.setFileName((String) columnStatsRecord.get(COLUMN_STATS_FIELD_FILE_NAME))
.setColumnName((String) columnStatsRecord.get(COLUMN_STATS_FIELD_COLUMN_NAME))
.setFileName((String)columnStatsRecord.get(COLUMN_STATS_FIELD_FILE_NAME))
.setColumnName((String)columnStatsRecord.get(COLUMN_STATS_FIELD_COLUMN_NAME))
// AVRO-2377 1.9.2 Modified the type of org.apache.avro.Schema#FIELD_RESERVED to Collections.unmodifiableSet.
// This causes Kryo to fail when deserializing a GenericRecord, See HUDI-5484.
// We should avoid using GenericRecord and convert GenericRecord into a serializable type.
Expand Down Expand Up @@ -434,6 +434,16 @@ public static MetadataPartitionType fromPartitionPath(String partitionPath) {
throw new IllegalArgumentException("No MetadataPartitionType for partition path: " + partitionPath);
}

/**
 * Tells whether a file name begins with the file-id prefix of any valid metadata
 * partition type.
 *
 * <p>A single leading {@code "."} on the name is ignored before the prefix check.
 *
 * @param partitionFilePath file name to inspect; must not be {@code null}
 * @return {@code true} if the (dot-stripped) name starts with the {@code fileIdPrefix}
 *         of at least one value returned by {@code getValidValues()}
 */
public static boolean isValidPartitionFilePath(String partitionFilePath) {
  final String candidate;
  if (partitionFilePath.startsWith(".")) {
    candidate = partitionFilePath.substring(1);
  } else {
    candidate = partitionFilePath;
  }

  boolean matched = false;
  for (MetadataPartitionType type : getValidValues()) {
    if (candidate.startsWith(type.fileIdPrefix)) {
      matched = true;
      break;
    }
  }
  return matched;
}

@Override
public String toString() {
return "Metadata partition {"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,19 +65,19 @@ public void testPartitionEnabledByConfigOnly(MetadataPartitionType partitionType
case FUNCTIONAL_INDEX:
case SECONDARY_INDEX:
metadataConfigBuilder.enable(true);
expectedEnabledPartitions = 1;
expectedEnabledPartitions = 2;
break;
case COLUMN_STATS:
metadataConfigBuilder.enable(true).withMetadataIndexColumnStats(true);
expectedEnabledPartitions = 2;
expectedEnabledPartitions = 3;
break;
case BLOOM_FILTERS:
metadataConfigBuilder.enable(true).withMetadataIndexBloomFilter(true);
expectedEnabledPartitions = 2;
expectedEnabledPartitions = 3;
break;
case RECORD_INDEX:
metadataConfigBuilder.enable(true).withEnableRecordIndex(true);
expectedEnabledPartitions = 2;
expectedEnabledPartitions = 3;
break;
case PARTITION_STATS:
metadataConfigBuilder.enable(true).withMetadataIndexPartitionStats(true).withColumnStatsIndexForColumns("partitionCol");
Expand All @@ -91,7 +91,7 @@ public void testPartitionEnabledByConfigOnly(MetadataPartitionType partitionType

// Verify partition type is enabled due to config
if (partitionType == MetadataPartitionType.FUNCTIONAL_INDEX || partitionType == MetadataPartitionType.SECONDARY_INDEX) {
assertEquals(1, enabledPartitions.size(), "FUNCTIONAL_INDEX or SECONDARY_INDEX should be enabled by SQL, only FILES is enabled in this case.");
assertEquals(2, enabledPartitions.size(), "FUNCTIONAL_INDEX or SECONDARY_INDEX should be enabled by SQL, only FILES is enabled in this case.");
assertTrue(enabledPartitions.contains(MetadataPartitionType.FILES));
} else {
assertEquals(expectedEnabledPartitions, enabledPartitions.size());
Expand All @@ -114,7 +114,7 @@ public void testPartitionAvailableByMetaClientOnly() {
List<MetadataPartitionType> enabledPartitions = MetadataPartitionType.getEnabledPartitions(metadataConfig.getProps(), metaClient);

// Verify RECORD_INDEX, FILES and PARTITION_STATS are enabled due to availability
assertEquals(2, enabledPartitions.size(), "RECORD_INDEX and FILES should be available");
assertEquals(3, enabledPartitions.size(), "RECORD_INDEX, FILES, and PARTITION_STATS should be available; ");
assertTrue(enabledPartitions.contains(MetadataPartitionType.FILES), "FILES should be enabled by availability");
assertTrue(enabledPartitions.contains(MetadataPartitionType.RECORD_INDEX), "RECORD_INDEX should be enabled by availability");
}
Expand Down Expand Up @@ -153,7 +153,7 @@ public void testFunctionalIndexPartitionEnabled() {
List<MetadataPartitionType> enabledPartitions = MetadataPartitionType.getEnabledPartitions(metadataConfig.getProps(), metaClient);

// Verify FUNCTIONAL_INDEX, FILES and PARTITION_STATS are enabled due to availability
assertEquals(2, enabledPartitions.size(), "FUNCTIONAL_INDEX and FILES should be available");
assertEquals(3, enabledPartitions.size(), "FUNCTIONAL_INDEX, FILES and PARTITION_STATS should be available");
assertTrue(enabledPartitions.contains(MetadataPartitionType.FILES), "FILES should be enabled by availability");
assertTrue(enabledPartitions.contains(MetadataPartitionType.FUNCTIONAL_INDEX), "FUNCTIONAL_INDEX should be enabled by availability");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ private static Comparable<?> convertToNativeJavaType(PrimitiveType primitiveType
synchronized (primitiveType.stringifier()) {
// Date logical type is implemented as a signed INT32
// REF: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
return java.sql.Date.valueOf(
return java.time.LocalDate.parse(
primitiveType.stringifier().stringify((Integer) val)
);
}
Expand Down
Loading