fix: timestamp logical types #18132
Conversation
val enableVectorizedReader: Boolean =
  sqlConf.parquetVectorizedReaderEnabled &&
  resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
  ParquetUtils.isBatchReadSupportedForSchema(sqlConf, resultSchema)
This is the fix.
lokeshj1703
left a comment
@linliu-code Found a few optimisations which can be added. These would be needed in branch-0.x as well.
Schema writerSchema = mergeHandle.getWriterSchemaWithMetaFields();
Schema readerSchema = baseFileReader.getSchema();
Schema readerSchema = AvroSchemaUtils.getRepairedSchema(baseFileReader.getSchema(), writerSchema);
I can see this code getting executed in the executor. Ran test: org.apache.hudi.functional.TestRecordLevelIndex#testRLIUpsert
This might be a problem with branch-0.x as well.
Can you help me understand why we are not trying to infer isLogicalTimestampRepairEnabled in the executor code?
Isn't the idea to parse the schema once in the driver, infer it in the executor, and avoid the schema repair code altogether if the table does not contain any logical types at all?
This is difficult to fix. Multiple nested callers and functions like handleUpdateInternal are involved here.
But why can't we add it to the hadoopConfiguration that's part of table.getHadoopConf() in the driver and then fetch it from here?
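A rough sketch of that suggestion (illustrative only; table, writerSchema and baseFileReader come from the surrounding discussion, and the key reuses the HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR property mentioned later in this thread):

// Driver side: compute the flag once from the already-parsed writer schema and
// stamp it on the Hadoop configuration that is shipped to the executors.
val hadoopConf = table.getHadoopConf
hadoopConf.setBoolean(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR,
  AvroSchemaUtils.hasTimestampMillisField(writerSchema))

// Executor side: a cheap config lookup instead of re-parsing the schema;
// default to true so behaviour is unchanged when the flag was never set.
val readerSchema =
  if (hadoopConf.getBoolean(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, true)) {
    AvroSchemaUtils.getRepairedSchema(baseFileReader.getSchema, writerSchema)
  } else {
    baseFileReader.getSchema
  }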
Schema orignalReaderSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField());
// config.getSchema is not canonicalized, while config.getWriteSchema is canonicalized. So, we have to use the canonicalized schema to read the existing data.
baseFileReaderSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getWriteSchema()), config.allowOperationMetadataField());
// Repair reader schema.
This would also be executed in the executor, and we can probably optimise by adding a flag in the caller. Could not validate this though.
lets fix this. should not take much effort to fix.
lets fix 0.15.1 if need be.
this.internalSchema = internalSchema == null ? InternalSchema.getEmptyInternalSchema() : internalSchema;
this.enableOptimizedLogBlocksScan = enableOptimizedLogBlocksScan;
this.enableLogicalTimestampFieldRepair = !hoodieTableMetaClient.isMetadataTable() && fs.getConf().getBoolean(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR,
    readerSchema != null && AvroSchemaUtils.hasTimestampMillisField(readerSchema));
This is also getting executed in the executor code path. We can optimise by adding a flag in the caller instead. Validated in the logical repair tests added in TestHoodieDeltaStreamer.
I think we took an informed decision to add optimisation for just metadata table here. We can probably check the others.
Will address these comments after cherry-picking commits from branch-0.x.
2b481e3 to 722842e
…se FileSystem and hadoop Configuration changes (0.14 equivalent)
yihua
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Why is the PR description empty? Can we fill that in, please?
nsivabalan
left a comment
I see we are removing SparkAvroPostProcessor.
Can we just leave it and not add it by default?
In a minor release, wondering if we should remove a utility class.
 */
def hasTimestampMillisField(schema: Schema): Boolean = {
  if (schema == null) {
    true
why do we return true if the schema is null?
nsivabalan
left a comment
I need to pull down the changes and do some additional inspection. For now, you can address the existing feedback.
 * @param avroSchema the table's Avro schema
 * @return set of field names whose type is timestamp-millis or local-timestamp-millis
 */
def getTimestampMillisColumns(avroSchema: org.apache.avro.Schema): Set[String] = {
are we considering only top level fields here?
It seems like it's only top-level fields being considered here.
Apache master uses index definition to get source fields and validates those source fields
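For reference, a rough sketch of how nested fields could also be covered (illustrative only, not the PR's implementation; it relies only on Avro's public Schema and LogicalType API):

import org.apache.avro.Schema
import scala.collection.JavaConverters._

// Walks records, unions, arrays and maps so that timestamp-millis /
// local-timestamp-millis fields are detected even when they are nested.
// A production version would also need to guard against recursive record definitions.
def hasTimestampMillisFieldNested(schema: Schema): Boolean = {
  if (schema == null) {
    true // conservative: repair stays enabled when no schema is available
  } else schema.getType match {
    case Schema.Type.RECORD =>
      schema.getFields.asScala.exists(f => hasTimestampMillisFieldNested(f.schema()))
    case Schema.Type.UNION =>
      schema.getTypes.asScala.exists(hasTimestampMillisFieldNested)
    case Schema.Type.ARRAY =>
      hasTimestampMillisFieldNested(schema.getElementType)
    case Schema.Type.MAP =>
      hasTimestampMillisFieldNested(schema.getValueType)
    case _ =>
      val lt = schema.getLogicalType
      lt != null && (lt.getName == "timestamp-millis" || lt.getName == "local-timestamp-millis")
  }
}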
ss.emptyDataFrame
// Avoid calling isEmpty() which can cause serialization issues with Ordering$Reverse
// Check partition count instead, which doesn't require task serialization
val structType = convertAvroSchemaToStructType(new Schema.Parser().parse(schemaStr))
Is this optimization applied to 0.15.1 as well?
The code is different in 0.15.1. This optimisation is not required there.
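For context, a hypothetical sketch of the pattern the diff comments describe (df stands for the DataFrame being checked; ss, schemaStr and convertAvroSchemaToStructType come from the surrounding code):

import org.apache.avro.Schema
import org.apache.spark.sql.Row

// rdd.getNumPartitions is driver-side metadata, so it avoids the Spark job and the
// task serialization that isEmpty() triggers (the Ordering$Reverse issue noted above).
// It is a conservative check: zero partitions implies empty, not the other way round.
val structType = convertAvroSchemaToStructType(new Schema.Parser().parse(schemaStr))
val result =
  if (df.rdd.getNumPartitions == 0) {
    ss.createDataFrame(ss.sparkContext.emptyRDD[Row], structType)
  } else {
    df
  }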
// sure that in case the file-schema is not equal to read-schema we'd still
// be able to read that file (in case projection is a proper one)
if (!requestedSchema.isPresent()) {
  Schema repairedFileSchema = getRepairedSchema(getSchema(), schema);
When we are instantiating the base file reader in L84 in HoodieMergeHelper, if we can embed a boolean flag in hadoopConf, we can fetch it again here and avoid repair calls for tables w/o any logical type.
conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, enableCompatibility);
}
return new HoodieAvroReadSupport<>(model);
return new HoodieAvroReadSupport<>(model, Option.ofNullable(tableSchema).map(schema -> getAvroSchemaConverter(conf).convert(schema)),
if hadoopConf has the value for hasLogicalTsField, we can also avoid the additional call in L90
new HoodieParquetReadSupport(
  convertTz,
  enableVectorizedReader = false,
  enableTimestampFieldRepair = true,
shouldn't we set the value to hasTimestampMillisFieldInTableSchema
}

@ParameterizedTest
@CsvSource(value = {"SIX,AVRO,CLUSTER", "CURRENT,AVRO,NONE", "CURRENT,AVRO,CLUSTER", "CURRENT,SPARK,NONE", "CURRENT,SPARK,CLUSTER"})
in 0.x, we don't have support for multiple writer versions. CURRENT and SIX are one and the same.
can we trim the unnecessary combinations.
@ParameterizedTest
@CsvSource(value = {
  "SIX,AVRO,CLUSTER,AVRO",
can we trim the unnecessary combinations.
And can you point me to the places where we avoid calls to repairSchema if it's the metadata table? I don't remember seeing it.
nsivabalan
left a comment
For MOR reads: in HoodieMergeOnReadRDD, L89 is where we broadcast the hadoop conf; let's inject the hasLogicalTs flag into it in the driver.
This eventually gets wired all the way to AbstractHoodieLogRecordReader.fs.getHadoopConf.
That piece of code is executed in the executor, so we can rely on the boolean flag to decide whether to call repairSchema or not.
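A rough sketch of that wiring (names are approximate; tableAvroSchema stands for the relation's already-resolved Avro schema, and the key reuses HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR):

// Driver side, computed once per relation (lazy val) so that building N
// HoodieMergeOnReadRDDs for N file groups does not re-parse the schema N times.
lazy val hasLogicalTs: Boolean = AvroSchemaUtils.hasTimestampMillisField(tableAvroSchema)

// Stamp the flag on the Hadoop conf before it is broadcast; downstream,
// AbstractHoodieLogRecordReader can read it back through fs.getConf() in the
// executor and skip the repair entirely when no logical timestamp field exists.
hadoopConf.setBoolean(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, hasLogicalTs)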
this.forceFullScan = forceFullScan;
this.internalSchema = internalSchema == null ? InternalSchema.getEmptyInternalSchema() : internalSchema;
this.enableOptimizedLogBlocksScan = enableOptimizedLogBlocksScan;
this.enableLogicalTimestampFieldRepair = !hoodieTableMetaClient.isMetadataTable() && fs.getConf().getBoolean(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR,
So, this is the only place where we optimize for mdt, is it?
btw, HoodieMergeOnReadRDD L89 is going to be invoked N number of times for N file groups in the driver, so we might be parsing the schema N times unless we cache the value of
@CsvSource(value = {"SIX,AVRO,CLUSTER", "CURRENT,AVRO,NONE", "CURRENT,AVRO,CLUSTER", "CURRENT,SPARK,NONE", "CURRENT,SPARK,CLUSTER"})
void testCOWLogicalRepair(String tableVersion, String recordType, String operation) throws Throwable {
@CsvSource(value = {"CLUSTER", "NONE"})
void testCOWLogicalRepair(String operation) throws Throwable {
lets include SPARK as well and not just AVRO
ParquetReader<IndexedRecord> reader =
    new HoodieAvroParquetReaderBuilder<IndexedRecord>(path)
    new HoodieAvroParquetReaderBuilder<IndexedRecord>(path,
        AvroSchemaUtils.isLogicalTimestampRepairNeeded(conf, false) || schema == null || AvroSchemaUtils.hasTimestampMillisField(schema))
Why is the default value false here, but true in all other places?
This has been removed now
Schema readerSchema = AvroSchemaUtils.getRepairedSchema(baseFileReader.getSchema(), writerSchema);

Schema readerSchema;
if (AvroSchemaUtils.isLogicalTimestampRepairNeeded(table.getHadoopConf(), true)) {
This line is executed in the executor. We should parse the schema in the driver and set the boolean flag in the hadoop conf at one of the caller sites in the driver.
This is just a check on the hadoop config; it's not parsing the schema. I had checked the caller, I think it was HoodieCompactor.
if (AvroSchemaUtils.hasTimestampMillisField(writerSchema)) {
  AvroSchemaUtils.setLogicalTimestampRepairNeeded(table.getHadoopConf());
}
AvroSchemaUtils.setLogicalTimestampRepairIfNotSet(table.getHadoopConf(), () -> AvroSchemaUtils.hasTimestampMillisField(writerSchema));
do we need to check for mdt here and avoid setting the config?
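If such a guard is wanted, a minimal sketch could look like this (illustrative; it reuses the metadata-table short-circuit shown earlier for enableLogicalTimestampFieldRepair and the JFunction.toJavaSupplier helper used elsewhere in this PR):

// Skip stamping the repair flag for the metadata table, mirroring the
// !hoodieTableMetaClient.isMetadataTable() check in the log record reader.
if (!table.getMetaClient.isMetadataTable) {
  AvroSchemaUtils.setLogicalTimestampRepairIfNotSet(
    table.getHadoopConf,
    JFunction.toJavaSupplier(() => Boolean.box(AvroSchemaUtils.hasTimestampMillisField(writerSchema))))
}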
jobConf.set(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR,
  java.lang.Boolean.toString(hasTimestampMillisFieldInTableSchema))
AvroSchemaUtils.setLogicalTimestampRepairIfNotSet(jobConf, JFunction.toJavaSupplier(() => hasTimestampMillisFieldInTableSchema.asInstanceOf[java.lang.Boolean]))
do we need to check for mdt here?
The PR adds an optimisation for the merged read handle so that the schema is repaired only if the schema has a timestamp-millis field. The existence of the timestamp-millis field is computed in the driver. It also addresses review comments in PR #18132 for the 0.15.1 version so that repair is applied only in cases where a timestamp-millis field is present in the schema.
---------
Co-authored-by: Lokesh Jain <ljain@Lokeshs-MacBook-Pro.local>
Co-authored-by: Lokesh Jain <ljain@192.168.1.5>
Co-authored-by: sivabalan <n.siva.b@gmail.com>

Describe the issue this Pull Request addresses
This PR mainly combines two PRs that fix the timestamp_millis logical type issue.
Summary and Changelog
Below is the PR description from #14161.
PR #9743 added more schema evolution functionality and schema processing. However, we used the InternalSchema system to do various operations such as fixing null ordering, reordering, and adding columns. At the time, InternalSchema only had a single Timestamp type. When converting back to Avro, this was assumed to be micros. Therefore, if the schema provider had any millis columns, the processed schema would end up with those columns as micros.
In the PR that updated column stats with better support for logical types (#13711), the schema issues were fixed, along with additional issues in the handling and conversion of timestamps during ingestion.
This PR aims to add functionality to the Spark and Hive readers and writers to automatically repair affected tables.
After switching to the 1.1 binary, the affected columns will undergo evolution from timestamp-micros to timestamp-millis. While this would normally be a lossy evolution that is not supported, it is safe here because the data is actually still timestamp-millis; it is just mislabeled as micros in the Parquet and table schemas.
Impact
When reading from a Hudi table using the Spark or Hive reader, if the table schema has a column as millis but the data schema is micros, we assume that this column is affected and read it as a millis value instead of a micros value. This correction is also applied to all readers that the default write paths use. As the table is rewritten, the Parquet files become correct. A table's latest snapshot can be immediately fixed by writing one commit with the 1.1 binary.
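For intuition (purely illustrative, not code from this PR), a value stored as epoch milliseconds but interpreted as microseconds is off by a factor of 1000:

import java.time.Instant

val epochMillis = 1700000000000L
// Read correctly as milliseconds: 2023-11-14T22:13:20Z
println(Instant.ofEpochMilli(epochMillis))
// The same long misread as microseconds corresponds to epochMillis / 1000 milliseconds,
// i.e. 1970-01-20T16:13:20Z -- which is why the mislabeled columns need the repair.
println(Instant.ofEpochMilli(epochMillis / 1000))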
Risk Level
High. Extensive testing was done and functional tests were added.
Documentation Update
Contributor's checklist