From 551a7a2df29199284f0663f80d82c9895486ee69 Mon Sep 17 00:00:00 2001 From: Lokesh Jain Date: Thu, 2 Apr 2026 16:13:41 +0530 Subject: [PATCH 01/11] Add optimisation for merged read handle --- .../apache/hudi/index/HoodieIndexUtils.java | 5 ++++- .../hudi/io/HoodieMergedReadHandle.java | 20 +++++++++---------- .../HoodieBackedTableMetadataWriter.java | 9 +++++++-- .../hudi/io/TestHoodieMergedReadHandle.java | 8 +++++++- 4 files changed, 27 insertions(+), 15 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java index 56ebe6a8ed1ce..ce0ec0bdbff80 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java @@ -18,6 +18,7 @@ package org.apache.hudi.index; +import org.apache.hudi.avro.AvroSchemaUtils; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.data.HoodieData; @@ -244,8 +245,10 @@ private static HoodieData> getExistingRecords( .filterCompletedInstants() .lastInstant() .map(HoodieInstant::getTimestamp); + Schema baseFileReaderSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getWriteSchema()), config.allowOperationMetadataField()); + boolean hasTimestampFields = baseFileReaderSchema != null && AvroSchemaUtils.hasTimestampMillisField(baseFileReaderSchema); return partitionLocations.flatMap(p - -> new HoodieMergedReadHandle(config, instantTime, hoodieTable, Pair.of(p.getKey(), p.getValue())) + -> new HoodieMergedReadHandle(config, instantTime, hoodieTable, Pair.of(p.getKey(), p.getValue()), baseFileReaderSchema, hasTimestampFields) .getMergedRecords().iterator()); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java index 8324034c4927f..9041fe49a365f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java @@ -56,26 +56,24 @@ public class HoodieMergedReadHandle extends HoodieReadHandle fileSliceOpt; - public HoodieMergedReadHandle(HoodieWriteConfig config, - Option instantTime, - HoodieTable hoodieTable, - Pair partitionPathFileIDPair) { - this(config, instantTime, hoodieTable, partitionPathFileIDPair, Option.empty()); + public HoodieMergedReadHandle(HoodieWriteConfig config, Option instantTime, + HoodieTable hoodieTable, Pair partitionPathFileIDPair, + Schema baseFileReaderSchema, boolean hasTimestampFields) { + this(config, instantTime, hoodieTable, partitionPathFileIDPair, baseFileReaderSchema, hasTimestampFields, Option.empty()); } - public HoodieMergedReadHandle(HoodieWriteConfig config, - Option instantTime, - HoodieTable hoodieTable, - Pair partitionPathFileIDPair, + public HoodieMergedReadHandle(HoodieWriteConfig config, Option instantTime, + HoodieTable hoodieTable, Pair partitionPathFileIDPair, + Schema baseFileReaderSchema, boolean hasTimestampFields, Option fileSliceOption) { super(config, instantTime, hoodieTable, partitionPathFileIDPair); Schema orignalReaderSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField()); // 
config.getSchema is not canonicalized, while config.getWriteSchema is canonicalized. So, we have to use the canonicalized schema to read the existing data. - baseFileReaderSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getWriteSchema()), config.allowOperationMetadataField()); + this.baseFileReaderSchema = baseFileReaderSchema; fileSliceOpt = fileSliceOption.isPresent() ? fileSliceOption : getLatestFileSlice(); // Repair reader schema. // Assume writer schema should be correct. If not, no repair happens. - readerSchema = AvroSchemaUtils.getRepairedSchema(orignalReaderSchema, baseFileReaderSchema); + readerSchema = hasTimestampFields ? AvroSchemaUtils.getRepairedSchema(orignalReaderSchema, this.baseFileReaderSchema) : orignalReaderSchema; } public List> getMergedRecords() { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 6289d652cca7b..d1b0b39d0f438 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -18,6 +18,8 @@ package org.apache.hudi.metadata; +import org.apache.hudi.avro.AvroSchemaUtils; +import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieIndexPartitionInfo; import org.apache.hudi.avro.model.HoodieIndexPlan; @@ -75,6 +77,7 @@ import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.HoodieTable; +import org.apache.avro.Schema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -584,8 +587,10 @@ private static HoodieData readRecordKeysFromFileSliceSnapshot(Hood final String partition = partitionAndFileSlice.getKey(); final FileSlice fileSlice = partitionAndFileSlice.getValue(); final String fileId = fileSlice.getFileId(); - return new HoodieMergedReadHandle(dataWriteConfig, instantTime, hoodieTable, Pair.of(partition, fileSlice.getFileId()), - Option.of(fileSlice)).getMergedRecords().stream().map(record -> { + Schema baseFileReaderSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(dataWriteConfig.getWriteSchema()), dataWriteConfig.allowOperationMetadataField()); + boolean hasTimestampFields = baseFileReaderSchema != null && AvroSchemaUtils.hasTimestampMillisField(baseFileReaderSchema); + return new HoodieMergedReadHandle(dataWriteConfig, instantTime, hoodieTable, Pair.of(partition, fileSlice.getFileId()), baseFileReaderSchema, hasTimestampFields, Option.of(fileSlice)) + .getMergedRecords().stream().map(record -> { HoodieRecord record1 = (HoodieRecord) record; return HoodieMetadataPayload.createRecordIndexUpdate(record1.getRecordKey(), partition, fileId, record1.getCurrentLocation().getInstantTime(), 0); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergedReadHandle.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergedReadHandle.java index fd0e878d482a8..90564d580bddf 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergedReadHandle.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergedReadHandle.java @@ -19,6 +19,8 @@ package org.apache.hudi.io; +import org.apache.hudi.avro.AvroSchemaUtils; +import 
org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.model.AWSDmsAvroPayload; @@ -41,6 +43,7 @@ import org.apache.hudi.table.HoodieTable; import org.apache.hudi.testutils.SparkClientFunctionalTestHarness; +import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.junit.jupiter.params.ParameterizedTest; @@ -198,7 +201,10 @@ private void doMergedReadAndValidate(HoodieTableMetaClient metaClient, HoodieWri .collect(Collectors.toList()); assertEquals(1, partitionPathAndFileIDPairs.size()); String latestCommitTime = table.getActiveTimeline().lastInstant().get().getTimestamp(); - HoodieMergedReadHandle mergedReadHandle = new HoodieMergedReadHandle<>(writeConfig, Option.of(latestCommitTime), table, partitionPathAndFileIDPairs.get(0)); + Schema baseFileReaderSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(writeConfig.getWriteSchema()), writeConfig.allowOperationMetadataField()); + boolean hasTimestampFields = baseFileReaderSchema != null && AvroSchemaUtils.hasTimestampMillisField(baseFileReaderSchema); + HoodieMergedReadHandle mergedReadHandle = new HoodieMergedReadHandle<>(writeConfig, Option.of(latestCommitTime), table, partitionPathAndFileIDPairs.get(0), + baseFileReaderSchema, hasTimestampFields); List mergedRecords = mergedReadHandle.getMergedRecords(); assertEquals(totalRecords, mergedRecords.size()); List sortedMergedRecords = mergedRecords.stream() From 94da78aed5b5235712b20dc032068ac305092bfb Mon Sep 17 00:00:00 2001 From: Lokesh Jain Date: Wed, 8 Apr 2026 17:24:44 +0530 Subject: [PATCH 02/11] Address review comments --- .../action/commit/HoodieMergeHelper.java | 9 +++++-- .../table/action/compact/HoodieCompactor.java | 7 ++++++ .../org/apache/hudi/avro/AvroSchemaUtils.java | 17 +++++++++++++ .../io/hadoop/HoodieAvroParquetReader.java | 11 +++++++-- .../avro/HoodieAvroParquetReaderBuilder.java | 7 +++--- .../TestHoodieDeltaStreamer.java | 24 +++++++------------ 6 files changed, 53 insertions(+), 22 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java index 284de408d2da1..d83d9296fa454 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java @@ -80,6 +80,7 @@ public void runMerge(HoodieTable table, HoodieBaseFile baseFile = mergeHandle.baseFileForMerge(); HoodieRecord.HoodieRecordType recordType = table.getConfig().getRecordMerger().getRecordType(); + AvroSchemaUtils.setLogicalTimestampRepairNeeded(table.getStorageConf()); HoodieFileReader baseFileReader = HoodieIOFactory.getIOFactory( table.getStorage().newInstance(mergeHandle.getOldFilePath(), table.getStorageConf().newInstance())) .getReaderFactory(recordType) @@ -87,8 +88,12 @@ public void runMerge(HoodieTable table, HoodieFileReader bootstrapFileReader = null; Schema writerSchema = mergeHandle.getWriterSchemaWithMetaFields(); - Schema readerSchema = AvroSchemaUtils.getRepairedSchema(baseFileReader.getSchema(), writerSchema); - + Schema readerSchema; + if (AvroSchemaUtils.isLogicalTimestampRepairNeeded(table.getStorageConf(), true)) { + readerSchema = 
AvroSchemaUtils.getRepairedSchema(baseFileReader.getSchema(), writerSchema); + } else { + readerSchema = baseFileReader.getSchema(); + } // In case Advanced Schema Evolution is enabled we might need to rewrite currently // persisted records to adhere to an evolved schema diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java index 9defec99c38e2..de0059ba95be6 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java @@ -18,6 +18,7 @@ package org.apache.hudi.table.action.compact; +import org.apache.hudi.avro.AvroSchemaUtils; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.client.WriteStatus; @@ -130,6 +131,12 @@ public HoodieData compact( TaskContextSupplier taskContextSupplier = table.getTaskContextSupplier(); // if this is a MDT, set up the instant range of log reader just like regular MDT snapshot reader. Option instantRange = CompactHelpers.getInstance().getInstantRange(metaClient); + // Since we are using merge handle here, we can directly query the write schema from conf + // Write handle provides an option to use overridden write schema as well which is not used by merge handle + Schema writerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getWriteSchema()), config.allowOperationMetadataField()); + if (AvroSchemaUtils.hasTimestampMillisField(writerSchema)) { + AvroSchemaUtils.setLogicalTimestampRepairNeeded(table.getStorageConf()); + } return context.parallelize(operations).map(operation -> compact( compactionHandler, metaClient, config, operation, compactionInstantTime, maxInstantTime, instantRange, taskContextSupplier, executionHelper)) .flatMap(List::iterator); diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java index 060a99422d636..1b2870f010b51 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java @@ -28,6 +28,7 @@ import org.apache.hudi.internal.schema.InternalSchema; import org.apache.hudi.internal.schema.action.TableChanges; import org.apache.hudi.internal.schema.utils.SchemaChangeUtils; +import org.apache.hudi.storage.StorageConfiguration; import org.apache.avro.Schema; import org.apache.avro.SchemaCompatibility; @@ -64,6 +65,8 @@ public class AvroSchemaUtils { AVRO_SCHEMA_REPAIR_CLASS = clazz; } + private static final String NEEDS_LOGICAL_TIMSTAMP_REPAIR_KEY = "logical.timestamp.repair.required"; + private AvroSchemaUtils() {} /** @@ -531,4 +534,18 @@ public static boolean hasTimestampMillisField(Schema schema) { return false; } } + + /** + * Sets logical timestamp repair needed key in conf to true + */ + public static void setLogicalTimestampRepairNeeded(StorageConfiguration conf) { + conf.set(NEEDS_LOGICAL_TIMSTAMP_REPAIR_KEY, Boolean.TRUE.toString()); + } + + /** + * Returns true if logical timestamp repair needed key is set to true or if it is not present in config + */ + public static boolean isLogicalTimestampRepairNeeded(StorageConfiguration storageConf, boolean defaultValue) { + return 
storageConf.getBoolean(NEEDS_LOGICAL_TIMSTAMP_REPAIR_KEY, defaultValue); + } } diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java index fcd851efd47f1..e14b99c0f68f9 100644 --- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java @@ -19,6 +19,7 @@ package org.apache.hudi.io.hadoop; +import org.apache.hudi.avro.AvroSchemaUtils; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.model.HoodieAvroIndexedRecord; @@ -172,7 +173,12 @@ private ClosableIterator getIndexedRecordIteratorInternal(Schema // sure that in case the file-schema is not equal to read-schema we'd still // be able to read that file (in case projection is a proper one) Configuration hadoopConf = storage.getConf().unwrapCopyAs(Configuration.class); - Schema repairedFileSchema = getRepairedSchema(getSchema(), schema); + Schema repairedFileSchema; + if (AvroSchemaUtils.isLogicalTimestampRepairNeeded(storage.getConf(), true)) { + repairedFileSchema = getRepairedSchema(getSchema(), schema); + } else { + repairedFileSchema = schema; + } Option promotedSchema = Option.empty(); if (!renamedColumns.isEmpty() || HoodieAvroUtils.recordNeedsRewriteForExtendedAvroTypePromotion(repairedFileSchema, schema)) { AvroReadSupport.setAvroReadSchema(hadoopConf, repairedFileSchema); @@ -183,7 +189,8 @@ private ClosableIterator getIndexedRecordIteratorInternal(Schema AvroReadSupport.setRequestedProjection(hadoopConf, schema); } ParquetReader reader = - new HoodieAvroParquetReaderBuilder(path) + new HoodieAvroParquetReaderBuilder(path, + AvroSchemaUtils.isLogicalTimestampRepairNeeded(storage.getConf(), false) || schema == null || AvroSchemaUtils.hasTimestampMillisField(schema)) .withTableSchema(schema) .withConf(hadoopConf) .set(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, hadoopConf.get(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS)) diff --git a/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/HoodieAvroParquetReaderBuilder.java b/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/HoodieAvroParquetReaderBuilder.java index 308fe34a726cd..cb699155e9bb4 100644 --- a/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/HoodieAvroParquetReaderBuilder.java +++ b/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/HoodieAvroParquetReaderBuilder.java @@ -18,7 +18,6 @@ package org.apache.parquet.avro; -import org.apache.hudi.avro.AvroSchemaUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.storage.StoragePath; @@ -42,11 +41,13 @@ public class HoodieAvroParquetReaderBuilder extends ParquetReader.Builder private GenericData model = null; private boolean enableCompatibility = true; private boolean isReflect = true; + private boolean isLogicalTimestampRepairNeeded; private Schema tableSchema = null; @Deprecated - public HoodieAvroParquetReaderBuilder(StoragePath path) { + public HoodieAvroParquetReaderBuilder(StoragePath path, boolean isLogicalTimestampRepairNeeded) { super(new Path(path.toUri())); + this.isLogicalTimestampRepairNeeded = isLogicalTimestampRepairNeeded; } public HoodieAvroParquetReaderBuilder(InputFile file) { @@ -88,6 +89,6 @@ protected ReadSupport getReadSupport() { conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, enableCompatibility); } return new 
HoodieAvroReadSupport<>(model, Option.ofNullable(tableSchema).map(schema -> getAvroSchemaConverter(conf).convert(schema)), - tableSchema == null || AvroSchemaUtils.hasTimestampMillisField(tableSchema)); + isLogicalTimestampRepairNeeded); } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java index 5f2d81d73a801..b4659fa44e94f 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java @@ -788,8 +788,8 @@ private void assertBoundaryCounts(Dataset df, String exprZero, String exprT } @ParameterizedTest - @CsvSource(value = {"SIX,AVRO,CLUSTER", "CURRENT,AVRO,NONE", "CURRENT,AVRO,CLUSTER", "CURRENT,SPARK,NONE", "CURRENT,SPARK,CLUSTER"}) - void testCOWLogicalRepair(String tableVersion, String recordType, String operation) throws Exception { + @CsvSource(value = {"CLUSTER", "NONE"}) + void testCOWLogicalRepair(String operation) throws Exception { timestampNTZCompatibility(() -> { try { String dirName = "trips_logical_types_json_cow_write"; @@ -864,17 +864,13 @@ void testCOWLogicalRepair(String tableVersion, String recordType, String operati @ParameterizedTest @CsvSource(value = { - "SIX,AVRO,CLUSTER,AVRO", - "CURRENT,AVRO,NONE,AVRO", - "CURRENT,AVRO,CLUSTER,AVRO", - "CURRENT,AVRO,COMPACT,AVRO", - "CURRENT,AVRO,NONE,PARQUET", - "CURRENT,AVRO,CLUSTER,PARQUET", - "CURRENT,AVRO,COMPACT,PARQUET", - "CURRENT,SPARK,NONE,PARQUET", - "CURRENT,SPARK,CLUSTER,PARQUET", - "CURRENT,SPARK,COMPACT,PARQUET"}) - void testMORLogicalRepair(String tableVersion, String recordType, String operation, String logBlockType) throws Exception { + "CLUSTER,AVRO", + "NONE,AVRO", + "COMPACT,AVRO", + "NONE,PARQUET", + "CLUSTER,PARQUET", + "COMPACT,PARQUET"}) + void testMORLogicalRepair(String operation, String logBlockType) throws Exception { timestampNTZCompatibility(() -> { try { String tableSuffix; @@ -909,8 +905,6 @@ void testMORLogicalRepair(String tableVersion, String recordType, String operati properties.setProperty("hoodie.streamer.schemaprovider.target.schema.file", schemaPath); String inputDataPath = getClass().getClassLoader().getResource("logical-repair/mor_write_updates/5").toURI().toString(); properties.setProperty("hoodie.streamer.source.dfs.root", inputDataPath); - String mergerClass = getMergerClassForRecordType(recordType); - String tableVersionString = getTableVersionCode(tableVersion); properties.setProperty("hoodie.datasource.write.recordkey.field", "_row_key"); properties.setProperty("hoodie.datasource.write.precombine.field", "timestamp"); From 5401731c129ba1d4529f0a57af1712765591ec1e Mon Sep 17 00:00:00 2001 From: Lokesh Jain Date: Wed, 8 Apr 2026 17:40:49 +0530 Subject: [PATCH 03/11] minor fix --- .../org/apache/hudi/table/action/commit/HoodieMergeHelper.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java index d83d9296fa454..43300fbd74d08 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java @@ -80,7 +80,6 @@ public 
void runMerge(HoodieTable table, HoodieBaseFile baseFile = mergeHandle.baseFileForMerge(); HoodieRecord.HoodieRecordType recordType = table.getConfig().getRecordMerger().getRecordType(); - AvroSchemaUtils.setLogicalTimestampRepairNeeded(table.getStorageConf()); HoodieFileReader baseFileReader = HoodieIOFactory.getIOFactory( table.getStorage().newInstance(mergeHandle.getOldFilePath(), table.getStorageConf().newInstance())) .getReaderFactory(recordType) From 38e7a0dda4d975d01d097aefe61e3cd55731b68f Mon Sep 17 00:00:00 2001 From: Lokesh Jain Date: Wed, 8 Apr 2026 17:59:49 +0530 Subject: [PATCH 04/11] Address review comment --- .../parquet/Spark34LegacyHoodieParquetFileFormat.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala index 11dcec7af06b9..75c874580b405 100644 --- a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala @@ -377,7 +377,7 @@ class Spark34LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu new HoodieParquetReadSupport( convertTz, enableVectorizedReader = false, - enableTimestampFieldRepair = true, + enableTimestampFieldRepair = hasTimestampMillisFieldInTableSchema, datetimeRebaseSpec, int96RebaseSpec, tableSchemaAsMessageType) From 21671e222fdb80d5bf851670cff33d5a42350850 Mon Sep 17 00:00:00 2001 From: sivabalan Date: Wed, 8 Apr 2026 20:54:38 -0700 Subject: [PATCH 05/11] Addressing feedback and optimizing schema parsing --- .../table/action/commit/HoodieMergeHelper.java | 2 +- .../commit/BaseSparkCommitActionExecutor.java | 15 +++++++++++++++ .../apache/parquet/schema/AvroSchemaRepair.java | 1 + .../org/apache/hudi/avro/AvroSchemaUtils.java | 7 +++---- .../org/apache/parquet/schema/SchemaRepair.java | 1 + .../org/apache/hudi/HoodieBaseRelation.scala | 6 ++++-- .../org/apache/hudi/HoodieMergeOnReadRDD.scala | 3 ++- .../hudi/MergeOnReadIncrementalRelation.scala | 7 ++++--- .../apache/hudi/MergeOnReadSnapshotRelation.scala | 3 +++ .../deltastreamer/TestHoodieDeltaStreamer.java | 3 ++- 10 files changed, 36 insertions(+), 12 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java index 43300fbd74d08..691d0c29e7620 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java @@ -88,7 +88,7 @@ public void runMerge(HoodieTable table, Schema writerSchema = mergeHandle.getWriterSchemaWithMetaFields(); Schema readerSchema; - if (AvroSchemaUtils.isLogicalTimestampRepairNeeded(table.getStorageConf(), true)) { + if (!table.isMetadataTable() && AvroSchemaUtils.isLogicalTimestampRepairNeeded(table.getStorageConf(), true)) { readerSchema = AvroSchemaUtils.getRepairedSchema(baseFileReader.getSchema(), writerSchema); } else { readerSchema = baseFileReader.getSchema(); diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java index f9b474eedd47a..347de07a25205 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java @@ -18,6 +18,7 @@ package org.apache.hudi.table.action.commit; +import org.apache.hudi.avro.AvroSchemaUtils; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy; import org.apache.hudi.client.utils.SparkValidatorUtils; @@ -31,6 +32,7 @@ import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -43,12 +45,14 @@ import org.apache.hudi.data.HoodieJavaPairRDD; import org.apache.hudi.data.HoodieJavaRDD; import org.apache.hudi.exception.HoodieCommitException; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.execution.SparkLazyInsertIterable; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.io.CreateHandleFactory; import org.apache.hudi.io.HoodieMergeHandle; import org.apache.hudi.io.HoodieMergeHandleFactory; +import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.keygen.BaseKeyGenerator; import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory; import org.apache.hudi.table.HoodieTable; @@ -60,6 +64,7 @@ import org.apache.spark.Partitioner; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.HoodieSchemaUtils; import org.apache.spark.storage.StorageLevel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -256,6 +261,16 @@ private HoodieData mapPartitionsAsRDD(HoodieData> d // Partition only partitionedRDD = mappedRDD.partitionBy(partitioner); } + + if (!table.isMetadataTable() && table.getMetaClient().getActiveTimeline().filterCompletedInstants().countInstants() > 0) { + TableSchemaResolver schemaResolver = new TableSchemaResolver(table.getMetaClient()); + try { + table.getHadoopConf().set(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, Boolean.toString(AvroSchemaUtils.hasTimestampMillisField(schemaResolver.getTableAvroSchema()))); + } catch (Exception e) { + throw new HoodieException("Failed to set logical ts related configs", e); + } + } + return HoodieJavaRDD.of(partitionedRDD.map(Tuple2::_2).mapPartitionsWithIndex((partition, recordItr) -> { if (WriteOperationType.isChangingRecords(operationType)) { return handleUpsertPartition(instantTime, partition, recordItr, partitioner); diff --git a/hudi-common/src/avro/java/org/apache/parquet/schema/AvroSchemaRepair.java b/hudi-common/src/avro/java/org/apache/parquet/schema/AvroSchemaRepair.java index 78908a40ae52e..8546b1b8a5e3a 100644 --- a/hudi-common/src/avro/java/org/apache/parquet/schema/AvroSchemaRepair.java +++ b/hudi-common/src/avro/java/org/apache/parquet/schema/AvroSchemaRepair.java @@ -22,6 +22,7 @@ import 
org.apache.hudi.avro.AvroSchemaCache; import org.apache.hudi.avro.AvroSchemaUtils; import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.exception.HoodieException; import org.apache.avro.LogicalType; import org.apache.avro.LogicalTypes; diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java index 1b2870f010b51..87beb66f75c6c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java @@ -28,6 +28,7 @@ import org.apache.hudi.internal.schema.InternalSchema; import org.apache.hudi.internal.schema.action.TableChanges; import org.apache.hudi.internal.schema.utils.SchemaChangeUtils; +import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.storage.StorageConfiguration; import org.apache.avro.Schema; @@ -65,8 +66,6 @@ public class AvroSchemaUtils { AVRO_SCHEMA_REPAIR_CLASS = clazz; } - private static final String NEEDS_LOGICAL_TIMSTAMP_REPAIR_KEY = "logical.timestamp.repair.required"; - private AvroSchemaUtils() {} /** @@ -539,13 +538,13 @@ public static boolean hasTimestampMillisField(Schema schema) { * Sets logical timestamp repair needed key in conf to true */ public static void setLogicalTimestampRepairNeeded(StorageConfiguration conf) { - conf.set(NEEDS_LOGICAL_TIMSTAMP_REPAIR_KEY, Boolean.TRUE.toString()); + conf.set(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, Boolean.TRUE.toString()); } /** * Returns true if logical timestamp repair needed key is set to true or if it is not present in config */ public static boolean isLogicalTimestampRepairNeeded(StorageConfiguration storageConf, boolean defaultValue) { - return storageConf.getBoolean(NEEDS_LOGICAL_TIMSTAMP_REPAIR_KEY, defaultValue); + return storageConf.getBoolean(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, defaultValue); } } diff --git a/hudi-hadoop-common/src/parquet/java/org/apache/parquet/schema/SchemaRepair.java b/hudi-hadoop-common/src/parquet/java/org/apache/parquet/schema/SchemaRepair.java index b85b3b98fac38..8a4b53e6ab6ea 100644 --- a/hudi-hadoop-common/src/parquet/java/org/apache/parquet/schema/SchemaRepair.java +++ b/hudi-hadoop-common/src/parquet/java/org/apache/parquet/schema/SchemaRepair.java @@ -20,6 +20,7 @@ package org.apache.parquet.schema; import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieException; import java.util.ArrayList; import java.util.List; diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index 0534f3dac8ae7..3b27bc165b876 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -116,6 +116,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, protected lazy val tableConfig: HoodieTableConfig = metaClient.getTableConfig + protected lazy val hasTimestampMillisFieldInTableSchema: Boolean = HoodieSchemaUtils.hasTimestampMillisField(tableAvroSchema) + protected lazy val basePath: Path = new Path(metaClient.getBasePath.toUri) // NOTE: Record key-field is assumed singular here due to the either of @@ -245,7 +247,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, // We're delegating to Spark to append partition 
values to every row only in cases // when these corresponding partition-values are not persisted w/in the data file itself val parquetFileFormat = sparkAdapter.createLegacyHoodieParquetFileFormat( - shouldExtractPartitionValuesFromPartitionPath, tableAvroSchema, hasTimestampMillisFieldInTableSchema = HoodieSchemaUtils.hasTimestampMillisField(tableAvroSchema)).get + shouldExtractPartitionValuesFromPartitionPath, tableAvroSchema, hasTimestampMillisFieldInTableSchema = hasTimestampMillisFieldInTableSchema).get (parquetFileFormat, LegacyHoodieParquetFileFormat.FILE_FORMAT_ID) } @@ -555,7 +557,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, // when these corresponding partition-values are not persisted w/in the data file itself appendPartitionValues = shouldAppendPartitionValuesOverride.getOrElse(shouldExtractPartitionValuesFromPartitionPath), tableAvroSchema, - hasTimestampMillisFieldInTableSchema = HoodieSchemaUtils.hasTimestampMillisField(tableAvroSchema) + hasTimestampMillisFieldInTableSchema = hasTimestampMillisFieldInTableSchema ) // Since partition values by default are omitted, and not persisted w/in data-files by Spark, // data-file readers (such as [[ParquetFileFormat]]) have to inject partition values while reading diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala index db538f110c90b..5f78002ddddae 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala @@ -23,9 +23,11 @@ import org.apache.hadoop.mapred.JobConf import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, projectReader} import org.apache.hudi.HoodieMergeOnReadRDD.CONFIG_INSTANTIATION_LOCK import org.apache.hudi.MergeOnReadSnapshotRelation.isProjectionCompatible +import org.apache.hudi.avro.AvroSchemaUtils import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.common.util.StringUtils import org.apache.hudi.exception.HoodieException +import org.apache.hudi.io.storage.HoodieFileReader import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -85,7 +87,6 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, extends RDD[InternalRow](sc, Nil) with HoodieUnsafeRDD { protected val maxCompactionMemoryInBytes: Long = getMaxCompactionMemoryInBytes(new JobConf(config)) - private val hadoopConfBroadcast = sc.broadcast(new SerializableWritable(config)) override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala index 4d83a4d5288d4..b09e416c01824 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala @@ -17,7 +17,7 @@ package org.apache.hudi -import org.apache.hadoop.fs.{FileStatus, GlobPattern, Path} +import org.apache.hadoop.fs.GlobPattern import org.apache.hudi.HoodieConversionUtils.toScalaOption import 
org.apache.hudi.HoodieSparkConfUtils.getHollowCommitHandling import org.apache.hudi.common.model.{FileSlice, HoodieRecord} @@ -29,10 +29,9 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView import org.apache.hudi.common.util.StringUtils import org.apache.hudi.exception.HoodieException import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.listAffectedFilesForCommits +import org.apache.hudi.io.storage.HoodieFileReader import org.apache.hudi.metadata.HoodieTableMetadataUtil.getWritePartitionPaths import org.apache.hudi.storage.StoragePathInfo - -import org.apache.hadoop.fs.GlobPattern import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.InternalRow @@ -81,6 +80,8 @@ case class MergeOnReadIncrementalRelation(override val sqlContext: SQLContext, val optionalFilters = filters val readers = createBaseFileReaders(tableSchema, requiredSchema, requestedColumns, requiredFilters, optionalFilters) + jobConf.set(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, + java.lang.Boolean.toString(hasTimestampMillisFieldInTableSchema)) new HoodieMergeOnReadRDD( sqlContext.sparkContext, config = jobConf, diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala index 5b6be9c55857b..bcb216f178122 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala @@ -24,6 +24,7 @@ import org.apache.hudi.HoodieConversionUtils.toScalaOption import org.apache.hudi.MergeOnReadSnapshotRelation.{createPartitionedFile, isProjectionCompatible} import org.apache.hudi.avro.HoodieAvroUtils import org.apache.hudi.common.model.{FileSlice, HoodieLogFile, OverwriteWithLatestAvroPayload} +import org.apache.hudi.io.storage.HoodieFileReader import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.storage.StoragePath @@ -115,6 +116,8 @@ abstract class BaseMergeOnReadSnapshotRelation(sqlContext: SQLContext, val optionalFilters = filters val readers = createBaseFileReaders(tableSchema, requiredSchema, requestedColumns, requiredFilters, optionalFilters) + jobConf.set(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, + java.lang.Boolean.toString(hasTimestampMillisFieldInTableSchema)) new HoodieMergeOnReadRDD( sqlContext.sparkContext, config = jobConf, diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java index b4659fa44e94f..90f2a38ea76b9 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java @@ -869,7 +869,8 @@ void testCOWLogicalRepair(String operation) throws Exception { "COMPACT,AVRO", "NONE,PARQUET", "CLUSTER,PARQUET", - "COMPACT,PARQUET"}) + "COMPACT,PARQUET" + }) void testMORLogicalRepair(String operation, String logBlockType) throws Exception { timestampNTZCompatibility(() -> { try { From c8b9d02e1bbd7fafafebc7d6197b0ad5936d9229 Mon Sep 17 00:00:00 2001 From: Lokesh Jain Date: Thu, 9 Apr 2026 10:49:20 +0530 Subject: [PATCH 06/11] Fix compilation --- 
.../hudi/table/action/commit/BaseSparkCommitActionExecutor.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java index 347de07a25205..7c795b064ffe5 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java @@ -64,7 +64,6 @@ import org.apache.spark.Partitioner; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.sql.HoodieSchemaUtils; import org.apache.spark.storage.StorageLevel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; From ce6e12be5bf3e69366241634687f7f0cac077372 Mon Sep 17 00:00:00 2001 From: Lokesh Jain Date: Thu, 9 Apr 2026 13:38:57 +0530 Subject: [PATCH 07/11] More fixes and test fix --- .../org/apache/hudi/index/HoodieIndexUtils.java | 2 +- .../table/action/commit/HoodieMergeHelper.java | 2 +- .../table/action/compact/HoodieCompactor.java | 4 +--- .../io/storage/HoodieSparkParquetReader.java | 6 ++---- .../commit/BaseSparkCommitActionExecutor.java | 12 +++++++++--- .../org/apache/hudi/avro/AvroSchemaUtils.java | 16 ++++++++++++---- .../table/log/AbstractHoodieLogRecordReader.java | 6 ++---- .../hudi/io/hadoop/HoodieAvroParquetReader.java | 4 ++-- .../org/apache/hudi/IncrementalRelation.scala | 5 ++--- 9 files changed, 32 insertions(+), 25 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java index ce0ec0bdbff80..256421cf8b066 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java @@ -246,7 +246,7 @@ private static HoodieData> getExistingRecords( .lastInstant() .map(HoodieInstant::getTimestamp); Schema baseFileReaderSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getWriteSchema()), config.allowOperationMetadataField()); - boolean hasTimestampFields = baseFileReaderSchema != null && AvroSchemaUtils.hasTimestampMillisField(baseFileReaderSchema); + boolean hasTimestampFields = AvroSchemaUtils.isLogicalTimestampRepairNeeded(hoodieTable.getHadoopConf(), () -> baseFileReaderSchema != null && AvroSchemaUtils.hasTimestampMillisField(baseFileReaderSchema)); return partitionLocations.flatMap(p -> new HoodieMergedReadHandle(config, instantTime, hoodieTable, Pair.of(p.getKey(), p.getValue()), baseFileReaderSchema, hasTimestampFields) .getMergedRecords().iterator()); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java index 691d0c29e7620..59b0d3a1eb019 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java @@ -88,7 +88,7 @@ public void runMerge(HoodieTable table, Schema writerSchema = mergeHandle.getWriterSchemaWithMetaFields(); Schema readerSchema; - if 
(!table.isMetadataTable() && AvroSchemaUtils.isLogicalTimestampRepairNeeded(table.getStorageConf(), true)) { + if (!table.isMetadataTable() && AvroSchemaUtils.isLogicalTimestampRepairNeeded(table.getStorageConf(), () -> true)) { readerSchema = AvroSchemaUtils.getRepairedSchema(baseFileReader.getSchema(), writerSchema); } else { readerSchema = baseFileReader.getSchema(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java index de0059ba95be6..2a817d161a8ce 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java @@ -134,9 +134,7 @@ public HoodieData compact( // Since we are using merge handle here, we can directly query the write schema from conf // Write handle provides an option to use overridden write schema as well which is not used by merge handle Schema writerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getWriteSchema()), config.allowOperationMetadataField()); - if (AvroSchemaUtils.hasTimestampMillisField(writerSchema)) { - AvroSchemaUtils.setLogicalTimestampRepairNeeded(table.getStorageConf()); - } + AvroSchemaUtils.setLogicalTimestampRepairIfNotSet(table.getStorageConf(), () -> AvroSchemaUtils.hasTimestampMillisField(writerSchema)); return context.parallelize(operations).map(operation -> compact( compactionHandler, metaClient, config, operation, compactionInstantTime, maxInstantTime, instantRange, taskContextSupplier, executionHelper)) .flatMap(List::iterator); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java index a52dd06ff4864..0321626e8a0fa 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java @@ -128,10 +128,8 @@ private ClosableIterator getInternalRowIterator(Schema readerSchema if (requestedSchema == null) { requestedSchema = readerSchema; } - // Set configuration for timestamp_millis type repair. - if (!storage.getConf().contains(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR)) { - storage.getConf().set(ENABLE_LOGICAL_TIMESTAMP_REPAIR, Boolean.toString(AvroSchemaUtils.hasTimestampMillisField(readerSchema))); - } + // Set configuration for timestamp_millis type repair (only when not already set). 
+ AvroSchemaUtils.setLogicalTimestampRepairIfNotSet(storage.getConf(), () -> AvroSchemaUtils.hasTimestampMillisField(readerSchema)); MessageType fileSchema = getFileSchema(); Schema nonNullSchema = AvroSchemaUtils.getNonNullTypeFromUnion(requestedSchema); Option messageSchema = Option.of(getAvroSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(nonNullSchema)); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java index 7c795b064ffe5..9b808ad7b2423 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java @@ -49,10 +49,10 @@ import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.execution.SparkLazyInsertIterable; import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.internal.schema.HoodieSchemaException; import org.apache.hudi.io.CreateHandleFactory; import org.apache.hudi.io.HoodieMergeHandle; import org.apache.hudi.io.HoodieMergeHandleFactory; -import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.keygen.BaseKeyGenerator; import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory; import org.apache.hudi.table.HoodieTable; @@ -261,10 +261,16 @@ private HoodieData mapPartitionsAsRDD(HoodieData> d partitionedRDD = mappedRDD.partitionBy(partitioner); } - if (!table.isMetadataTable() && table.getMetaClient().getActiveTimeline().filterCompletedInstants().countInstants() > 0) { + if (!table.isMetadataTable() && table.getMetaClient().getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() > 0) { TableSchemaResolver schemaResolver = new TableSchemaResolver(table.getMetaClient()); try { - table.getHadoopConf().set(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, Boolean.toString(AvroSchemaUtils.hasTimestampMillisField(schemaResolver.getTableAvroSchema()))); + AvroSchemaUtils.setLogicalTimestampRepairIfNotSet(table.getHadoopConf(), () -> { + try { + return AvroSchemaUtils.hasTimestampMillisField(schemaResolver.getTableAvroSchema()); + } catch (Exception e) { + throw new HoodieSchemaException("Failed to resolve schema", e); + } + }); } catch (Exception e) { throw new HoodieException("Failed to set logical ts related configs", e); } diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java index 87beb66f75c6c..95e85e99f1774 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java @@ -44,6 +44,7 @@ import java.util.Objects; import java.util.Set; import java.util.function.BiFunction; +import java.util.function.Supplier; import java.util.stream.Collectors; import static org.apache.hudi.common.util.CollectionUtils.reduce; @@ -537,14 +538,21 @@ public static boolean hasTimestampMillisField(Schema schema) { /** * Sets logical timestamp repair needed key in conf to true */ - public static void setLogicalTimestampRepairNeeded(StorageConfiguration conf) { - conf.set(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, Boolean.TRUE.toString()); + public static void setLogicalTimestampRepairIfNotSet(StorageConfiguration conf, Supplier 
valueSupplier) { + if (!conf.contains(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR)) { + conf.set(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, valueSupplier.get().toString()); + } } /** * Returns true if logical timestamp repair needed key is set to true or if it is not present in config */ - public static boolean isLogicalTimestampRepairNeeded(StorageConfiguration storageConf, boolean defaultValue) { - return storageConf.getBoolean(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, defaultValue); + public static boolean isLogicalTimestampRepairNeeded(StorageConfiguration conf, Supplier defaultValueSupplier) { + Option valueOpt = conf.getString(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR); + if (valueOpt.isEmpty() || StringUtils.isNullOrEmpty(valueOpt.get())) { + return defaultValueSupplier.get(); + } else { + return Boolean.parseBoolean(valueOpt.get()); + } } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java index d62a25b1e1add..c42ff4a159bb9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java @@ -43,7 +43,6 @@ import org.apache.hudi.internal.schema.InternalSchema; import org.apache.hudi.internal.schema.action.InternalSchemaMerger; import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter; -import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.StoragePath; @@ -70,7 +69,6 @@ import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.COMPACTED_BLOCK_TIMES; import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME; import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME; -import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType.COMMAND_BLOCK; import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType.CORRUPT_BLOCK; import static org.apache.hudi.common.util.ValidationUtils.checkState; @@ -186,8 +184,8 @@ protected AbstractHoodieLogRecordReader(HoodieStorage storage, String basePath, this.forceFullScan = forceFullScan; this.internalSchema = internalSchema == null ? 
InternalSchema.getEmptyInternalSchema() : internalSchema; this.enableOptimizedLogBlocksScan = enableOptimizedLogBlocksScan; - this.enableLogicalTimestampFieldRepair = !hoodieTableMetaClient.isMetadataTable() && storage.getConf().getBoolean(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, - () -> readerSchema != null && AvroSchemaUtils.hasTimestampMillisField(readerSchema)); + this.enableLogicalTimestampFieldRepair = !hoodieTableMetaClient.isMetadataTable() && + AvroSchemaUtils.isLogicalTimestampRepairNeeded(storage.getConf(), () -> readerSchema != null && AvroSchemaUtils.hasTimestampMillisField(readerSchema)); if (keyFieldOverride.isPresent()) { // NOTE: This branch specifically is leveraged handling Metadata Table diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java index e14b99c0f68f9..f179167c41a54 100644 --- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java @@ -174,7 +174,7 @@ private ClosableIterator getIndexedRecordIteratorInternal(Schema // be able to read that file (in case projection is a proper one) Configuration hadoopConf = storage.getConf().unwrapCopyAs(Configuration.class); Schema repairedFileSchema; - if (AvroSchemaUtils.isLogicalTimestampRepairNeeded(storage.getConf(), true)) { + if (AvroSchemaUtils.isLogicalTimestampRepairNeeded(storage.getConf(), () -> true)) { repairedFileSchema = getRepairedSchema(getSchema(), schema); } else { repairedFileSchema = schema; @@ -190,7 +190,7 @@ private ClosableIterator getIndexedRecordIteratorInternal(Schema } ParquetReader reader = new HoodieAvroParquetReaderBuilder(path, - AvroSchemaUtils.isLogicalTimestampRepairNeeded(storage.getConf(), false) || schema == null || AvroSchemaUtils.hasTimestampMillisField(schema)) + AvroSchemaUtils.isLogicalTimestampRepairNeeded(storage.getConf(), () -> schema == null || AvroSchemaUtils.hasTimestampMillisField(schema))) .withTableSchema(schema) .withConf(hadoopConf) .set(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, hadoopConf.get(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS)) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala index 527a7c6c5f514..683c3876c81f0 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala @@ -212,9 +212,8 @@ class IncrementalRelation(val sqlContext: SQLContext, if (tableAvroSchema.getType != Schema.Type.NULL) { LegacyHoodieParquetFileFormat.setTableAvroSchemaInConf( sqlContext.sparkContext.hadoopConfiguration, tableAvroSchema) - sqlContext.sparkContext.hadoopConfiguration.set( - HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, - AvroSchemaUtils.hasTimestampMillisField(tableAvroSchema).toString) + AvroSchemaUtils.setLogicalTimestampRepairIfNotSet(sqlContext.sparkContext.hadoopConfiguration, + () => AvroSchemaUtils.hasTimestampMillisField(tableAvroSchema)) } val formatClassName = metaClient.getTableConfig.getBaseFileFormat match { case HoodieFileFormat.PARQUET => LegacyHoodieParquetFileFormat.FILE_FORMAT_ID From c21bd5c19c0e8710839bed8d39e2b5396fd86ab0 Mon Sep 17 00:00:00 2001 From: Lokesh Jain Date: 
Thu, 9 Apr 2026 15:18:22 +0530 Subject: [PATCH 08/11] Fix checkstyle --- .../java/org/apache/hudi/index/HoodieIndexUtils.java | 3 ++- .../action/commit/BaseSparkCommitActionExecutor.java | 2 +- .../java/org/apache/hudi/avro/AvroSchemaUtils.java | 10 ++++++++++ .../table/log/AbstractHoodieLogRecordReader.java | 4 ++-- .../scala/org/apache/hudi/IncrementalRelation.scala | 5 +++-- 5 files changed, 18 insertions(+), 6 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java index 256421cf8b066..d1f45851bafa2 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java @@ -246,7 +246,8 @@ private static HoodieData> getExistingRecords( .lastInstant() .map(HoodieInstant::getTimestamp); Schema baseFileReaderSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getWriteSchema()), config.allowOperationMetadataField()); - boolean hasTimestampFields = AvroSchemaUtils.isLogicalTimestampRepairNeeded(hoodieTable.getHadoopConf(), () -> baseFileReaderSchema != null && AvroSchemaUtils.hasTimestampMillisField(baseFileReaderSchema)); + boolean hasTimestampFields = AvroSchemaUtils.isLogicalTimestampRepairNeeded(hoodieTable.getStorageConf(), + () -> baseFileReaderSchema != null && AvroSchemaUtils.hasTimestampMillisField(baseFileReaderSchema)); return partitionLocations.flatMap(p -> new HoodieMergedReadHandle(config, instantTime, hoodieTable, Pair.of(p.getKey(), p.getValue()), baseFileReaderSchema, hasTimestampFields) .getMergedRecords().iterator()); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java index 9b808ad7b2423..57e8975c2bc0b 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java @@ -264,7 +264,7 @@ private HoodieData mapPartitionsAsRDD(HoodieData> d if (!table.isMetadataTable() && table.getMetaClient().getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() > 0) { TableSchemaResolver schemaResolver = new TableSchemaResolver(table.getMetaClient()); try { - AvroSchemaUtils.setLogicalTimestampRepairIfNotSet(table.getHadoopConf(), () -> { + AvroSchemaUtils.setLogicalTimestampRepairIfNotSet(table.getStorageConf(), () -> { try { return AvroSchemaUtils.hasTimestampMillisField(schemaResolver.getTableAvroSchema()); } catch (Exception e) { diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java index 95e85e99f1774..3ef4f151645f3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java @@ -33,6 +33,7 @@ import org.apache.avro.Schema; import org.apache.avro.SchemaCompatibility; +import org.apache.hadoop.conf.Configuration; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -535,6 +536,15 @@ public static boolean hasTimestampMillisField(Schema schema) { } } + /** + * Sets logical 
timestamp repair needed key in conf to true + */ + public static void setLogicalTimestampRepairIfNotSet(Configuration conf, Supplier valueSupplier) { + if (conf.get(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR) == null) { + conf.set(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, valueSupplier.get().toString()); + } + } + /** * Sets logical timestamp repair needed key in conf to true */ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java index c42ff4a159bb9..dc610a16758d0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java @@ -184,8 +184,8 @@ protected AbstractHoodieLogRecordReader(HoodieStorage storage, String basePath, this.forceFullScan = forceFullScan; this.internalSchema = internalSchema == null ? InternalSchema.getEmptyInternalSchema() : internalSchema; this.enableOptimizedLogBlocksScan = enableOptimizedLogBlocksScan; - this.enableLogicalTimestampFieldRepair = !hoodieTableMetaClient.isMetadataTable() && - AvroSchemaUtils.isLogicalTimestampRepairNeeded(storage.getConf(), () -> readerSchema != null && AvroSchemaUtils.hasTimestampMillisField(readerSchema)); + this.enableLogicalTimestampFieldRepair = !hoodieTableMetaClient.isMetadataTable() + && AvroSchemaUtils.isLogicalTimestampRepairNeeded(storage.getConf(), () -> readerSchema != null && AvroSchemaUtils.hasTimestampMillisField(readerSchema)); if (keyFieldOverride.isPresent()) { // NOTE: This branch specifically is leveraged handling Metadata Table diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala index 683c3876c81f0..00505e8fb1e06 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala @@ -38,9 +38,10 @@ import org.apache.hudi.internal.schema.utils.SerDeHelper import org.apache.hudi.io.storage.HoodieFileReader import org.apache.hudi.storage.{HoodieStorageUtils, StoragePath} import org.apache.hudi.table.HoodieSparkTable - import org.apache.avro.Schema +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.GlobPattern +import org.apache.hudi.util.JFunction import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.execution.datasources.parquet.LegacyHoodieParquetFileFormat @@ -213,7 +214,7 @@ class IncrementalRelation(val sqlContext: SQLContext, LegacyHoodieParquetFileFormat.setTableAvroSchemaInConf( sqlContext.sparkContext.hadoopConfiguration, tableAvroSchema) AvroSchemaUtils.setLogicalTimestampRepairIfNotSet(sqlContext.sparkContext.hadoopConfiguration, - () => AvroSchemaUtils.hasTimestampMillisField(tableAvroSchema)) + JFunction.toJavaSupplier(() => AvroSchemaUtils.hasTimestampMillisField(tableAvroSchema).asInstanceOf[java.lang.Boolean])) } val formatClassName = metaClient.getTableConfig.getBaseFileFormat match { case HoodieFileFormat.PARQUET => LegacyHoodieParquetFileFormat.FILE_FORMAT_ID From fcb55bc1875e0616b07a82b704d784eef868bb5a Mon Sep 17 00:00:00 2001 From: Lokesh Jain Date: Thu, 9 Apr 2026 15:33:37 +0530 Subject: [PATCH 09/11] Other fixes --- 
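
The helper introduced just above is a set-if-absent wrapper: an explicit value already present in the conf always wins, and the supplier is only evaluated when the key is missing, so the schema walk stays lazy. A minimal usage sketch, not part of the patch series, using a throwaway inline Avro schema and a fresh Hadoop Configuration as placeholders:

    import org.apache.avro.Schema;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.avro.AvroSchemaUtils;

    // Usage sketch (editorial): the supplier runs only when ENABLE_LOGICAL_TIMESTAMP_REPAIR
    // is absent from the conf, so an explicit engine/user setting is never overwritten.
    Configuration hadoopConf = new Configuration();
    Schema tableSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"r\",\"fields\":["
            + "{\"name\":\"ts\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}}]}");
    AvroSchemaUtils.setLogicalTimestampRepairIfNotSet(hadoopConf,
        () -> AvroSchemaUtils.hasTimestampMillisField(tableSchema));
    // hadoopConf now carries the repair flag ("true" for this schema); a second call is a no-op.
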
.../apache/hudi/common/table/log/HoodieLogFileReader.java | 3 +-- .../org/apache/hudi/MergeOnReadIncrementalRelation.scala | 5 +++-- .../scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala | 6 +++--- .../org/apache/spark/sql/adapter/Spark3_4Adapter.scala | 6 +++--- .../org/apache/spark/sql/adapter/Spark3_5Adapter.scala | 6 +++--- 5 files changed, 13 insertions(+), 13 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java index 44fc5ec6fd1fe..f8e873d142395 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java @@ -39,7 +39,6 @@ import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.internal.schema.InternalSchema; import org.apache.hudi.io.SeekableDataInputStream; -import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.io.util.IOUtils; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.StoragePath; @@ -100,7 +99,7 @@ public HoodieLogFileReader(HoodieStorage storage, HoodieLogFile logFile, Schema boolean enableRecordLookups, String keyField) throws IOException { this(storage, logFile, readerSchema, bufferSize, reverseReader, enableRecordLookups, keyField, InternalSchema.getEmptyInternalSchema(), - storage.getConf().getBoolean(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, + AvroSchemaUtils.isLogicalTimestampRepairNeeded(storage.getConf(), () -> readerSchema != null && AvroSchemaUtils.hasTimestampMillisField(readerSchema))); } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala index b09e416c01824..85fe07f2331aa 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala @@ -20,6 +20,7 @@ package org.apache.hudi import org.apache.hadoop.fs.GlobPattern import org.apache.hudi.HoodieConversionUtils.toScalaOption import org.apache.hudi.HoodieSparkConfUtils.getHollowCommitHandling +import org.apache.hudi.avro.AvroSchemaUtils import org.apache.hudi.common.model.{FileSlice, HoodieRecord} import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.USE_TRANSITION_TIME @@ -32,6 +33,7 @@ import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.listAffectedFilesForC import org.apache.hudi.io.storage.HoodieFileReader import org.apache.hudi.metadata.HoodieTableMetadataUtil.getWritePartitionPaths import org.apache.hudi.storage.StoragePathInfo +import org.apache.hudi.util.JFunction import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.InternalRow @@ -80,8 +82,7 @@ case class MergeOnReadIncrementalRelation(override val sqlContext: SQLContext, val optionalFilters = filters val readers = createBaseFileReaders(tableSchema, requiredSchema, requestedColumns, requiredFilters, optionalFilters) - jobConf.set(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, - java.lang.Boolean.toString(hasTimestampMillisFieldInTableSchema)) + AvroSchemaUtils.setLogicalTimestampRepairIfNotSet(jobConf, 
JFunction.toJavaSupplier(() => hasTimestampMillisFieldInTableSchema.asInstanceOf[java.lang.Boolean])) new HoodieMergeOnReadRDD( sqlContext.sparkContext, config = jobConf, diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala index bcb216f178122..fe7773502cba7 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala @@ -22,11 +22,12 @@ import org.apache.hadoop.conf.Configuration import org.apache.hudi.HoodieBaseRelation.convertToAvroSchema import org.apache.hudi.HoodieConversionUtils.toScalaOption import org.apache.hudi.MergeOnReadSnapshotRelation.{createPartitionedFile, isProjectionCompatible} -import org.apache.hudi.avro.HoodieAvroUtils +import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils} import org.apache.hudi.common.model.{FileSlice, HoodieLogFile, OverwriteWithLatestAvroPayload} import org.apache.hudi.io.storage.HoodieFileReader import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.storage.StoragePath +import org.apache.hudi.util.JFunction import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext @@ -116,8 +117,7 @@ abstract class BaseMergeOnReadSnapshotRelation(sqlContext: SQLContext, val optionalFilters = filters val readers = createBaseFileReaders(tableSchema, requiredSchema, requestedColumns, requiredFilters, optionalFilters) - jobConf.set(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, - java.lang.Boolean.toString(hasTimestampMillisFieldInTableSchema)) + AvroSchemaUtils.setLogicalTimestampRepairIfNotSet(jobConf, JFunction.toJavaSupplier(() => hasTimestampMillisFieldInTableSchema.asInstanceOf[java.lang.Boolean])) new HoodieMergeOnReadRDD( sqlContext.sparkContext, config = jobConf, diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala index ab73ded838e77..df7c5ead29ee7 100644 --- a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala @@ -24,6 +24,7 @@ import org.apache.hudi.storage.HoodieStorage import org.apache.avro.Schema import org.apache.hadoop.conf.Configuration +import org.apache.hudi.util.JFunction import org.apache.parquet.avro.HoodieAvroParquetSchemaConverter.getAvroSchemaConverter import org.apache.parquet.schema.{MessageType, SchemaRepair, Type} import org.apache.spark.sql.avro._ @@ -82,8 +83,7 @@ class Spark3_4Adapter extends BaseSpark3Adapter { override def getParquetReadSupport(storage: HoodieStorage, messageSchema: org.apache.hudi.common.util.Option[MessageType]): ParquetReadSupport = { - val enableTimestampFieldRepair = storage.getConf.getBoolean( - HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, true) + val enableTimestampFieldRepair = AvroSchemaUtils.isLogicalTimestampRepairNeeded(storage.getConf, JFunction.toJavaSupplier(() => true)) new HoodieParquetReadSupport( Option.empty[ZoneId], enableVectorizedReader = true, @@ -169,7 +169,7 @@ class Spark3_4Adapter extends BaseSpark3Adapter { val cachedRequestedSchema = 
HoodieInternalRowUtils.getCachedSchema(nonNullRequestedSchema) val requestedSchemaInMessageType = org.apache.hudi.common.util.Option.of( getAvroSchemaConverter(storage.getConf.unwrapAs(classOf[Configuration])).convert(nonNullRequestedSchema)) - val enableTimestampFieldRepair = storage.getConf.getBoolean(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, true) + val enableTimestampFieldRepair = AvroSchemaUtils.isLogicalTimestampRepairNeeded(storage.getConf, JFunction.toJavaSupplier(() => true)) val repairedRequestedSchema = repairSchemaIfSpecified(enableTimestampFieldRepair, fileSchema, requestedSchemaInMessageType) val repairedRequestedStructType = new ParquetToSparkSchemaConverter(storage.getConf.unwrapAs(classOf[Configuration])).convert(repairedRequestedSchema) val evolution = new SparkBasicSchemaEvolution(repairedRequestedStructType, cachedRequestedSchema, SQLConf.get.sessionLocalTimeZone) diff --git a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala index 61aa3f1a915b3..a8aac2d4f2911 100644 --- a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala @@ -21,6 +21,7 @@ import org.apache.hudi.avro.AvroSchemaUtils import org.apache.hudi.{Spark35HoodieFileScanRDD, SparkAdapterSupport$} import org.apache.hudi.io.storage.HoodieFileReader import org.apache.hudi.storage.HoodieStorage +import org.apache.hudi.util.JFunction import org.apache.avro.Schema import org.apache.hadoop.conf.Configuration @@ -139,8 +140,7 @@ class Spark3_5Adapter extends BaseSpark3Adapter { override def getParquetReadSupport(storage: HoodieStorage, messageSchema: org.apache.hudi.common.util.Option[MessageType]): ParquetReadSupport = { - val enableTimestampFieldRepair = storage.getConf.getBoolean( - HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, true) + val enableTimestampFieldRepair = AvroSchemaUtils.isLogicalTimestampRepairNeeded(storage.getConf, JFunction.toJavaSupplier(() => true)) new HoodieParquetReadSupport( Option.empty[ZoneId], enableVectorizedReader = true, @@ -166,7 +166,7 @@ class Spark3_5Adapter extends BaseSpark3Adapter { val nonNullRequestedSchema = AvroSchemaUtils.getNonNullTypeFromUnion(requestedSchema) val cachedRequestedSchema = HoodieInternalRowUtils.getCachedSchema(nonNullRequestedSchema) val requestedSchemaInMessageType = org.apache.hudi.common.util.Option.of(getAvroSchemaConverter(storage.getConf.unwrapAs(classOf[Configuration])).convert(nonNullRequestedSchema)) - val enableTimestampFieldRepair = storage.getConf.getBoolean(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, true) + val enableTimestampFieldRepair = AvroSchemaUtils.isLogicalTimestampRepairNeeded(storage.getConf, JFunction.toJavaSupplier(() => true)) val repairedRequestedSchema = repairSchemaIfSpecified(enableTimestampFieldRepair, fileSchema, requestedSchemaInMessageType) val repairedRequestedStructType = new ParquetToSparkSchemaConverter(storage.getConf.unwrapAs(classOf[Configuration])).convert(repairedRequestedSchema) val evolution = new SparkBasicSchemaEvolution(repairedRequestedStructType, cachedRequestedSchema, SQLConf.get.sessionLocalTimeZone) From f0fa682484fdd1fabec19c8e49edbafbcd866da5 Mon Sep 17 00:00:00 2001 From: Lokesh Jain Date: Thu, 9 Apr 2026 16:52:08 +0530 Subject: [PATCH 10/11] Fix compilation --- 
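
The adapter hunks above route the decision through AvroSchemaUtils.isLogicalTimestampRepairNeeded instead of reading the boolean key directly, with the Scala call sites wrapping their thunks via JFunction.toJavaSupplier. The helper's body does not appear in these hunks; the following is only a sketch of its presumed read-side semantics (an assumption, mirroring the set-if-absent writer shown earlier, and the real helper may accept Hudi's storage-configuration abstraction rather than a Hadoop Configuration):

    import java.util.function.Supplier;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.io.storage.HoodieFileReader;

    // Presumed read-side counterpart (sketch only, not the actual implementation).
    static boolean isLogicalTimestampRepairNeededSketch(Configuration conf, Supplier<Boolean> fallback) {
      String explicit = conf.get(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR);
      // Honour an explicit setting; otherwise lazily compute the default, e.g. "does the
      // reader schema contain a timestamp-millis field?".
      return explicit != null ? Boolean.parseBoolean(explicit) : Boolean.TRUE.equals(fallback.get());
    }
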
.../table/action/commit/BaseSparkCommitActionExecutor.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java index 57e8975c2bc0b..68a58b706ac42 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java @@ -49,7 +49,6 @@ import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.execution.SparkLazyInsertIterable; import org.apache.hudi.index.HoodieIndex; -import org.apache.hudi.internal.schema.HoodieSchemaException; import org.apache.hudi.io.CreateHandleFactory; import org.apache.hudi.io.HoodieMergeHandle; import org.apache.hudi.io.HoodieMergeHandleFactory; @@ -268,7 +267,7 @@ private HoodieData mapPartitionsAsRDD(HoodieData> d try { return AvroSchemaUtils.hasTimestampMillisField(schemaResolver.getTableAvroSchema()); } catch (Exception e) { - throw new HoodieSchemaException("Failed to resolve schema", e); + return true; } }); } catch (Exception e) { From b36d0133c603f81f8ee35eb5da2ef63119257ccd Mon Sep 17 00:00:00 2001 From: Lokesh Jain Date: Fri, 10 Apr 2026 00:27:58 +0530 Subject: [PATCH 11/11] Fix test failures --- .../org/apache/hudi/index/HoodieIndexUtils.java | 2 +- .../apache/hudi/io/HoodieMergedReadHandle.java | 9 ++++----- .../HoodieBackedTableMetadataWriter.java | 2 +- .../table/action/compact/HoodieCompactor.java | 4 +++- .../hudi/io/TestHoodieMergedReadHandle.java | 2 +- .../org/apache/hudi/IncrementalRelation.scala | 17 ++++++++++------- .../hudi/MergeOnReadIncrementalRelation.scala | 4 +++- .../hudi/MergeOnReadSnapshotRelation.scala | 4 +++- .../deltastreamer/TestHoodieDeltaStreamer.java | 9 +++++++-- 9 files changed, 33 insertions(+), 20 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java index d1f45851bafa2..66ff6b6e43e6d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java @@ -249,7 +249,7 @@ private static HoodieData> getExistingRecords( boolean hasTimestampFields = AvroSchemaUtils.isLogicalTimestampRepairNeeded(hoodieTable.getStorageConf(), () -> baseFileReaderSchema != null && AvroSchemaUtils.hasTimestampMillisField(baseFileReaderSchema)); return partitionLocations.flatMap(p - -> new HoodieMergedReadHandle(config, instantTime, hoodieTable, Pair.of(p.getKey(), p.getValue()), baseFileReaderSchema, hasTimestampFields) + -> new HoodieMergedReadHandle(config, instantTime, hoodieTable, Pair.of(p.getKey(), p.getValue()), hasTimestampFields) .getMergedRecords().iterator()); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java index 9041fe49a365f..28675c6779a8a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java @@ -58,18 +58,17 @@ public 
class HoodieMergedReadHandle extends HoodieReadHandle instantTime, HoodieTable hoodieTable, Pair partitionPathFileIDPair, - Schema baseFileReaderSchema, boolean hasTimestampFields) { - this(config, instantTime, hoodieTable, partitionPathFileIDPair, baseFileReaderSchema, hasTimestampFields, Option.empty()); + boolean hasTimestampFields) { + this(config, instantTime, hoodieTable, partitionPathFileIDPair, hasTimestampFields, Option.empty()); } public HoodieMergedReadHandle(HoodieWriteConfig config, Option instantTime, HoodieTable hoodieTable, Pair partitionPathFileIDPair, - Schema baseFileReaderSchema, boolean hasTimestampFields, - Option fileSliceOption) { + boolean hasTimestampFields, Option fileSliceOption) { super(config, instantTime, hoodieTable, partitionPathFileIDPair); Schema orignalReaderSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField()); // config.getSchema is not canonicalized, while config.getWriteSchema is canonicalized. So, we have to use the canonicalized schema to read the existing data. - this.baseFileReaderSchema = baseFileReaderSchema; + this.baseFileReaderSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getWriteSchema()), config.allowOperationMetadataField()); fileSliceOpt = fileSliceOption.isPresent() ? fileSliceOption : getLatestFileSlice(); // Repair reader schema. // Assume writer schema should be correct. If not, no repair happens. diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index d1b0b39d0f438..cf9e988e0a8d7 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -589,7 +589,7 @@ private static HoodieData readRecordKeysFromFileSliceSnapshot(Hood final String fileId = fileSlice.getFileId(); Schema baseFileReaderSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(dataWriteConfig.getWriteSchema()), dataWriteConfig.allowOperationMetadataField()); boolean hasTimestampFields = baseFileReaderSchema != null && AvroSchemaUtils.hasTimestampMillisField(baseFileReaderSchema); - return new HoodieMergedReadHandle(dataWriteConfig, instantTime, hoodieTable, Pair.of(partition, fileSlice.getFileId()), baseFileReaderSchema, hasTimestampFields, Option.of(fileSlice)) + return new HoodieMergedReadHandle(dataWriteConfig, instantTime, hoodieTable, Pair.of(partition, fileSlice.getFileId()), hasTimestampFields, Option.of(fileSlice)) .getMergedRecords().stream().map(record -> { HoodieRecord record1 = (HoodieRecord) record; return HoodieMetadataPayload.createRecordIndexUpdate(record1.getRecordKey(), partition, fileId, diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java index 2a817d161a8ce..bdcb40c0ffb58 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java @@ -134,7 +134,9 @@ public HoodieData compact( // Since we are using merge handle here, we can directly query the write schema 
from conf // Write handle provides an option to use overridden write schema as well which is not used by merge handle Schema writerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getWriteSchema()), config.allowOperationMetadataField()); - AvroSchemaUtils.setLogicalTimestampRepairIfNotSet(table.getStorageConf(), () -> AvroSchemaUtils.hasTimestampMillisField(writerSchema)); + if (!table.isMetadataTable()) { + AvroSchemaUtils.setLogicalTimestampRepairIfNotSet(table.getStorageConf(), () -> AvroSchemaUtils.hasTimestampMillisField(writerSchema)); + } return context.parallelize(operations).map(operation -> compact( compactionHandler, metaClient, config, operation, compactionInstantTime, maxInstantTime, instantRange, taskContextSupplier, executionHelper)) .flatMap(List::iterator); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergedReadHandle.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergedReadHandle.java index 90564d580bddf..cee0ceac97494 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergedReadHandle.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergedReadHandle.java @@ -204,7 +204,7 @@ private void doMergedReadAndValidate(HoodieTableMetaClient metaClient, HoodieWri Schema baseFileReaderSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(writeConfig.getWriteSchema()), writeConfig.allowOperationMetadataField()); boolean hasTimestampFields = baseFileReaderSchema != null && AvroSchemaUtils.hasTimestampMillisField(baseFileReaderSchema); HoodieMergedReadHandle mergedReadHandle = new HoodieMergedReadHandle<>(writeConfig, Option.of(latestCommitTime), table, partitionPathAndFileIDPairs.get(0), - baseFileReaderSchema, hasTimestampFields); + hasTimestampFields); List mergedRecords = mergedReadHandle.getMergedRecords(); assertEquals(totalRecords, mergedRecords.size()); List sortedMergedRecords = mergedRecords.stream() diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala index 00505e8fb1e06..3731a7b5e0c70 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala @@ -17,6 +17,9 @@ package org.apache.hudi +import org.apache.avro.Schema +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{GlobPattern, Path} import org.apache.hudi.avro.AvroSchemaUtils import org.apache.hudi.DataSourceReadOptions.INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME import org.apache.hudi.HoodieBaseRelation.isSchemaEvolutionEnabledOnRead @@ -206,14 +209,14 @@ class IncrementalRelation(val sqlContext: SQLContext, // pass internalSchema to hadoopConf, so it can be used in executors. 
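
Several call sites in this series repeat the same idiom: build the canonicalized write schema with metadata fields, check it for timestamp-millis fields, and feed the result either into HoodieMergedReadHandle or into the conf flag, skipping metadata tables. A condensed sketch of that idiom, where config (a HoodieWriteConfig) and table (a HoodieTable) stand in for the caller's objects:

    // Recurring idiom across these hunks (sketch; config/table are placeholders).
    Schema writerSchema = HoodieAvroUtils.addMetadataFields(
        new Schema.Parser().parse(config.getWriteSchema()), config.allowOperationMetadataField());
    boolean hasTimestampFields =
        writerSchema != null && AvroSchemaUtils.hasTimestampMillisField(writerSchema);
    // Data-table paths propagate the flag; metadata-table paths skip it, as in the compactor hunk above.
    if (!table.isMetadataTable()) {
      AvroSchemaUtils.setLogicalTimestampRepairIfNotSet(table.getStorageConf(),
          () -> AvroSchemaUtils.hasTimestampMillisField(writerSchema));
    }
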
val validCommits = metaClient .getCommitsAndCompactionTimeline.filterCompletedInstants.getInstantsAsStream.toArray().map(_.asInstanceOf[HoodieInstant].getFileName).mkString(",") - sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, SerDeHelper.toJson(internalSchema)) - sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, metaClient.getBasePath.toString) - sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST, validCommits) + val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration) + conf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, SerDeHelper.toJson(internalSchema)) + conf.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, metaClient.getBasePath.toString) + conf.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST, validCommits) // Pass table Avro schema to hadoopConf so supportBatch() can find it (supportBatch does not receive options). if (tableAvroSchema.getType != Schema.Type.NULL) { - LegacyHoodieParquetFileFormat.setTableAvroSchemaInConf( - sqlContext.sparkContext.hadoopConfiguration, tableAvroSchema) - AvroSchemaUtils.setLogicalTimestampRepairIfNotSet(sqlContext.sparkContext.hadoopConfiguration, + LegacyHoodieParquetFileFormat.setTableAvroSchemaInConf(conf, tableAvroSchema) + AvroSchemaUtils.setLogicalTimestampRepairIfNotSet(conf, JFunction.toJavaSupplier(() => AvroSchemaUtils.hasTimestampMillisField(tableAvroSchema).asInstanceOf[java.lang.Boolean])) } val formatClassName = metaClient.getTableConfig.getBaseFileFormat match { @@ -255,7 +258,7 @@ class IncrementalRelation(val sqlContext: SQLContext, val timer = HoodieTimer.start val allFilesToCheck = filteredMetaBootstrapFullPaths ++ filteredRegularFullPaths - val storageConf = HadoopFSUtils.getStorageConfWithCopy(sqlContext.sparkContext.hadoopConfiguration) + val storageConf = HadoopFSUtils.getStorageConfWithCopy(conf) val localBasePathStr = basePath.toString val firstNotFoundPath = sqlContext.sparkContext.parallelize(allFilesToCheck.toSeq, allFilesToCheck.size) .map(path => { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala index 85fe07f2331aa..c340a5507ea81 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala @@ -82,7 +82,9 @@ case class MergeOnReadIncrementalRelation(override val sqlContext: SQLContext, val optionalFilters = filters val readers = createBaseFileReaders(tableSchema, requiredSchema, requestedColumns, requiredFilters, optionalFilters) - AvroSchemaUtils.setLogicalTimestampRepairIfNotSet(jobConf, JFunction.toJavaSupplier(() => hasTimestampMillisFieldInTableSchema.asInstanceOf[java.lang.Boolean])) + if (!metaClient.isMetadataTable) { + AvroSchemaUtils.setLogicalTimestampRepairIfNotSet(jobConf, JFunction.toJavaSupplier(() => hasTimestampMillisFieldInTableSchema.asInstanceOf[java.lang.Boolean])) + } new HoodieMergeOnReadRDD( sqlContext.sparkContext, config = jobConf, diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala index 
fe7773502cba7..deb3cdf12a4bf 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala @@ -117,7 +117,9 @@ abstract class BaseMergeOnReadSnapshotRelation(sqlContext: SQLContext, val optionalFilters = filters val readers = createBaseFileReaders(tableSchema, requiredSchema, requestedColumns, requiredFilters, optionalFilters) - AvroSchemaUtils.setLogicalTimestampRepairIfNotSet(jobConf, JFunction.toJavaSupplier(() => hasTimestampMillisFieldInTableSchema.asInstanceOf[java.lang.Boolean])) + if (!metaClient.isMetadataTable) { + AvroSchemaUtils.setLogicalTimestampRepairIfNotSet(jobConf, JFunction.toJavaSupplier(() => hasTimestampMillisFieldInTableSchema.asInstanceOf[java.lang.Boolean])) + } new HoodieMergeOnReadRDD( sqlContext.sparkContext, config = jobConf, diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java index 90f2a38ea76b9..ba54be6de3570 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java @@ -788,8 +788,8 @@ private void assertBoundaryCounts(Dataset df, String exprZero, String exprT } @ParameterizedTest - @CsvSource(value = {"CLUSTER", "NONE"}) - void testCOWLogicalRepair(String operation) throws Exception { + @CsvSource(value = {"CLUSTER,AVRO", "NONE,AVRO", "CLUSTER,SPARK", "NONE,SPARK"}) + void testCOWLogicalRepair(String operation, String recordType) throws Exception { timestampNTZCompatibility(() -> { try { String dirName = "trips_logical_types_json_cow_write"; @@ -813,6 +813,11 @@ void testCOWLogicalRepair(String operation) throws Exception { properties.setProperty("hoodie.metatata.enable", "true"); properties.setProperty("hoodie.parquet.small.file.limit", "-1"); properties.setProperty("hoodie.cleaner.commits.retained", "10"); + if (recordType.equals("SPARK")) { + properties.setProperty(HoodieWriteConfig.RECORD_MERGER_IMPLS.key(), "org.apache.hudi.HoodieSparkRecordMerger"); + } else { + properties.setProperty(HoodieWriteConfig.RECORD_MERGER_IMPLS.key(), "org.apache.hudi.common.model.HoodieAvroRecordMerger"); + } Option propt = Option.of(properties); new HoodieStreamer(prepCfgForCowLogicalRepair(tableBasePath, "456"), jsc, propt).sync();
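
The test change above widens the matrix from the table-service operation alone to operation × record-merger type; JUnit's @CsvSource splits each row on commas, so "CLUSTER,SPARK" binds operation = "CLUSTER" and recordType = "SPARK". A condensed sketch of the recordType-to-merger mapping, using the property key and class names as they appear in the hunk:

    // Sketch (editorial): map the test's recordType parameter to the merger implementation.
    String mergerImpl = "SPARK".equals(recordType)
        ? "org.apache.hudi.HoodieSparkRecordMerger"
        : "org.apache.hudi.common.model.HoodieAvroRecordMerger";
    properties.setProperty(HoodieWriteConfig.RECORD_MERGER_IMPLS.key(), mergerImpl);
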