[MINOR] Add optimisation for repair with logical timestamp #18478
Conversation
yihua
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Style & Readability Review: The code introduces a useful optimization, but it repeats the schema setup pattern three times, and the comment explaining the repair logic should be updated to reflect the new conditional behavior.
```java
        .filterCompletedInstants()
        .lastInstant()
        .map(HoodieInstant::getTimestamp);
    Schema baseFileReaderSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getWriteSchema()), config.allowOperationMetadataField());
```
🤖 nit: the schema setup pattern on these two lines is duplicated identically in HoodieBackedTableMetadataWriter.java (lines 590–591) and TestHoodieMergedReadHandle.java (lines 204–205) — could you extract this into a static helper method to reduce duplication?
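A minimal sketch of the extracted helper this nit suggests; the method name and class placement are illustrative, not the PR's actual code, while `HoodieAvroUtils.addMetadataFields` and the config accessors come from the snippet above.

```java
import org.apache.avro.Schema;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.config.HoodieWriteConfig;

// Hypothetical helper consolidating the duplicated schema setup at the three call sites.
public static Schema buildBaseFileReaderSchema(HoodieWriteConfig config) {
  return HoodieAvroUtils.addMetadataFields(
      new Schema.Parser().parse(config.getWriteSchema()),
      config.allowOperationMetadataField());
}
```

Each call site would then reduce to a one-liner such as `Schema baseFileReaderSchema = buildBaseFileReaderSchema(config);`.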
```diff
  // Repair reader schema.
  // Assume writer schema should be correct. If not, no repair happens.
- readerSchema = AvroSchemaUtils.getRepairedSchema(orignalReaderSchema, baseFileReaderSchema);
+ readerSchema = hasTimestampFields ? AvroSchemaUtils.getRepairedSchema(orignalReaderSchema, this.baseFileReaderSchema) : orignalReaderSchema;
```
🤖 nit: the comment above this line should be updated to clarify that the repair is now conditional — it only happens when hasTimestampFields is true, rather than always repairing.
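One possible wording for the updated comment, reflecting the conditional repair:

```java
// Repair the reader schema only when it contains timestamp logical type fields;
// otherwise keep the original reader schema as-is.
// Assume the writer schema is correct; if not, no repair happens.
```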
yihua
left a comment
Thanks for contributing! The optimization to skip schema repair when no timestamp-millis fields are present is a good idea. Two things worth checking: there's a potential gap between what hasTimestampMillisField detects and what needsLogicalTypeRepair actually handles (specifically LocalTimestampMicros), and the schema computation in HoodieBackedTableMetadataWriter could be hoisted out of the lambda to fully realize the driver-side computation benefit. Details in inline comments.
```diff
  // Repair reader schema.
  // Assume writer schema should be correct. If not, no repair happens.
- readerSchema = AvroSchemaUtils.getRepairedSchema(orignalReaderSchema, baseFileReaderSchema);
+ readerSchema = hasTimestampFields ? AvroSchemaUtils.getRepairedSchema(orignalReaderSchema, this.baseFileReaderSchema) : orignalReaderSchema;
```
🤖 The guard hasTimestampFields is checked via hasTimestampMillisField, which returns true only for TimestampMillis and LocalTimestampMillis. However, needsLogicalTypeRepair in AvroSchemaRepair also repairs the case where the file schema is a plain LONG and the table schema has LocalTimestampMicros. In that scenario, hasTimestampMillisField returns false but the repair would still be needed. Could you verify this gap doesn't affect any realistic schema canonicalization path, or broaden the check to cover all timestamp logical types that needsLogicalTypeRepair handles?
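A hedged sketch of the broadened guard suggested here, covering the three timestamp logical types this review names (TimestampMillis, LocalTimestampMillis, LocalTimestampMicros) rather than MILLIS precision only; the class, method name, and recursion strategy are illustrative, not the PR's code.

```java
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

public final class TimestampFieldChecks {
  // Returns true if any field (including nested records, unions, arrays, and
  // maps) carries a timestamp logical type that the repair path handles.
  // Assumes non-recursive schemas; a visited set would be needed otherwise.
  public static boolean hasTimestampLogicalTypeField(Schema schema) {
    switch (schema.getType()) {
      case RECORD:
        return schema.getFields().stream()
            .anyMatch(f -> hasTimestampLogicalTypeField(f.schema()));
      case UNION:
        return schema.getTypes().stream()
            .anyMatch(TimestampFieldChecks::hasTimestampLogicalTypeField);
      case ARRAY:
        return hasTimestampLogicalTypeField(schema.getElementType());
      case MAP:
        return hasTimestampLogicalTypeField(schema.getValueType());
      default:
        LogicalType lt = schema.getLogicalType();
        return lt instanceof LogicalTypes.TimestampMillis
            || lt instanceof LogicalTypes.LocalTimestampMillis
            || lt instanceof LogicalTypes.LocalTimestampMicros;
    }
  }
}
```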
```java
final String fileId = fileSlice.getFileId();
return new HoodieMergedReadHandle(dataWriteConfig, instantTime, hoodieTable, Pair.of(partition, fileSlice.getFileId()),
    Option.of(fileSlice)).getMergedRecords().stream().map(record -> {
Schema baseFileReaderSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(dataWriteConfig.getWriteSchema()), dataWriteConfig.allowOperationMetadataField());
```
🤖 The schema parsing and hasTimestampMillisField computation are inside the flatMap lambda, so they're re-evaluated for every partition/file-slice pair. The PR description says this should be computed on the driver side. Could you hoist baseFileReaderSchema and hasTimestampFields above the flatMap, similar to how it's done in HoodieIndexUtils.getExistingRecords?
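A minimal sketch of the hoisting suggested here, so the schema parse and the timestamp check run once on the driver rather than once per partition/file-slice pair; the location of hasTimestampMillisField and the way the precomputed values are threaded through are assumptions, not the PR's actual code.

```java
// Driver side, hoisted above the flatMap over partition/file-slice pairs:
Schema baseFileReaderSchema = HoodieAvroUtils.addMetadataFields(
    new Schema.Parser().parse(dataWriteConfig.getWriteSchema()),
    dataWriteConfig.allowOperationMetadataField());
boolean hasTimestampFields =
    AvroSchemaUtils.hasTimestampMillisField(baseFileReaderSchema); // helper location assumed

// Inside the flatMap, reuse the precomputed values instead of re-parsing:
// new HoodieMergedReadHandle(dataWriteConfig, instantTime, hoodieTable,
//     Pair.of(partition, fileSlice.getFileId()), Option.of(fileSlice))
//     ... with baseFileReaderSchema / hasTimestampFields passed through ...
```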
yihua
left a comment
Thanks for iterating on this. The conf-based flag system is a reasonable approach for propagating the repair-needed signal from driver to executors. From my previous review: the LocalTimestampMicros gap in hasTimestampMillisField is still present (now in the compactor path), and the metadata writer schema hoisting wasn't part of this diff. A couple of questions about the new design — the unconditional flag-set in HoodieMergeHelper and the asymmetric defaults in HoodieAvroParquetReader — are noted in the inline comments.
```java
Option<InstantRange> instantRange = CompactHelpers.getInstance().getInstantRange(metaClient);
// Since we are using merge handle here, we can directly query the write schema from conf
// Write handle provides an option to use overridden write schema as well which is not used by merge handle
Schema writerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getWriteSchema()), config.allowOperationMetadataField());
```
🤖 Same gap as before — hasTimestampMillisField only returns true for MILLIS-precision timestamps, but needsLogicalTypeRepair also handles the case where the file schema is a plain LONG and the table schema has LocalTimestampMicros (non-UTC-adjusted). If someone's write schema has LocalTimestampMicros fields, the flag won't be set, and the downstream parquet read support could skip a needed repair. Could you either broaden this check to cover all timestamp logical types that needsLogicalTypeRepair handles, or add a comment explaining why LocalTimestampMicros can't appear here?
```java
HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();

HoodieRecord.HoodieRecordType recordType = table.getConfig().getRecordMerger().getRecordType();
AvroSchemaUtils.setLogicalTimestampRepairNeeded(table.getStorageConf());
```
🤖 setLogicalTimestampRepairNeeded is called unconditionally here, so the optimization (skip repair when no timestamp fields) doesn't apply to the merge path. Was this intentional? If so, it might be worth a brief comment explaining why the merge path can't benefit from this optimization (e.g., because the writer schema isn't available at this point, or because the base file schema could differ).
```diff
  // be able to read that file (in case projection is a proper one)
  Configuration hadoopConf = storage.getConf().unwrapCopyAs(Configuration.class);
- Schema repairedFileSchema = getRepairedSchema(getSchema(), schema);
+ Schema repairedFileSchema;
```
🤖 This uses defaultValue=true while the builder on line 192 uses defaultValue=false. When the conf flag isn't explicitly set (e.g., reads not going through merge/compaction), the file schema gets repaired here but the parquet read support may not repair. Is the asymmetry intentional — i.e., is Avro-level schema repair always safe/cheap enough to keep as default, while the parquet-level repair is the expensive one being optimized?
yihua
left a comment
Nice updates — removing the conf-flag approach in favor of direct checks simplifies the design. The unconditional flag-set in HoodieMergeHelper (prior question) and the asymmetric defaults (prior question) are both resolved by this cleanup. However, the LocalTimestampMicros gap I flagged earlier is still open and is now extended to the Spark34 read path — previously the repair was unconditionally enabled there, so this change is a regression for tables with LocalTimestampMicros fields.
```diff
  convertTz,
  enableVectorizedReader = false,
- enableTimestampFieldRepair = true,
+ enableTimestampFieldRepair = hasTimestampMillisFieldInTableSchema,
```
🤖 This change now gates the parquet-level timestamp repair on hasTimestampMillisFieldInTableSchema, which only detects TimestampMillis and LocalTimestampMillis. The same LocalTimestampMicros gap I flagged earlier on the compactor path now applies here too: if the table schema has LocalTimestampMicros fields and the base file was written as plain LONG, hasTimestampMillisFieldInTableSchema will be false, enableTimestampFieldRepair will be false, and the needed repair will be skipped during Spark reads. Before this change the repair was unconditionally enabled, so this is a regression for that (admittedly uncommon) case. Could you broaden the check to cover all logical timestamp types that needsLogicalTypeRepair handles — or rename it to something like hasTimestampLogicalTypeField so the scope is clear?
yihua
left a comment
Thanks for iterating on this — the consolidation to a single config key (ENABLE_LOGICAL_TIMESTAMP_REPAIR) and the setLogicalTimestampRepairIfNotSet/isLogicalTimestampRepairNeeded helpers are a nice cleanup. Caching hasTimestampMillisFieldInTableSchema as a lazy val in HoodieBaseRelation is a good optimization too. Two concerns remain: the LocalTimestampMicros gap I flagged in prior rounds is now a concrete regression in the compaction path (where the old default-true fallback is gone), and the new schema resolution in BaseSparkCommitActionExecutor can fail the entire commit if getTableAvroSchema() throws.
```java
Option<InstantRange> instantRange = CompactHelpers.getInstance().getInstantRange(metaClient);
// Since we are using merge handle here, we can directly query the write schema from conf
// Write handle provides an option to use overridden write schema as well which is not used by merge handle
Schema writerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getWriteSchema()), config.allowOperationMetadataField());
```
🤖 This still has the LocalTimestampMicros gap I've flagged in prior rounds. Concretely: the old code left ENABLE_LOGICAL_TIMESTAMP_REPAIR unset when hasTimestampMillisField was false, so HoodieMergeHelper fell through to its default of true and repair happened. Now setLogicalTimestampRepairIfNotSet writes "false" into the config, HoodieMergeHelper reads it, and repair is skipped. For a table whose schema has LocalTimestampMicros (non-UTC local timestamp with microsecond precision), this is a regression — base files written as plain LONG will no longer be repaired during compaction. Could you broaden the check to cover all logical timestamp types that needsLogicalTypeRepair handles (i.e., any non-UTC-adjusted local timestamp, regardless of precision)?
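To make the regression mechanics concrete, here is an illustrative model of the write-once flag helpers described in this review, using Hadoop's Configuration for concreteness; the key string is hypothetical (the review only gives the constant name ENABLE_LOGICAL_TIMESTAMP_REPAIR) and the semantics are inferred, not Hudi's actual implementation.

```java
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;

class LogicalTimestampRepairFlag {
  // Hypothetical key; stands in for ENABLE_LOGICAL_TIMESTAMP_REPAIR.
  static final String KEY = "hoodie.logical.timestamp.repair.enabled";

  // New behavior: always writes a value, so a "false" produced by a
  // MILLIS-only check overrides the downstream default-true fallback.
  static void setIfNotSet(Configuration conf, Supplier<Boolean> repairNeeded) {
    if (conf.get(KEY) == null) {
      conf.setBoolean(KEY, repairNeeded.get());
    }
  }

  // Old behavior: an unset key fell through to "true", which is the fallback
  // the LocalTimestampMicros case silently relied on.
  static boolean isRepairNeeded(Configuration conf) {
    return conf.getBoolean(KEY, true);
  }
}
```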
```java
if (!table.isMetadataTable() && table.getMetaClient().getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() > 0) {
  TableSchemaResolver schemaResolver = new TableSchemaResolver(table.getMetaClient());
  try {
    AvroSchemaUtils.setLogicalTimestampRepairIfNotSet(table.getStorageConf(), () -> {
```
🤖 If getTableAvroSchema() throws (e.g., corrupted timeline, missing commit metadata), the entire commit operation will fail. Previously, repair was unconditionally enabled, so a schema resolution failure wouldn't block writes. Could you catch the inner exception and default to true (enable repair) instead of aborting? Something like wrapping the supplier to return true on failure would preserve the old fail-safe behavior.
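A sketch of the fail-safe wrapping suggested here; the LOG instance and the placement of hasTimestampMillisField on AvroSchemaUtils are assumptions, while the setLogicalTimestampRepairIfNotSet call shape and schemaResolver come from the snippet above.

```java
AvroSchemaUtils.setLogicalTimestampRepairIfNotSet(table.getStorageConf(), () -> {
  try {
    return AvroSchemaUtils.hasTimestampMillisField(schemaResolver.getTableAvroSchema());
  } catch (Exception e) {
    // Fail safe: if the table schema cannot be resolved, enable the repair
    // instead of failing the commit (preserves the old unconditional behavior).
    LOG.warn("Failed to resolve table schema; enabling logical timestamp repair by default", e);
    return true;
  }
});
```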
yihua
left a comment
Thanks for addressing the exception-handling feedback — returning true on schema resolution failure in BaseSparkCommitActionExecutor is exactly right and preserves the fail-safe behavior. The Spark adapter and MergeOnRead relation refactors to use the centralized helpers look correct and semantically equivalent to the old code. The one open item from prior rounds is the LocalTimestampMicros gap in the compaction path (and other callers of hasTimestampMillisField), which is still unaddressed — see inline comment.
🤖 Line 136: This LocalTimestampMicros gap is still open from prior rounds. hasTimestampMillisField only returns true for MILLIS-precision timestamps, but needsLogicalTypeRepair also handles the case where the file schema is a plain LONG and the table schema has LocalTimestampMicros. With setLogicalTimestampRepairIfNotSet now writing false into the config for such tables (instead of leaving it unset), the downstream repair is skipped — a regression from the old default-true fallback. Could you broaden the check to cover all logical timestamp types that needsLogicalTypeRepair handles, or rename it to something like hasTimestampLogicalTypeField?
yihua
left a comment
Thanks for the incremental improvements — the metadata table guard for skipping unnecessary timestamp repair setup, the HoodieMergedReadHandle constructor simplification, and the expanded test coverage with SPARK record merger are all welcome. However, the IncrementalRelation change introduces a regression: moving the schema-evolution and timestamp-repair settings onto a local Configuration copy means they no longer reach executor-side file format readers (the code comment even says "pass internalSchema to hadoopConf, so it can be used in executors"). The prior finding about LocalTimestampMicros not being covered by hasTimestampMillisField also remains open — see inline comments for both.
```diff
@@ -205,16 +209,15 @@ class IncrementalRelation(val sqlContext: SQLContext,
   // pass internalSchema to hadoopConf, so it can be used in executors.
   val validCommits = metaClient
     .getCommitsAndCompactionTimeline.filterCompletedInstants.getInstantsAsStream.toArray().map(_.asInstanceOf[HoodieInstant].getFileName).mkString(",")
```
🤖 Moving these settings from the shared sqlContext.sparkContext.hadoopConfiguration to a local copy conf means they won't reach the file format readers on executors. The existing comment on line 205 says "pass internalSchema to hadoopConf, so it can be used in executors" — sqlContext.read (line ~294) serializes the shared hadoop conf, not this local copy. Schema evolution and timestamp repair will silently break for this read path.
```java
Option<InstantRange> instantRange = CompactHelpers.getInstance().getInstantRange(metaClient);
// Since we are using merge handle here, we can directly query the write schema from conf
// Write handle provides an option to use overridden write schema as well which is not used by merge handle
Schema writerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getWriteSchema()), config.allowOperationMetadataField());
```
🤖 The metadata table guard is a nice optimization, but hasTimestampMillisField still doesn't cover LocalTimestampMicros — so for non-metadata tables with LocalTimestampMicros fields, setLogicalTimestampRepairIfNotSet writes false, skipping the repair that needsLogicalTypeRepair would otherwise perform. Could you add LocalTimestampMicros to the check (or rename it to reflect all handled types)?
Describe the issue this Pull Request addresses
The PR adds an optimisation to the merged read handle so that the schema is repaired only if it contains a timestamp-millis field. The presence of a timestamp-millis field is computed on the driver.
It also addresses review comments from PR #18132 (for the 0.15.1 version) so that the repair is applied only when a timestamp-millis field is present in the schema.
Summary and Changelog
The PR adds an optimisation to the merged read handle so that the schema is repaired only if it contains a timestamp-millis field. The presence of a timestamp-millis field is computed on the driver.
It also addresses review comments from PR #18132 (for the 0.15.1 version) so that the repair is applied only when a timestamp-millis field is present in the schema.
Impact
NA
Risk Level
Low
Documentation Update
NA
Contributor's checklist