@@ -18,6 +18,7 @@

package org.apache.hudi.index;

import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.data.HoodieData;
@@ -244,8 +245,11 @@ private static <R> HoodieData<HoodieRecord<R>> getExistingRecords(
.filterCompletedInstants()
.lastInstant()
.map(HoodieInstant::getTimestamp);
Schema baseFileReaderSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getWriteSchema()), config.allowOperationMetadataField());
🤖 nit: the schema setup pattern on these two lines is duplicated identically in HoodieBackedTableMetadataWriter.java (lines 590–591) and TestHoodieMergedReadHandle.java (lines 204–205) — could you extract this into a static helper method to reduce duplication?
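
As a rough illustration of the suggested extraction (the helper name and placement are hypothetical, not part of this PR):

```java
// Hypothetical static helper collapsing the duplicated two-line pattern
// at the three call sites; could live in HoodieAvroUtils or a schema utility.
public static Schema buildBaseFileReaderSchema(HoodieWriteConfig config) {
  // config.getWriteSchema() is the canonicalized write schema; metadata
  // fields are appended so the reader schema matches the written layout.
  return HoodieAvroUtils.addMetadataFields(
      new Schema.Parser().parse(config.getWriteSchema()),
      config.allowOperationMetadataField());
}
```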

boolean hasTimestampFields = AvroSchemaUtils.isLogicalTimestampRepairNeeded(hoodieTable.getStorageConf(),
() -> baseFileReaderSchema != null && AvroSchemaUtils.hasTimestampMillisField(baseFileReaderSchema));
return partitionLocations.flatMap(p
-> new HoodieMergedReadHandle(config, instantTime, hoodieTable, Pair.of(p.getKey(), p.getValue()))
-> new HoodieMergedReadHandle(config, instantTime, hoodieTable, Pair.of(p.getKey(), p.getValue()), hasTimestampFields)
.getMergedRecords().iterator());
}

@@ -56,26 +56,23 @@ public class HoodieMergedReadHandle<T, I, K, O> extends HoodieReadHandle<T, I, K
protected final Schema baseFileReaderSchema;
private final Option<FileSlice> fileSliceOpt;

public HoodieMergedReadHandle(HoodieWriteConfig config,
Option<String> instantTime,
HoodieTable<T, I, K, O> hoodieTable,
Pair<String, String> partitionPathFileIDPair) {
this(config, instantTime, hoodieTable, partitionPathFileIDPair, Option.empty());
public HoodieMergedReadHandle(HoodieWriteConfig config, Option<String> instantTime,
HoodieTable<T, I, K, O> hoodieTable, Pair<String, String> partitionPathFileIDPair,
boolean hasTimestampFields) {
this(config, instantTime, hoodieTable, partitionPathFileIDPair, hasTimestampFields, Option.empty());
}

public HoodieMergedReadHandle(HoodieWriteConfig config,
Option<String> instantTime,
HoodieTable<T, I, K, O> hoodieTable,
Pair<String, String> partitionPathFileIDPair,
Option<FileSlice> fileSliceOption) {
public HoodieMergedReadHandle(HoodieWriteConfig config, Option<String> instantTime,
HoodieTable<T, I, K, O> hoodieTable, Pair<String, String> partitionPathFileIDPair,
boolean hasTimestampFields, Option<FileSlice> fileSliceOption) {
super(config, instantTime, hoodieTable, partitionPathFileIDPair);
Schema orignalReaderSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField());
// config.getSchema is not canonicalized, while config.getWriteSchema is canonicalized. So, we have to use the canonicalized schema to read the existing data.
baseFileReaderSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getWriteSchema()), config.allowOperationMetadataField());
this.baseFileReaderSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getWriteSchema()), config.allowOperationMetadataField());
fileSliceOpt = fileSliceOption.isPresent() ? fileSliceOption : getLatestFileSlice();
// Repair reader schema.
// Assume writer schema should be correct. If not, no repair happens.
readerSchema = AvroSchemaUtils.getRepairedSchema(orignalReaderSchema, baseFileReaderSchema);
readerSchema = hasTimestampFields ? AvroSchemaUtils.getRepairedSchema(orignalReaderSchema, this.baseFileReaderSchema) : orignalReaderSchema;
🤖 nit: the comment above this line should be updated to clarify that the repair is now conditional — it only happens when hasTimestampFields is true, rather than always repairing.
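
For instance, the comment could be reworded along these lines (suggested wording only):

```java
// Repair the reader schema only when the write schema contains timestamp
// fields that may need logical-type repair; otherwise keep the original
// reader schema. Assume the writer schema is correct; if not, no repair happens.
```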

🤖 The guard hasTimestampFields is checked via hasTimestampMillisField, which returns true only for TimestampMillis and LocalTimestampMillis. However, needsLogicalTypeRepair in AvroSchemaRepair also repairs the case where the file schema is a plain LONG and the table schema has LocalTimestampMicros. In that scenario, hasTimestampMillisField returns false but the repair would still be needed. Could you verify this gap doesn't affect any realistic schema canonicalization path, or broaden the check to cover all timestamp logical types that needsLogicalTypeRepair handles?
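
One possible broadened check, assuming Avro 1.10+ (where the local-timestamp logical types exist); the exact type set would need to be verified against what needsLogicalTypeRepair actually handles:

```java
// Hypothetical replacement for hasTimestampMillisField covering every
// timestamp logical type, regardless of precision or UTC adjustment.
// Requires org.apache.avro.{LogicalType, LogicalTypes, Schema} imports.
public static boolean hasTimestampLogicalTypeField(Schema schema) {
  switch (schema.getType()) {
    case RECORD:
      return schema.getFields().stream()
          .anyMatch(f -> hasTimestampLogicalTypeField(f.schema()));
    case UNION:
      return schema.getTypes().stream()
          .anyMatch(AvroSchemaUtils::hasTimestampLogicalTypeField);
    case ARRAY:
      return hasTimestampLogicalTypeField(schema.getElementType());
    case MAP:
      return hasTimestampLogicalTypeField(schema.getValueType());
    case LONG:
      LogicalType lt = schema.getLogicalType();
      return lt instanceof LogicalTypes.TimestampMillis
          || lt instanceof LogicalTypes.TimestampMicros
          || lt instanceof LogicalTypes.LocalTimestampMillis
          || lt instanceof LogicalTypes.LocalTimestampMicros;
    default:
      return false;
  }
}
```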

}

public List<HoodieRecord<T>> getMergedRecords() {
@@ -18,6 +18,8 @@

package org.apache.hudi.metadata;

import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
import org.apache.hudi.avro.model.HoodieIndexPlan;
@@ -75,6 +77,7 @@
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;

import org.apache.avro.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -584,8 +587,10 @@ private static HoodieData<HoodieRecord> readRecordKeysFromFileSliceSnapshot(Hood
final String partition = partitionAndFileSlice.getKey();
final FileSlice fileSlice = partitionAndFileSlice.getValue();
final String fileId = fileSlice.getFileId();
return new HoodieMergedReadHandle(dataWriteConfig, instantTime, hoodieTable, Pair.of(partition, fileSlice.getFileId()),
Option.of(fileSlice)).getMergedRecords().stream().map(record -> {
Schema baseFileReaderSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(dataWriteConfig.getWriteSchema()), dataWriteConfig.allowOperationMetadataField());
🤖 The schema parsing and hasTimestampMillisField computation are inside the flatMap lambda, so they're re-evaluated for every partition/file-slice pair. The PR description says this should be computed on the driver side. Could you hoist baseFileReaderSchema and hasTimestampFields above the flatMap, similar to how it's done in HoodieIndexUtils.getExistingRecords?
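
A hedged sketch of the suggested hoist (the collection and variable names are assumed from context, not taken from the PR):

```java
// Computed once on the driver, before iterating partition/file-slice pairs.
Schema baseFileReaderSchema = HoodieAvroUtils.addMetadataFields(
    new Schema.Parser().parse(dataWriteConfig.getWriteSchema()),
    dataWriteConfig.allowOperationMetadataField());
boolean hasTimestampFields = baseFileReaderSchema != null
    && AvroSchemaUtils.hasTimestampMillisField(baseFileReaderSchema);
// The flatMap lambda then only captures the precomputed flag.
return partitionFileSlicePairs.flatMap(partitionAndFileSlice -> {
  /* existing per-slice logic, reusing hasTimestampFields */
});
```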

boolean hasTimestampFields = baseFileReaderSchema != null && AvroSchemaUtils.hasTimestampMillisField(baseFileReaderSchema);
return new HoodieMergedReadHandle(dataWriteConfig, instantTime, hoodieTable, Pair.of(partition, fileSlice.getFileId()), hasTimestampFields, Option.of(fileSlice))
.getMergedRecords().stream().map(record -> {
HoodieRecord record1 = (HoodieRecord) record;
return HoodieMetadataPayload.createRecordIndexUpdate(record1.getRecordKey(), partition, fileId,
record1.getCurrentLocation().getInstantTime(), 0);
@@ -87,8 +87,12 @@ public void runMerge(HoodieTable<?, ?, ?, ?> table,
HoodieFileReader bootstrapFileReader = null;

Schema writerSchema = mergeHandle.getWriterSchemaWithMetaFields();
Schema readerSchema = AvroSchemaUtils.getRepairedSchema(baseFileReader.getSchema(), writerSchema);

Schema readerSchema;
if (!table.isMetadataTable() && AvroSchemaUtils.isLogicalTimestampRepairNeeded(table.getStorageConf(), () -> true)) {
readerSchema = AvroSchemaUtils.getRepairedSchema(baseFileReader.getSchema(), writerSchema);
} else {
readerSchema = baseFileReader.getSchema();
}

// In case Advanced Schema Evolution is enabled we might need to rewrite currently
// persisted records to adhere to an evolved schema
🤖 Line 136: This LocalTimestampMicros gap is still open from prior rounds. hasTimestampMillisField only returns true for MILLIS-precision timestamps, but needsLogicalTypeRepair also handles the case where the file schema is a plain LONG and the table schema has LocalTimestampMicros. With setLogicalTimestampRepairIfNotSet now writing false into the config for such tables (instead of leaving it unset), the downstream repair is skipped — a regression from the old default-true fallback. Could you broaden the check to cover all logical timestamp types that needsLogicalTypeRepair handles, or rename it to something like hasTimestampLogicalTypeField?


@@ -18,6 +18,7 @@

package org.apache.hudi.table.action.compact;

import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.WriteStatus;
@@ -130,6 +131,12 @@ public HoodieData<WriteStatus> compact(
TaskContextSupplier taskContextSupplier = table.getTaskContextSupplier();
// if this is a MDT, set up the instant range of log reader just like regular MDT snapshot reader.
Option<InstantRange> instantRange = CompactHelpers.getInstance().getInstantRange(metaClient);
// Since we are using the merge handle here, we can query the write schema directly from the config.
// The write handle also offers an overridden write schema option, but the merge handle does not use it.
Schema writerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getWriteSchema()), config.allowOperationMetadataField());
🤖 Same gap as before — hasTimestampMillisField only returns true for MILLIS-precision timestamps, but needsLogicalTypeRepair also handles the case where the file schema is a plain LONG and the table schema has LocalTimestampMicros (non-UTC-adjusted). If someone's write schema has LocalTimestampMicros fields, the flag won't be set, and the downstream parquet read support could skip a needed repair. Could you either broaden this check to cover all timestamp logical types that needsLogicalTypeRepair handles, or add a comment explaining why LocalTimestampMicros can't appear here?


🤖 This still has the LocalTimestampMicros gap I've flagged in prior rounds. Concretely: the old code left ENABLE_LOGICAL_TIMESTAMP_REPAIR unset when hasTimestampMillisField was false, so HoodieMergeHelper fell through to its default of true and repair happened. Now setLogicalTimestampRepairIfNotSet writes "false" into the config, HoodieMergeHelper reads it, and repair is skipped. For a table whose schema has LocalTimestampMicros (non-UTC local timestamp with microsecond precision), this is a regression — base files written as plain LONG will no longer be repaired during compaction. Could you broaden the check to cover all logical timestamp types that needsLogicalTypeRepair handles (i.e., any non-UTC-adjusted local timestamp, regardless of precision)?


🤖 The metadata table guard is a nice optimization, but hasTimestampMillisField still doesn't cover LocalTimestampMicros — so for non-metadata tables with LocalTimestampMicros fields, setLogicalTimestampRepairIfNotSet writes false, skipping the repair that needsLogicalTypeRepair would otherwise perform. Could you add LocalTimestampMicros to the check (or rename it to reflect all handled types)?


if (!table.isMetadataTable()) {
AvroSchemaUtils.setLogicalTimestampRepairIfNotSet(table.getStorageConf(), () -> AvroSchemaUtils.hasTimestampMillisField(writerSchema));
}
return context.parallelize(operations).map(operation -> compact(
compactionHandler, metaClient, config, operation, compactionInstantTime, maxInstantTime, instantRange, taskContextSupplier, executionHelper))
.flatMap(List::iterator);
@@ -128,10 +128,8 @@ private ClosableIterator<InternalRow> getInternalRowIterator(Schema readerSchema
if (requestedSchema == null) {
requestedSchema = readerSchema;
}
// Set configuration for timestamp_millis type repair.
if (!storage.getConf().contains(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR)) {
storage.getConf().set(ENABLE_LOGICAL_TIMESTAMP_REPAIR, Boolean.toString(AvroSchemaUtils.hasTimestampMillisField(readerSchema)));
}
// Set configuration for timestamp_millis type repair (only when not already set).
AvroSchemaUtils.setLogicalTimestampRepairIfNotSet(storage.getConf(), () -> AvroSchemaUtils.hasTimestampMillisField(readerSchema));
MessageType fileSchema = getFileSchema();
Schema nonNullSchema = AvroSchemaUtils.getNonNullTypeFromUnion(requestedSchema);
Option<MessageType> messageSchema = Option.of(getAvroSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(nonNullSchema));
@@ -18,6 +18,7 @@

package org.apache.hudi.table.action.commit;

import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy;
import org.apache.hudi.client.utils.SparkValidatorUtils;
@@ -31,6 +32,7 @@
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -43,6 +45,7 @@
import org.apache.hudi.data.HoodieJavaPairRDD;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.execution.SparkLazyInsertIterable;
import org.apache.hudi.index.HoodieIndex;
@@ -256,6 +259,22 @@ private HoodieData<WriteStatus> mapPartitionsAsRDD(HoodieData<HoodieRecord<T>> d
// Partition only
partitionedRDD = mappedRDD.partitionBy(partitioner);
}

if (!table.isMetadataTable() && table.getMetaClient().getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() > 0) {
TableSchemaResolver schemaResolver = new TableSchemaResolver(table.getMetaClient());
try {
AvroSchemaUtils.setLogicalTimestampRepairIfNotSet(table.getStorageConf(), () -> {
🤖 If getTableAvroSchema() throws (e.g., corrupted timeline, missing commit metadata), the entire commit operation will fail. Previously, repair was unconditionally enabled, so a schema resolution failure wouldn't block writes. Could you catch the inner exception and default to true (enable repair) instead of aborting? Something like wrapping the supplier to return true on failure would preserve the old fail-safe behavior.


try {
return AvroSchemaUtils.hasTimestampMillisField(schemaResolver.getTableAvroSchema());
} catch (Exception e) {
return true;
}
});
} catch (Exception e) {
throw new HoodieException("Failed to set logical ts related configs", e);
}
}

return HoodieJavaRDD.of(partitionedRDD.map(Tuple2::_2).mapPartitionsWithIndex((partition, recordItr) -> {
if (WriteOperationType.isChangingRecords(operationType)) {
return handleUpsertPartition(instantTime, partition, recordItr, partitioner);
@@ -19,6 +19,8 @@

package org.apache.hudi.io;

import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.AWSDmsAvroPayload;
@@ -41,6 +43,7 @@
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.junit.jupiter.params.ParameterizedTest;
@@ -198,7 +201,10 @@ private void doMergedReadAndValidate(HoodieTableMetaClient metaClient, HoodieWri
.collect(Collectors.toList());
assertEquals(1, partitionPathAndFileIDPairs.size());
String latestCommitTime = table.getActiveTimeline().lastInstant().get().getTimestamp();
HoodieMergedReadHandle mergedReadHandle = new HoodieMergedReadHandle<>(writeConfig, Option.of(latestCommitTime), table, partitionPathAndFileIDPairs.get(0));
Schema baseFileReaderSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(writeConfig.getWriteSchema()), writeConfig.allowOperationMetadataField());
boolean hasTimestampFields = baseFileReaderSchema != null && AvroSchemaUtils.hasTimestampMillisField(baseFileReaderSchema);
HoodieMergedReadHandle mergedReadHandle = new HoodieMergedReadHandle<>(writeConfig, Option.of(latestCommitTime), table, partitionPathAndFileIDPairs.get(0),
hasTimestampFields);
List<HoodieRecord> mergedRecords = mergedReadHandle.getMergedRecords();
assertEquals(totalRecords, mergedRecords.size());
List<HoodieRecord> sortedMergedRecords = mergedRecords.stream()
@@ -22,6 +22,7 @@
import org.apache.hudi.avro.AvroSchemaCache;
import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.exception.HoodieException;

import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
@@ -28,9 +28,12 @@
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.action.TableChanges;
import org.apache.hudi.internal.schema.utils.SchemaChangeUtils;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.storage.StorageConfiguration;

import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
import org.apache.hadoop.conf.Configuration;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@@ -42,6 +45,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.apache.hudi.common.util.CollectionUtils.reduce;
@@ -531,4 +535,34 @@ public static boolean hasTimestampMillisField(Schema schema) {
return false;
}
}

/**
* Sets the logical timestamp repair key in conf to the supplied value when the key is not already set.
*/
public static void setLogicalTimestampRepairIfNotSet(Configuration conf, Supplier<Boolean> valueSupplier) {
if (conf.get(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR) == null) {
conf.set(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, valueSupplier.get().toString());
}
}

/**
* Sets the logical timestamp repair key in conf to the supplied value when the key is not already set.
*/
public static void setLogicalTimestampRepairIfNotSet(StorageConfiguration conf, Supplier<Boolean> valueSupplier) {
if (!conf.contains(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR)) {
conf.set(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, valueSupplier.get().toString());
}
}

/**
* Returns the configured value of the logical timestamp repair key, falling back to the supplied default when the key is absent or empty.
*/
public static boolean isLogicalTimestampRepairNeeded(StorageConfiguration conf, Supplier<Boolean> defaultValueSupplier) {
Option<String> valueOpt = conf.getString(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR);
if (valueOpt.isEmpty() || StringUtils.isNullOrEmpty(valueOpt.get())) {
return defaultValueSupplier.get();
} else {
return Boolean.parseBoolean(valueOpt.get());
}
}
}
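
For context, the intended set-once/read-many flow of these helpers looks roughly like this (a hypothetical call site, pieced together from the usages elsewhere in this diff):

```java
// Writer/compactor side: record the repair decision once, from the write schema.
AvroSchemaUtils.setLogicalTimestampRepairIfNotSet(
    table.getStorageConf(),
    () -> AvroSchemaUtils.hasTimestampMillisField(writerSchema));

// Reader side: honor the recorded decision; the supplier only runs
// when nothing was ever set in the configuration.
boolean repairNeeded = AvroSchemaUtils.isLogicalTimestampRepairNeeded(
    storage.getConf(),
    () -> readerSchema != null && AvroSchemaUtils.hasTimestampMillisField(readerSchema));
```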
@@ -43,7 +43,6 @@
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;

@@ -70,7 +69,6 @@
import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.COMPACTED_BLOCK_TIMES;
import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;
import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME;
import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType.COMMAND_BLOCK;
import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType.CORRUPT_BLOCK;
import static org.apache.hudi.common.util.ValidationUtils.checkState;

@@ -186,8 +184,8 @@ protected AbstractHoodieLogRecordReader(HoodieStorage storage, String basePath,
this.forceFullScan = forceFullScan;
this.internalSchema = internalSchema == null ? InternalSchema.getEmptyInternalSchema() : internalSchema;
this.enableOptimizedLogBlocksScan = enableOptimizedLogBlocksScan;
this.enableLogicalTimestampFieldRepair = !hoodieTableMetaClient.isMetadataTable() && storage.getConf().getBoolean(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR,
() -> readerSchema != null && AvroSchemaUtils.hasTimestampMillisField(readerSchema));
this.enableLogicalTimestampFieldRepair = !hoodieTableMetaClient.isMetadataTable()
&& AvroSchemaUtils.isLogicalTimestampRepairNeeded(storage.getConf(), () -> readerSchema != null && AvroSchemaUtils.hasTimestampMillisField(readerSchema));

if (keyFieldOverride.isPresent()) {
// NOTE: This branch specifically is leveraged handling Metadata Table
@@ -39,7 +39,6 @@
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.io.SeekableDataInputStream;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.util.IOUtils;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
@@ -100,7 +99,7 @@ public HoodieLogFileReader(HoodieStorage storage, HoodieLogFile logFile, Schema
boolean enableRecordLookups, String keyField) throws IOException {
this(storage, logFile, readerSchema, bufferSize, reverseReader, enableRecordLookups, keyField,
InternalSchema.getEmptyInternalSchema(),
storage.getConf().getBoolean(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR,
AvroSchemaUtils.isLogicalTimestampRepairNeeded(storage.getConf(),
() -> readerSchema != null && AvroSchemaUtils.hasTimestampMillisField(readerSchema)));
}

@@ -19,6 +19,7 @@

package org.apache.hudi.io.hadoop;

import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
@@ -172,7 +173,12 @@ private ClosableIterator<IndexedRecord> getIndexedRecordIteratorInternal(Schema
// sure that in case the file-schema is not equal to read-schema we'd still
// be able to read that file (in case projection is a proper one)
Configuration hadoopConf = storage.getConf().unwrapCopyAs(Configuration.class);
Schema repairedFileSchema = getRepairedSchema(getSchema(), schema);
Schema repairedFileSchema;
🤖 This uses defaultValue=true while the builder on line 192 uses defaultValue=false. When the conf flag isn't explicitly set (e.g., reads not going through merge/compaction), the file schema gets repaired here but the parquet read support may not repair. Is the asymmetry intentional — i.e., is Avro-level schema repair always safe/cheap enough to keep as default, while the parquet-level repair is the expensive one being optimized?


if (AvroSchemaUtils.isLogicalTimestampRepairNeeded(storage.getConf(), () -> true)) {
repairedFileSchema = getRepairedSchema(getSchema(), schema);
} else {
repairedFileSchema = schema;
}
Option<Schema> promotedSchema = Option.empty();
if (!renamedColumns.isEmpty() || HoodieAvroUtils.recordNeedsRewriteForExtendedAvroTypePromotion(repairedFileSchema, schema)) {
AvroReadSupport.setAvroReadSchema(hadoopConf, repairedFileSchema);
@@ -183,7 +189,8 @@ private ClosableIterator<IndexedRecord> getIndexedRecordIteratorInternal(Schema
AvroReadSupport.setRequestedProjection(hadoopConf, schema);
}
ParquetReader<IndexedRecord> reader =
new HoodieAvroParquetReaderBuilder<IndexedRecord>(path)
new HoodieAvroParquetReaderBuilder<IndexedRecord>(path,
AvroSchemaUtils.isLogicalTimestampRepairNeeded(storage.getConf(), () -> schema == null || AvroSchemaUtils.hasTimestampMillisField(schema)))
.withTableSchema(schema)
.withConf(hadoopConf)
.set(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, hadoopConf.get(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS))