diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java index 56ebe6a8ed1ce..66ff6b6e43e6d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java @@ -18,6 +18,7 @@ package org.apache.hudi.index; +import org.apache.hudi.avro.AvroSchemaUtils; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.data.HoodieData; @@ -244,8 +245,11 @@ private static HoodieData> getExistingRecords( .filterCompletedInstants() .lastInstant() .map(HoodieInstant::getTimestamp); + Schema baseFileReaderSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getWriteSchema()), config.allowOperationMetadataField()); + boolean hasTimestampFields = AvroSchemaUtils.isLogicalTimestampRepairNeeded(hoodieTable.getStorageConf(), + () -> baseFileReaderSchema != null && AvroSchemaUtils.hasTimestampMillisField(baseFileReaderSchema)); return partitionLocations.flatMap(p - -> new HoodieMergedReadHandle(config, instantTime, hoodieTable, Pair.of(p.getKey(), p.getValue())) + -> new HoodieMergedReadHandle(config, instantTime, hoodieTable, Pair.of(p.getKey(), p.getValue()), hasTimestampFields) .getMergedRecords().iterator()); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java index 8324034c4927f..28675c6779a8a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java @@ -56,26 +56,23 @@ public class HoodieMergedReadHandle extends HoodieReadHandle fileSliceOpt; - public HoodieMergedReadHandle(HoodieWriteConfig config, - Option instantTime, - HoodieTable hoodieTable, - Pair partitionPathFileIDPair) { - this(config, instantTime, hoodieTable, partitionPathFileIDPair, Option.empty()); + public HoodieMergedReadHandle(HoodieWriteConfig config, Option instantTime, + HoodieTable hoodieTable, Pair partitionPathFileIDPair, + boolean hasTimestampFields) { + this(config, instantTime, hoodieTable, partitionPathFileIDPair, hasTimestampFields, Option.empty()); } - public HoodieMergedReadHandle(HoodieWriteConfig config, - Option instantTime, - HoodieTable hoodieTable, - Pair partitionPathFileIDPair, - Option fileSliceOption) { + public HoodieMergedReadHandle(HoodieWriteConfig config, Option instantTime, + HoodieTable hoodieTable, Pair partitionPathFileIDPair, + boolean hasTimestampFields, Option fileSliceOption) { super(config, instantTime, hoodieTable, partitionPathFileIDPair); Schema orignalReaderSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField()); // config.getSchema is not canonicalized, while config.getWriteSchema is canonicalized. So, we have to use the canonicalized schema to read the existing data. 
- baseFileReaderSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getWriteSchema()), config.allowOperationMetadataField()); + this.baseFileReaderSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getWriteSchema()), config.allowOperationMetadataField()); fileSliceOpt = fileSliceOption.isPresent() ? fileSliceOption : getLatestFileSlice(); // Repair reader schema. // Assume writer schema should be correct. If not, no repair happens. - readerSchema = AvroSchemaUtils.getRepairedSchema(orignalReaderSchema, baseFileReaderSchema); + readerSchema = hasTimestampFields ? AvroSchemaUtils.getRepairedSchema(orignalReaderSchema, this.baseFileReaderSchema) : orignalReaderSchema; } public List> getMergedRecords() { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 6289d652cca7b..cf9e988e0a8d7 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -18,6 +18,8 @@ package org.apache.hudi.metadata; +import org.apache.hudi.avro.AvroSchemaUtils; +import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieIndexPartitionInfo; import org.apache.hudi.avro.model.HoodieIndexPlan; @@ -75,6 +77,7 @@ import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.HoodieTable; +import org.apache.avro.Schema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -584,8 +587,10 @@ private static HoodieData readRecordKeysFromFileSliceSnapshot(Hood final String partition = partitionAndFileSlice.getKey(); final FileSlice fileSlice = partitionAndFileSlice.getValue(); final String fileId = fileSlice.getFileId(); - return new HoodieMergedReadHandle(dataWriteConfig, instantTime, hoodieTable, Pair.of(partition, fileSlice.getFileId()), - Option.of(fileSlice)).getMergedRecords().stream().map(record -> { + Schema baseFileReaderSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(dataWriteConfig.getWriteSchema()), dataWriteConfig.allowOperationMetadataField()); + boolean hasTimestampFields = baseFileReaderSchema != null && AvroSchemaUtils.hasTimestampMillisField(baseFileReaderSchema); + return new HoodieMergedReadHandle(dataWriteConfig, instantTime, hoodieTable, Pair.of(partition, fileSlice.getFileId()), hasTimestampFields, Option.of(fileSlice)) + .getMergedRecords().stream().map(record -> { HoodieRecord record1 = (HoodieRecord) record; return HoodieMetadataPayload.createRecordIndexUpdate(record1.getRecordKey(), partition, fileId, record1.getCurrentLocation().getInstantTime(), 0); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java index 284de408d2da1..59b0d3a1eb019 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java @@ -87,8 +87,12 @@ public void runMerge(HoodieTable table, HoodieFileReader bootstrapFileReader = null; Schema writerSchema = 
mergeHandle.getWriterSchemaWithMetaFields(); - Schema readerSchema = AvroSchemaUtils.getRepairedSchema(baseFileReader.getSchema(), writerSchema); - + Schema readerSchema; + if (!table.isMetadataTable() && AvroSchemaUtils.isLogicalTimestampRepairNeeded(table.getStorageConf(), () -> true)) { + readerSchema = AvroSchemaUtils.getRepairedSchema(baseFileReader.getSchema(), writerSchema); + } else { + readerSchema = baseFileReader.getSchema(); + } // In case Advanced Schema Evolution is enabled we might need to rewrite currently // persisted records to adhere to an evolved schema diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java index 9defec99c38e2..bdcb40c0ffb58 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java @@ -18,6 +18,7 @@ package org.apache.hudi.table.action.compact; +import org.apache.hudi.avro.AvroSchemaUtils; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.client.WriteStatus; @@ -130,6 +131,12 @@ public HoodieData compact( TaskContextSupplier taskContextSupplier = table.getTaskContextSupplier(); // if this is a MDT, set up the instant range of log reader just like regular MDT snapshot reader. Option instantRange = CompactHelpers.getInstance().getInstantRange(metaClient); + // Since the merge handle is used here, the write schema can be queried directly from the write config. + // The write handle also supports an overridden write schema, but the merge handle does not use that option. + Schema writerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getWriteSchema()), config.allowOperationMetadataField()); + if (!table.isMetadataTable()) { + AvroSchemaUtils.setLogicalTimestampRepairIfNotSet(table.getStorageConf(), () -> AvroSchemaUtils.hasTimestampMillisField(writerSchema)); + } return context.parallelize(operations).map(operation -> compact( compactionHandler, metaClient, config, operation, compactionInstantTime, maxInstantTime, instantRange, taskContextSupplier, executionHelper)) .flatMap(List::iterator); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java index a52dd06ff4864..0321626e8a0fa 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java @@ -128,10 +128,8 @@ private ClosableIterator getInternalRowIterator(Schema readerSchema if (requestedSchema == null) { requestedSchema = readerSchema; } - // Set configuration for timestamp_millis type repair. - if (!storage.getConf().contains(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR)) { - storage.getConf().set(ENABLE_LOGICAL_TIMESTAMP_REPAIR, Boolean.toString(AvroSchemaUtils.hasTimestampMillisField(readerSchema))); - } + // Set configuration for timestamp_millis type repair (only when not already set).
+ AvroSchemaUtils.setLogicalTimestampRepairIfNotSet(storage.getConf(), () -> AvroSchemaUtils.hasTimestampMillisField(readerSchema)); MessageType fileSchema = getFileSchema(); Schema nonNullSchema = AvroSchemaUtils.getNonNullTypeFromUnion(requestedSchema); Option messageSchema = Option.of(getAvroSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(nonNullSchema)); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java index f9b474eedd47a..68a58b706ac42 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java @@ -18,6 +18,7 @@ package org.apache.hudi.table.action.commit; +import org.apache.hudi.avro.AvroSchemaUtils; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy; import org.apache.hudi.client.utils.SparkValidatorUtils; @@ -31,6 +32,7 @@ import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -43,6 +45,7 @@ import org.apache.hudi.data.HoodieJavaPairRDD; import org.apache.hudi.data.HoodieJavaRDD; import org.apache.hudi.exception.HoodieCommitException; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.execution.SparkLazyInsertIterable; import org.apache.hudi.index.HoodieIndex; @@ -256,6 +259,22 @@ private HoodieData mapPartitionsAsRDD(HoodieData> d // Partition only partitionedRDD = mappedRDD.partitionBy(partitioner); } + + if (!table.isMetadataTable() && table.getMetaClient().getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() > 0) { + TableSchemaResolver schemaResolver = new TableSchemaResolver(table.getMetaClient()); + try { + AvroSchemaUtils.setLogicalTimestampRepairIfNotSet(table.getStorageConf(), () -> { + try { + return AvroSchemaUtils.hasTimestampMillisField(schemaResolver.getTableAvroSchema()); + } catch (Exception e) { + return true; + } + }); + } catch (Exception e) { + throw new HoodieException("Failed to set logical ts related configs", e); + } + } + return HoodieJavaRDD.of(partitionedRDD.map(Tuple2::_2).mapPartitionsWithIndex((partition, recordItr) -> { if (WriteOperationType.isChangingRecords(operationType)) { return handleUpsertPartition(instantTime, partition, recordItr, partitioner); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergedReadHandle.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergedReadHandle.java index fd0e878d482a8..cee0ceac97494 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergedReadHandle.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergedReadHandle.java @@ -19,6 +19,8 @@ package org.apache.hudi.io; +import org.apache.hudi.avro.AvroSchemaUtils; +import 
org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.model.AWSDmsAvroPayload; @@ -41,6 +43,7 @@ import org.apache.hudi.table.HoodieTable; import org.apache.hudi.testutils.SparkClientFunctionalTestHarness; +import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.junit.jupiter.params.ParameterizedTest; @@ -198,7 +201,10 @@ private void doMergedReadAndValidate(HoodieTableMetaClient metaClient, HoodieWri .collect(Collectors.toList()); assertEquals(1, partitionPathAndFileIDPairs.size()); String latestCommitTime = table.getActiveTimeline().lastInstant().get().getTimestamp(); - HoodieMergedReadHandle mergedReadHandle = new HoodieMergedReadHandle<>(writeConfig, Option.of(latestCommitTime), table, partitionPathAndFileIDPairs.get(0)); + Schema baseFileReaderSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(writeConfig.getWriteSchema()), writeConfig.allowOperationMetadataField()); + boolean hasTimestampFields = baseFileReaderSchema != null && AvroSchemaUtils.hasTimestampMillisField(baseFileReaderSchema); + HoodieMergedReadHandle mergedReadHandle = new HoodieMergedReadHandle<>(writeConfig, Option.of(latestCommitTime), table, partitionPathAndFileIDPairs.get(0), + hasTimestampFields); List mergedRecords = mergedReadHandle.getMergedRecords(); assertEquals(totalRecords, mergedRecords.size()); List sortedMergedRecords = mergedRecords.stream() diff --git a/hudi-common/src/avro/java/org/apache/parquet/schema/AvroSchemaRepair.java b/hudi-common/src/avro/java/org/apache/parquet/schema/AvroSchemaRepair.java index 78908a40ae52e..8546b1b8a5e3a 100644 --- a/hudi-common/src/avro/java/org/apache/parquet/schema/AvroSchemaRepair.java +++ b/hudi-common/src/avro/java/org/apache/parquet/schema/AvroSchemaRepair.java @@ -22,6 +22,7 @@ import org.apache.hudi.avro.AvroSchemaCache; import org.apache.hudi.avro.AvroSchemaUtils; import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.exception.HoodieException; import org.apache.avro.LogicalType; import org.apache.avro.LogicalTypes; diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java index 060a99422d636..3ef4f151645f3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java @@ -28,9 +28,12 @@ import org.apache.hudi.internal.schema.InternalSchema; import org.apache.hudi.internal.schema.action.TableChanges; import org.apache.hudi.internal.schema.utils.SchemaChangeUtils; +import org.apache.hudi.io.storage.HoodieFileReader; +import org.apache.hudi.storage.StorageConfiguration; import org.apache.avro.Schema; import org.apache.avro.SchemaCompatibility; +import org.apache.hadoop.conf.Configuration; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -42,6 +45,7 @@ import java.util.Objects; import java.util.Set; import java.util.function.BiFunction; +import java.util.function.Supplier; import java.util.stream.Collectors; import static org.apache.hudi.common.util.CollectionUtils.reduce; @@ -531,4 +535,34 @@ public static boolean hasTimestampMillisField(Schema schema) { return false; } } + + /** + * Sets the logical timestamp repair key in the given conf to the supplied value if it is not already set. + */ + public static void setLogicalTimestampRepairIfNotSet(Configuration 
conf, Supplier valueSupplier) { + if (conf.get(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR) == null) { + conf.set(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, valueSupplier.get().toString()); + } + } + + /** + * Sets the logical timestamp repair key in the given storage conf to the supplied value if it is not already set. + */ + public static void setLogicalTimestampRepairIfNotSet(StorageConfiguration conf, Supplier valueSupplier) { + if (!conf.contains(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR)) { + conf.set(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, valueSupplier.get().toString()); + } + } + + /** + * Returns the configured value of the logical timestamp repair key; if the key is absent or empty, returns the value from the default supplier. + */ + public static boolean isLogicalTimestampRepairNeeded(StorageConfiguration conf, Supplier defaultValueSupplier) { + Option valueOpt = conf.getString(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR); + if (valueOpt.isEmpty() || StringUtils.isNullOrEmpty(valueOpt.get())) { + return defaultValueSupplier.get(); + } else { + return Boolean.parseBoolean(valueOpt.get()); + } + } + } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java index d62a25b1e1add..dc610a16758d0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java @@ -43,7 +43,6 @@ import org.apache.hudi.internal.schema.InternalSchema; import org.apache.hudi.internal.schema.action.InternalSchemaMerger; import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter; -import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.StoragePath; @@ -70,7 +69,6 @@ import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.COMPACTED_BLOCK_TIMES; import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME; import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME; -import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType.COMMAND_BLOCK; import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType.CORRUPT_BLOCK; import static org.apache.hudi.common.util.ValidationUtils.checkState; @@ -186,8 +184,8 @@ protected AbstractHoodieLogRecordReader(HoodieStorage storage, String basePath, this.forceFullScan = forceFullScan; this.internalSchema = internalSchema == null ? 
InternalSchema.getEmptyInternalSchema() : internalSchema; this.enableOptimizedLogBlocksScan = enableOptimizedLogBlocksScan; - this.enableLogicalTimestampFieldRepair = !hoodieTableMetaClient.isMetadataTable() && storage.getConf().getBoolean(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, - () -> readerSchema != null && AvroSchemaUtils.hasTimestampMillisField(readerSchema)); + this.enableLogicalTimestampFieldRepair = !hoodieTableMetaClient.isMetadataTable() + && AvroSchemaUtils.isLogicalTimestampRepairNeeded(storage.getConf(), () -> readerSchema != null && AvroSchemaUtils.hasTimestampMillisField(readerSchema)); if (keyFieldOverride.isPresent()) { // NOTE: This branch specifically is leveraged handling Metadata Table diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java index 44fc5ec6fd1fe..f8e873d142395 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java @@ -39,7 +39,6 @@ import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.internal.schema.InternalSchema; import org.apache.hudi.io.SeekableDataInputStream; -import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.io.util.IOUtils; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.StoragePath; @@ -100,7 +99,7 @@ public HoodieLogFileReader(HoodieStorage storage, HoodieLogFile logFile, Schema boolean enableRecordLookups, String keyField) throws IOException { this(storage, logFile, readerSchema, bufferSize, reverseReader, enableRecordLookups, keyField, InternalSchema.getEmptyInternalSchema(), - storage.getConf().getBoolean(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, + AvroSchemaUtils.isLogicalTimestampRepairNeeded(storage.getConf(), () -> readerSchema != null && AvroSchemaUtils.hasTimestampMillisField(readerSchema))); } diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java index fcd851efd47f1..f179167c41a54 100644 --- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java @@ -19,6 +19,7 @@ package org.apache.hudi.io.hadoop; +import org.apache.hudi.avro.AvroSchemaUtils; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.model.HoodieAvroIndexedRecord; @@ -172,7 +173,12 @@ private ClosableIterator getIndexedRecordIteratorInternal(Schema // sure that in case the file-schema is not equal to read-schema we'd still // be able to read that file (in case projection is a proper one) Configuration hadoopConf = storage.getConf().unwrapCopyAs(Configuration.class); - Schema repairedFileSchema = getRepairedSchema(getSchema(), schema); + Schema repairedFileSchema; + if (AvroSchemaUtils.isLogicalTimestampRepairNeeded(storage.getConf(), () -> true)) { + repairedFileSchema = getRepairedSchema(getSchema(), schema); + } else { + repairedFileSchema = schema; + } Option promotedSchema = Option.empty(); if (!renamedColumns.isEmpty() || HoodieAvroUtils.recordNeedsRewriteForExtendedAvroTypePromotion(repairedFileSchema, schema)) { AvroReadSupport.setAvroReadSchema(hadoopConf, repairedFileSchema); @@ 
-183,7 +189,8 @@ private ClosableIterator getIndexedRecordIteratorInternal(Schema AvroReadSupport.setRequestedProjection(hadoopConf, schema); } ParquetReader reader = - new HoodieAvroParquetReaderBuilder(path) + new HoodieAvroParquetReaderBuilder(path, + AvroSchemaUtils.isLogicalTimestampRepairNeeded(storage.getConf(), () -> schema == null || AvroSchemaUtils.hasTimestampMillisField(schema))) .withTableSchema(schema) .withConf(hadoopConf) .set(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, hadoopConf.get(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS)) diff --git a/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/HoodieAvroParquetReaderBuilder.java b/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/HoodieAvroParquetReaderBuilder.java index 308fe34a726cd..cb699155e9bb4 100644 --- a/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/HoodieAvroParquetReaderBuilder.java +++ b/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/HoodieAvroParquetReaderBuilder.java @@ -18,7 +18,6 @@ package org.apache.parquet.avro; -import org.apache.hudi.avro.AvroSchemaUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.storage.StoragePath; @@ -42,11 +41,13 @@ public class HoodieAvroParquetReaderBuilder extends ParquetReader.Builder private GenericData model = null; private boolean enableCompatibility = true; private boolean isReflect = true; + private boolean isLogicalTimestampRepairNeeded; private Schema tableSchema = null; @Deprecated - public HoodieAvroParquetReaderBuilder(StoragePath path) { + public HoodieAvroParquetReaderBuilder(StoragePath path, boolean isLogicalTimestampRepairNeeded) { super(new Path(path.toUri())); + this.isLogicalTimestampRepairNeeded = isLogicalTimestampRepairNeeded; } public HoodieAvroParquetReaderBuilder(InputFile file) { @@ -88,6 +89,6 @@ protected ReadSupport getReadSupport() { conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, enableCompatibility); } return new HoodieAvroReadSupport<>(model, Option.ofNullable(tableSchema).map(schema -> getAvroSchemaConverter(conf).convert(schema)), - tableSchema == null || AvroSchemaUtils.hasTimestampMillisField(tableSchema)); + isLogicalTimestampRepairNeeded); } } diff --git a/hudi-hadoop-common/src/parquet/java/org/apache/parquet/schema/SchemaRepair.java b/hudi-hadoop-common/src/parquet/java/org/apache/parquet/schema/SchemaRepair.java index b85b3b98fac38..8a4b53e6ab6ea 100644 --- a/hudi-hadoop-common/src/parquet/java/org/apache/parquet/schema/SchemaRepair.java +++ b/hudi-hadoop-common/src/parquet/java/org/apache/parquet/schema/SchemaRepair.java @@ -20,6 +20,7 @@ package org.apache.parquet.schema; import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieException; import java.util.ArrayList; import java.util.List; diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index 0534f3dac8ae7..3b27bc165b876 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -116,6 +116,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, protected lazy val tableConfig: HoodieTableConfig = metaClient.getTableConfig + protected lazy val hasTimestampMillisFieldInTableSchema: Boolean = HoodieSchemaUtils.hasTimestampMillisField(tableAvroSchema) + protected lazy val basePath: 
Path = new Path(metaClient.getBasePath.toUri) // NOTE: Record key-field is assumed singular here due to the either of @@ -245,7 +247,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, // We're delegating to Spark to append partition values to every row only in cases // when these corresponding partition-values are not persisted w/in the data file itself val parquetFileFormat = sparkAdapter.createLegacyHoodieParquetFileFormat( - shouldExtractPartitionValuesFromPartitionPath, tableAvroSchema, hasTimestampMillisFieldInTableSchema = HoodieSchemaUtils.hasTimestampMillisField(tableAvroSchema)).get + shouldExtractPartitionValuesFromPartitionPath, tableAvroSchema, hasTimestampMillisFieldInTableSchema = hasTimestampMillisFieldInTableSchema).get (parquetFileFormat, LegacyHoodieParquetFileFormat.FILE_FORMAT_ID) } @@ -555,7 +557,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, // when these corresponding partition-values are not persisted w/in the data file itself appendPartitionValues = shouldAppendPartitionValuesOverride.getOrElse(shouldExtractPartitionValuesFromPartitionPath), tableAvroSchema, - hasTimestampMillisFieldInTableSchema = HoodieSchemaUtils.hasTimestampMillisField(tableAvroSchema) + hasTimestampMillisFieldInTableSchema = hasTimestampMillisFieldInTableSchema ) // Since partition values by default are omitted, and not persisted w/in data-files by Spark, // data-file readers (such as [[ParquetFileFormat]]) have to inject partition values while reading diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala index db538f110c90b..5f78002ddddae 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala @@ -23,9 +23,11 @@ import org.apache.hadoop.mapred.JobConf import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, projectReader} import org.apache.hudi.HoodieMergeOnReadRDD.CONFIG_INSTANTIATION_LOCK import org.apache.hudi.MergeOnReadSnapshotRelation.isProjectionCompatible +import org.apache.hudi.avro.AvroSchemaUtils import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.common.util.StringUtils import org.apache.hudi.exception.HoodieException +import org.apache.hudi.io.storage.HoodieFileReader import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -85,7 +87,6 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, extends RDD[InternalRow](sc, Nil) with HoodieUnsafeRDD { protected val maxCompactionMemoryInBytes: Long = getMaxCompactionMemoryInBytes(new JobConf(config)) - private val hadoopConfBroadcast = sc.broadcast(new SerializableWritable(config)) override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala index 527a7c6c5f514..3731a7b5e0c70 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala @@ -17,6 +17,9 @@ package 
org.apache.hudi +import org.apache.avro.Schema +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{GlobPattern, Path} import org.apache.hudi.avro.AvroSchemaUtils import org.apache.hudi.DataSourceReadOptions.INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME import org.apache.hudi.HoodieBaseRelation.isSchemaEvolutionEnabledOnRead @@ -38,9 +41,10 @@ import org.apache.hudi.internal.schema.utils.SerDeHelper import org.apache.hudi.io.storage.HoodieFileReader import org.apache.hudi.storage.{HoodieStorageUtils, StoragePath} import org.apache.hudi.table.HoodieSparkTable - import org.apache.avro.Schema +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.GlobPattern +import org.apache.hudi.util.JFunction import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.execution.datasources.parquet.LegacyHoodieParquetFileFormat @@ -205,16 +209,15 @@ class IncrementalRelation(val sqlContext: SQLContext, // pass internalSchema to hadoopConf, so it can be used in executors. val validCommits = metaClient .getCommitsAndCompactionTimeline.filterCompletedInstants.getInstantsAsStream.toArray().map(_.asInstanceOf[HoodieInstant].getFileName).mkString(",") - sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, SerDeHelper.toJson(internalSchema)) - sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, metaClient.getBasePath.toString) - sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST, validCommits) + val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration) + conf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, SerDeHelper.toJson(internalSchema)) + conf.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, metaClient.getBasePath.toString) + conf.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST, validCommits) // Pass table Avro schema to hadoopConf so supportBatch() can find it (supportBatch does not receive options). 
if (tableAvroSchema.getType != Schema.Type.NULL) { - LegacyHoodieParquetFileFormat.setTableAvroSchemaInConf( - sqlContext.sparkContext.hadoopConfiguration, tableAvroSchema) - sqlContext.sparkContext.hadoopConfiguration.set( - HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, - AvroSchemaUtils.hasTimestampMillisField(tableAvroSchema).toString) + LegacyHoodieParquetFileFormat.setTableAvroSchemaInConf(conf, tableAvroSchema) + AvroSchemaUtils.setLogicalTimestampRepairIfNotSet(conf, + JFunction.toJavaSupplier(() => AvroSchemaUtils.hasTimestampMillisField(tableAvroSchema).asInstanceOf[java.lang.Boolean])) } val formatClassName = metaClient.getTableConfig.getBaseFileFormat match { case HoodieFileFormat.PARQUET => LegacyHoodieParquetFileFormat.FILE_FORMAT_ID @@ -255,7 +258,7 @@ class IncrementalRelation(val sqlContext: SQLContext, val timer = HoodieTimer.start val allFilesToCheck = filteredMetaBootstrapFullPaths ++ filteredRegularFullPaths - val storageConf = HadoopFSUtils.getStorageConfWithCopy(sqlContext.sparkContext.hadoopConfiguration) + val storageConf = HadoopFSUtils.getStorageConfWithCopy(conf) val localBasePathStr = basePath.toString val firstNotFoundPath = sqlContext.sparkContext.parallelize(allFilesToCheck.toSeq, allFilesToCheck.size) .map(path => { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala index 4d83a4d5288d4..c340a5507ea81 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala @@ -17,9 +17,10 @@ package org.apache.hudi -import org.apache.hadoop.fs.{FileStatus, GlobPattern, Path} +import org.apache.hadoop.fs.GlobPattern import org.apache.hudi.HoodieConversionUtils.toScalaOption import org.apache.hudi.HoodieSparkConfUtils.getHollowCommitHandling +import org.apache.hudi.avro.AvroSchemaUtils import org.apache.hudi.common.model.{FileSlice, HoodieRecord} import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.USE_TRANSITION_TIME @@ -29,10 +30,10 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView import org.apache.hudi.common.util.StringUtils import org.apache.hudi.exception.HoodieException import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.listAffectedFilesForCommits +import org.apache.hudi.io.storage.HoodieFileReader import org.apache.hudi.metadata.HoodieTableMetadataUtil.getWritePartitionPaths import org.apache.hudi.storage.StoragePathInfo - -import org.apache.hadoop.fs.GlobPattern +import org.apache.hudi.util.JFunction import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.InternalRow @@ -81,6 +82,9 @@ case class MergeOnReadIncrementalRelation(override val sqlContext: SQLContext, val optionalFilters = filters val readers = createBaseFileReaders(tableSchema, requiredSchema, requestedColumns, requiredFilters, optionalFilters) + if (!metaClient.isMetadataTable) { + AvroSchemaUtils.setLogicalTimestampRepairIfNotSet(jobConf, JFunction.toJavaSupplier(() => hasTimestampMillisFieldInTableSchema.asInstanceOf[java.lang.Boolean])) + } new HoodieMergeOnReadRDD( sqlContext.sparkContext, config = jobConf, diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala index 5b6be9c55857b..deb3cdf12a4bf 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala @@ -22,10 +22,12 @@ import org.apache.hadoop.conf.Configuration import org.apache.hudi.HoodieBaseRelation.convertToAvroSchema import org.apache.hudi.HoodieConversionUtils.toScalaOption import org.apache.hudi.MergeOnReadSnapshotRelation.{createPartitionedFile, isProjectionCompatible} -import org.apache.hudi.avro.HoodieAvroUtils +import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils} import org.apache.hudi.common.model.{FileSlice, HoodieLogFile, OverwriteWithLatestAvroPayload} +import org.apache.hudi.io.storage.HoodieFileReader import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.storage.StoragePath +import org.apache.hudi.util.JFunction import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext @@ -115,6 +117,9 @@ abstract class BaseMergeOnReadSnapshotRelation(sqlContext: SQLContext, val optionalFilters = filters val readers = createBaseFileReaders(tableSchema, requiredSchema, requestedColumns, requiredFilters, optionalFilters) + if (!metaClient.isMetadataTable) { + AvroSchemaUtils.setLogicalTimestampRepairIfNotSet(jobConf, JFunction.toJavaSupplier(() => hasTimestampMillisFieldInTableSchema.asInstanceOf[java.lang.Boolean])) + } new HoodieMergeOnReadRDD( sqlContext.sparkContext, config = jobConf, diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala index ab73ded838e77..df7c5ead29ee7 100644 --- a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala @@ -24,6 +24,7 @@ import org.apache.hudi.storage.HoodieStorage import org.apache.avro.Schema import org.apache.hadoop.conf.Configuration +import org.apache.hudi.util.JFunction import org.apache.parquet.avro.HoodieAvroParquetSchemaConverter.getAvroSchemaConverter import org.apache.parquet.schema.{MessageType, SchemaRepair, Type} import org.apache.spark.sql.avro._ @@ -82,8 +83,7 @@ class Spark3_4Adapter extends BaseSpark3Adapter { override def getParquetReadSupport(storage: HoodieStorage, messageSchema: org.apache.hudi.common.util.Option[MessageType]): ParquetReadSupport = { - val enableTimestampFieldRepair = storage.getConf.getBoolean( - HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, true) + val enableTimestampFieldRepair = AvroSchemaUtils.isLogicalTimestampRepairNeeded(storage.getConf, JFunction.toJavaSupplier(() => true)) new HoodieParquetReadSupport( Option.empty[ZoneId], enableVectorizedReader = true, @@ -169,7 +169,7 @@ class Spark3_4Adapter extends BaseSpark3Adapter { val cachedRequestedSchema = HoodieInternalRowUtils.getCachedSchema(nonNullRequestedSchema) val requestedSchemaInMessageType = org.apache.hudi.common.util.Option.of( getAvroSchemaConverter(storage.getConf.unwrapAs(classOf[Configuration])).convert(nonNullRequestedSchema)) - val enableTimestampFieldRepair = 
storage.getConf.getBoolean(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, true) + val enableTimestampFieldRepair = AvroSchemaUtils.isLogicalTimestampRepairNeeded(storage.getConf, JFunction.toJavaSupplier(() => true)) val repairedRequestedSchema = repairSchemaIfSpecified(enableTimestampFieldRepair, fileSchema, requestedSchemaInMessageType) val repairedRequestedStructType = new ParquetToSparkSchemaConverter(storage.getConf.unwrapAs(classOf[Configuration])).convert(repairedRequestedSchema) val evolution = new SparkBasicSchemaEvolution(repairedRequestedStructType, cachedRequestedSchema, SQLConf.get.sessionLocalTimeZone) diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala index 11dcec7af06b9..75c874580b405 100644 --- a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala @@ -377,7 +377,7 @@ class Spark34LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu new HoodieParquetReadSupport( convertTz, enableVectorizedReader = false, - enableTimestampFieldRepair = true, + enableTimestampFieldRepair = hasTimestampMillisFieldInTableSchema, datetimeRebaseSpec, int96RebaseSpec, tableSchemaAsMessageType) diff --git a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala index 61aa3f1a915b3..a8aac2d4f2911 100644 --- a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala @@ -21,6 +21,7 @@ import org.apache.hudi.avro.AvroSchemaUtils import org.apache.hudi.{Spark35HoodieFileScanRDD, SparkAdapterSupport$} import org.apache.hudi.io.storage.HoodieFileReader import org.apache.hudi.storage.HoodieStorage +import org.apache.hudi.util.JFunction import org.apache.avro.Schema import org.apache.hadoop.conf.Configuration @@ -139,8 +140,7 @@ class Spark3_5Adapter extends BaseSpark3Adapter { override def getParquetReadSupport(storage: HoodieStorage, messageSchema: org.apache.hudi.common.util.Option[MessageType]): ParquetReadSupport = { - val enableTimestampFieldRepair = storage.getConf.getBoolean( - HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, true) + val enableTimestampFieldRepair = AvroSchemaUtils.isLogicalTimestampRepairNeeded(storage.getConf, JFunction.toJavaSupplier(() => true)) new HoodieParquetReadSupport( Option.empty[ZoneId], enableVectorizedReader = true, @@ -166,7 +166,7 @@ class Spark3_5Adapter extends BaseSpark3Adapter { val nonNullRequestedSchema = AvroSchemaUtils.getNonNullTypeFromUnion(requestedSchema) val cachedRequestedSchema = HoodieInternalRowUtils.getCachedSchema(nonNullRequestedSchema) val requestedSchemaInMessageType = org.apache.hudi.common.util.Option.of(getAvroSchemaConverter(storage.getConf.unwrapAs(classOf[Configuration])).convert(nonNullRequestedSchema)) - val enableTimestampFieldRepair = storage.getConf.getBoolean(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, 
true) + val enableTimestampFieldRepair = AvroSchemaUtils.isLogicalTimestampRepairNeeded(storage.getConf, JFunction.toJavaSupplier(() => true)) val repairedRequestedSchema = repairSchemaIfSpecified(enableTimestampFieldRepair, fileSchema, requestedSchemaInMessageType) val repairedRequestedStructType = new ParquetToSparkSchemaConverter(storage.getConf.unwrapAs(classOf[Configuration])).convert(repairedRequestedSchema) val evolution = new SparkBasicSchemaEvolution(repairedRequestedStructType, cachedRequestedSchema, SQLConf.get.sessionLocalTimeZone) diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java index 5f2d81d73a801..ba54be6de3570 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java @@ -788,8 +788,8 @@ private void assertBoundaryCounts(Dataset df, String exprZero, String exprT } @ParameterizedTest - @CsvSource(value = {"SIX,AVRO,CLUSTER", "CURRENT,AVRO,NONE", "CURRENT,AVRO,CLUSTER", "CURRENT,SPARK,NONE", "CURRENT,SPARK,CLUSTER"}) - void testCOWLogicalRepair(String tableVersion, String recordType, String operation) throws Exception { + @CsvSource(value = {"CLUSTER,AVRO", "NONE,AVRO", "CLUSTER,SPARK", "NONE,SPARK"}) + void testCOWLogicalRepair(String operation, String recordType) throws Exception { timestampNTZCompatibility(() -> { try { String dirName = "trips_logical_types_json_cow_write"; @@ -813,6 +813,11 @@ void testCOWLogicalRepair(String tableVersion, String recordType, String operati properties.setProperty("hoodie.metatata.enable", "true"); properties.setProperty("hoodie.parquet.small.file.limit", "-1"); properties.setProperty("hoodie.cleaner.commits.retained", "10"); + if (recordType.equals("SPARK")) { + properties.setProperty(HoodieWriteConfig.RECORD_MERGER_IMPLS.key(), "org.apache.hudi.HoodieSparkRecordMerger"); + } else { + properties.setProperty(HoodieWriteConfig.RECORD_MERGER_IMPLS.key(), "org.apache.hudi.common.model.HoodieAvroRecordMerger"); + } Option propt = Option.of(properties); new HoodieStreamer(prepCfgForCowLogicalRepair(tableBasePath, "456"), jsc, propt).sync(); @@ -864,17 +869,14 @@ void testCOWLogicalRepair(String tableVersion, String recordType, String operati @ParameterizedTest @CsvSource(value = { - "SIX,AVRO,CLUSTER,AVRO", - "CURRENT,AVRO,NONE,AVRO", - "CURRENT,AVRO,CLUSTER,AVRO", - "CURRENT,AVRO,COMPACT,AVRO", - "CURRENT,AVRO,NONE,PARQUET", - "CURRENT,AVRO,CLUSTER,PARQUET", - "CURRENT,AVRO,COMPACT,PARQUET", - "CURRENT,SPARK,NONE,PARQUET", - "CURRENT,SPARK,CLUSTER,PARQUET", - "CURRENT,SPARK,COMPACT,PARQUET"}) - void testMORLogicalRepair(String tableVersion, String recordType, String operation, String logBlockType) throws Exception { + "CLUSTER,AVRO", + "NONE,AVRO", + "COMPACT,AVRO", + "NONE,PARQUET", + "CLUSTER,PARQUET", + "COMPACT,PARQUET" + }) + void testMORLogicalRepair(String operation, String logBlockType) throws Exception { timestampNTZCompatibility(() -> { try { String tableSuffix; @@ -909,8 +911,6 @@ void testMORLogicalRepair(String tableVersion, String recordType, String operati properties.setProperty("hoodie.streamer.schemaprovider.target.schema.file", schemaPath); String inputDataPath = getClass().getClassLoader().getResource("logical-repair/mor_write_updates/5").toURI().toString(); 
properties.setProperty("hoodie.streamer.source.dfs.root", inputDataPath); - String mergerClass = getMergerClassForRecordType(recordType); - String tableVersionString = getTableVersionCode(tableVersion); properties.setProperty("hoodie.datasource.write.recordkey.field", "_row_key"); properties.setProperty("hoodie.datasource.write.precombine.field", "timestamp");