fix: Enable schema merging for incremental and dfs sources#18385
Conversation
Force-pushed from b13559e to 5adce47
yihua
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for contributing! The fix is well-motivated and the implementation is clean: applying mergeSchema before SPARK_DATASOURCE_OPTIONS keeps the user-override ordering correct. One test-reliability concern is worth addressing before merging.
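For context on the ordering the review praises, here is a minimal sketch of the principle; the class, method, and parameter names are illustrative, not the actual Hudi helpers. Later `option()` calls on a `DataFrameReader` overwrite earlier ones, so the Hudi flag goes first and the user's options last.

```java
import java.util.Map;

import org.apache.spark.sql.DataFrameReader;

// Illustrative sketch, not Hudi's code: apply the Hudi-derived mergeSchema
// flag first, then the user's SPARK_DATASOURCE_OPTIONS, so user options win.
final class OptionOrderingSketch {
  static DataFrameReader configure(DataFrameReader reader,
                                   boolean mergeSchemaEnabled,
                                   Map<String, String> userOptions) {
    reader = reader.option("mergeSchema", String.valueOf(mergeSchemaEnabled));
    for (Map.Entry<String, String> e : userOptions.entrySet()) {
      reader = reader.option(e.getKey(), e.getValue()); // applied last, overrides the flag
    }
    return reader;
  }
}
```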
Force-pushed from 5adce47 to a262c46
yihua
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Style & Readability Review — Minor consistency issue in test code: first test method doesn't extract result.get() to a variable like the other test methods do.
yihua
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
LGTM — clean, well-scoped fix that adds mergeSchema support for Parquet reads in cloud source ingestion. The config is properly defined with alternative keys and sensible defaults, and the ordering (apply before SPARK_DATASOURCE_OPTIONS) correctly allows user overrides. Tests cover the key scenarios.
hudi-agent
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This PR extends the cloud incremental source mergeSchema behavior to ORC and unifies it under a single config key with backward-compatible aliases. No critical correctness issues flagged from this automated pass — a Hudi committer or PMC member can take it from here for a final review.
cc @yihua
hudi-agent
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for working on this! The PR enables Spark's mergeSchema option for Parquet/ORC reads in S3/GCS incremental cloud sources, adds a unified config with back-compat aliases for the prior parquet-only key, and flips the existing ParquetDFSSourceConfig default. One question about the scope of the default change is worth confirming; please take a look at the inline comment, and then this should be ready for a Hudi committer or PMC member to take it from here. A couple of minor naming/consistency suggestions below.
Force-pushed from cadeb4b to 86ed1c6
Force-pushed from 6652082 to 4a96745
```java
// Back-compat aliases: an earlier iteration of this PR used a `merge_schema` (underscore)
// form and an earlier dot-style `merge.schema` form, plus the original parquet-only key.
// All four are still honored.
STREAMER_CONFIG_PREFIX + "source.cloud.data.merge_schema.enable",
DELTA_STREAMER_CONFIG_PREFIX + "source.cloud.data.merge_schema.enable",
STREAMER_CONFIG_PREFIX + "source.cloud.data.merge.schema",
DELTA_STREAMER_CONFIG_PREFIX + "source.cloud.data.merge.schema",
STREAMER_CONFIG_PREFIX + "source.cloud.data.parquet.merge.schema",
DELTA_STREAMER_CONFIG_PREFIX + "source.cloud.data.parquet.merge.schema")
```
We don't need these alternative configs: the earlier keys only existed in unmerged iterations of this PR, so no released code has ever used them.
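For reference, this is roughly how such a config with an alternative key is declared with Hudi's `ConfigProperty` builder; this is a sketch, and the documentation text and exact builder chain here are assumptions, not the PR's code.

```java
import org.apache.hudi.common.config.ConfigProperty;

final class CloudMergeSchemaConfigSketch {
  // Sketch: primary key plus a back-compat alternative; keys passed to
  // withAlternatives are still honored when the primary key is unset.
  static final ConfigProperty<Boolean> CLOUD_INCREMENTAL_MERGE_SCHEMA = ConfigProperty
      .key("hoodie.streamer.source.cloud.data.merge.schema.enable")
      .defaultValue(true)
      .withAlternatives("hoodie.streamer.source.cloud.data.parquet.merge.schema")
      .markAdvanced()
      .sinceVersion("1.2.0")
      .withDocumentation("Whether to set the Spark mergeSchema option when reading "
          + "Parquet/ORC files in cloud incremental sources.");
}
```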
```java
// Back-compat aliases for the underscore-style keys used in an earlier iteration of this PR.
STREAMER_CONFIG_PREFIX + "source.orc.dfs.merge_schema.enable",
DELTA_STREAMER_CONFIG_PREFIX + "source.orc.dfs.merge_schema.enable")
```
hudi-agent
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for working on this! The PR enables mergeSchema by default on the DFS Parquet/ORC sources and the cloud-incremental S3/GCS source so heterogeneous-schema batches don't silently drop columns, with back-compat aliases for the previous config keys. No new issues flagged from this automated pass — a Hudi committer or PMC member can take it from here for a final review. One minor naming nit in CloudObjectsSelectorCommon; the rest of the change is clean and well-documented.
cc @yihua
```java
if (fileFormat == null) {
  return false;
}
String f = fileFormat.trim();
```
🤖 nit: the single-character name f doesn't communicate intent here — could you rename it to trimmed (or just inline fileFormat.trim() in the return expression) so it's immediately clear what the variable represents?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
Force-pushed from 4a96745 to ff794b0
hudi-agent
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for working on this! The PR enables Spark mergeSchema for cloud-incremental and DFS Parquet/ORC sources to fix silent column drops on heterogeneous-schema batches, with appropriate back-compat alternatives and a release-notes call-out for the default flip. No new critical correctness issues flagged from this automated pass — a Hudi committer or PMC member can take it from here for a final review. A few naming suggestions below, but the change is clean overall.
cc @yihua
```java
static boolean isParquetOrOrcFileFormat(String fileFormat) {
  if (fileFormat == null) {
    return false;
  }
```
🤖 nit: could you rename f to something like trimmed or normalizedFormat? Single-letter locals make sense in tiny lambdas but here it's a named local in a package-private method that test code calls directly, so a slightly longer name would make the reader's intent clearer at a glance.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
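Applied, the nit would read something like the sketch below. The comparison logic in the body is an assumption, since the diff excerpt above only shows the null guard and the trim.

```java
// Sketch of the suggested rename: no single-letter local, intent readable
// at a glance. The equalsIgnoreCase comparisons are assumed; the excerpt
// above does not show the rest of the method body.
static boolean isParquetOrOrcFileFormat(String fileFormat) {
  if (fileFormat == null) {
    return false;
  }
  String trimmed = fileFormat.trim();
  return "parquet".equalsIgnoreCase(trimmed) || "orc".equalsIgnoreCase(trimmed);
}
```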
Widens the parquet-only mergeSchema injection in CloudObjectsSelectorCommon to also cover ORC. Spark's native ORC reader honors the per-read `mergeSchema` option on Spark 3.0+ (the native ORC impl has been the default since Spark 2.4); on older runtimes the option is silently ignored, which is harmless.

Renames the config from CLOUD_INCREMENTAL_PARQUET_MERGE_SCHEMA to CLOUD_INCREMENTAL_MERGE_SCHEMA (key: source.cloud.data.merge.schema) and keeps the previous parquet-only key as a back-compat alternative so existing customer overrides continue to work. Renames the helpers applyParquetMergeSchemaOption -> applyMergeSchemaOption and isParquetFileFormat -> isParquetOrOrcFileFormat; no external production callers reference the old names by symbol.

Adds three ORC tests mirroring the existing parquet ones:
- orcMixedSchemasMergedByDefault
- orcMixedSchemasDropExtraColumnsWhenMergeDisabled
- orcSparkDatasourceOptionsMergeSchemaFalseDropsExtraColumns

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
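For readers unfamiliar with the Spark side of this, the per-read option the commit relies on behaves like the sketch below; the path and session setup are illustrative, not taken from the PR.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class OrcMergeSchemaReadSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("orc-merge-schema-sketch")
        .master("local[*]")
        .getOrCreate();
    // With the native ORC impl (spark.sql.orc.impl=native, the default since
    // Spark 2.4), mergeSchema unions the schemas of all files in the read
    // instead of picking one file's schema; the Hive impl ignores the option.
    Dataset<Row> merged = spark.read()
        .option("mergeSchema", "true")
        .orc("/tmp/orc-files"); // placeholder path
    merged.printSchema();
    spark.stop();
  }
}
```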
…conflict)

The three orcMixedSchemas* tests added in the prior commit fail in hudi-utilities CI with `NoSuchFieldError: type` from `OrcMapredRecordWriter.addVariableLengthColumns`. Hudi-utilities pulls in `orc-core-nohive` (kept for compile-time reasons per the pom comment) while Spark 3.x's ORC writer was compiled against regular `orc-core`, which has a `type` field the nohive variant lacks. Result: `sparkSession.write().orc(...)` cannot be used in this module's tests.

Replaces the three e2e ORC tests with a single predicate test on `CloudObjectsSelectorCommon.isParquetOrOrcFileFormat(String)` that verifies the format-gating logic without exercising Spark's ORC writer. The e2e behaviour for ORC mirrors Parquet via the shared helper `applyMergeSchemaOption`, which is already covered by the three Parquet e2e tests. The ORC-vs-Parquet difference is one boolean check in the predicate, which the new test pins.

`isParquetOrOrcFileFormat` becomes package-private (was private) so the test in the same package can reference it without reflection.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
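A predicate test of that shape could look like the following sketch, not the committed test; the case-insensitive matching it asserts is an assumption, while the trim is visible in the diff excerpt earlier.

```java
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import org.junit.jupiter.api.Test;

class IsParquetOrOrcFileFormatSketchTest {
  @Test
  void recognisesParquetAndOrcOnly() {
    // Same package as CloudObjectsSelectorCommon, so the package-private
    // predicate is directly callable without reflection.
    assertTrue(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("parquet"));
    assertTrue(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat(" orc "));
    assertFalse(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat(null));
    assertFalse(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("json"));
  }
}
```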
Per PR review feedback: this test asserts that with the Hudi
CLOUD_INCREMENTAL_MERGE_SCHEMA flag turned off, the result has only one
file's columns (2, not 3). It implicitly relies on Spark's session-level
spark.sql.parquet.mergeSchema being false, which is the Spark default
but can be overridden by the test runner. If a session enables
mergeSchema globally, Spark merges anyway and the test fails.
The other two parquet tests are reliable because they set the
mergeSchema option explicitly per-read (which overrides session-level
config):
- parquetMixedSchemasMergedByDefault: Hudi default true sets
reader.option("mergeSchema", "true").
- parquetSparkDatasourceOptionsMergeSchemaFalseDropsExtraColumns:
SPARK_DATASOURCE_OPTIONS sets reader.option("mergeSchema", "false").
The override path (SPARK_DATASOURCE_OPTIONS) and the default-on path
(CLOUD_INCREMENTAL_MERGE_SCHEMA defaults to true) are both still
covered. The flag-off-but-session-default path is not covered and is
the part deemed flaky.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
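The precedence this commit message leans on (an explicit per-read option beating the session-level config) can be sketched as follows; the path is illustrative, and this is Spark's documented DataFrameReader behaviour, not Hudi code.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class MergeSchemaPrecedenceSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("merge-schema-precedence")
        .master("local[*]")
        // A test runner could set this session-level default to true...
        .config("spark.sql.parquet.mergeSchema", "true")
        .getOrCreate();
    // ...but an explicit per-read option still wins, which is why the two
    // surviving tests (which set the option) are considered reliable.
    Dataset<Row> notMerged = spark.read()
        .option("mergeSchema", "false")
        .parquet("/tmp/mixed-schema-parquet"); // placeholder path
    notMerged.printSchema();
    spark.stop();
  }
}
```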
…olumns

Per PR review feedback: this test relies on the per-read mergeSchema option overriding the session-level spark.sql.parquet.mergeSchema. While that's the documented Spark behaviour, a test runner with the session-level config set to true could surface a false negative depending on order of operations. The reviewer flagged it as the same flakiness class as the previously-removed parquetMixedSchemasDropExtraColumnsWhenMergeDisabled.

Coverage that remains:
- parquetMixedSchemasMergedByDefault: end-to-end happy path, Hudi default (true) → mergeSchema=true on the reader → merged result. Reliable because the per-read option is set, overriding any session config.
- isParquetOrOrcFileFormatRecognisesBothFormats: format-gating predicate.

The override path (SPARK_DATASOURCE_OPTIONS overriding the Hudi flag) is no longer covered by an e2e test; the option-merge order is exercised by Spark's own DataFrameReader semantics, not by Hudi-side logic.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Both removed flaky tests (parquetMixedSchemasDropExtraColumnsWhenMergeDisabled and parquetSparkDatasourceOptionsMergeSchemaFalseDropsExtraColumns) were the only references to CloudSourceConfig in this test file. Drop the now-unused import to satisfy the UnusedImports checkstyle rule.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…urce
Follows up on the cloud-incremental-source fix in this PR. Closes the
remaining FS-based source gaps from ENG-41047 except the MetadataBootstrap
paths (handled separately as those need partition-level schema discovery).
- ParquetDFSSourceConfig.PARQUET_DFS_MERGE_SCHEMA: default flipped from
false to true so that heterogeneous-schema parquet files in a single
Hudi Streamer batch get a unioned schema instead of silently dropping
columns that exist only in some files. Set to false to restore the
prior single-file-schema-wins behavior.
- New ORCDFSSourceConfig.ORC_DFS_MERGE_SCHEMA, default true, mirroring
  the parquet config. Plumbed through ORCDFSSource.fromFiles() as
  reader.option("mergeSchema", flag), as sketched after this list.
  Requires spark.sql.orc.impl=native (default since Spark 2.4); silently
  ignored under the Hive impl.
- TestAvroDFSSource: regression test confirming additive-schema evolution
across files works end-to-end. Writes one narrow + one wider Avro file,
configures the source's reader schema to the wider one, and asserts
records from the narrow file get the wider schema's default for the
new field while records from the wider file preserve their value.
- Drop withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + ...) on the new
  CLOUD_INCREMENTAL_MERGE_SCHEMA and ORC_DFS_MERGE_SCHEMA configs; legacy
  prefix alternatives are not needed for new keys.
- Drop the new-key-with-legacy-prefix alternative on
  PARQUET_DFS_MERGE_SCHEMA; keep underscore-style back-compat aliases
  since the key itself was renamed.
- Static-import assertTrue/assertFalse in the new tests.
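A sketch of the ORCDFSSource.fromFiles() plumbing described in the list above; this is an assumed shape, not the committed code, and the comma-separated path handling mirrors how the parquet DFS source reads its path list, which is an assumption here.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Sketch, not the committed code: pass the ORC_DFS_MERGE_SCHEMA flag to the
// reader. The option takes effect only under spark.sql.orc.impl=native.
final class OrcDfsMergeSchemaSketch {
  static Dataset<Row> fromFiles(SparkSession spark, boolean mergeSchema, String pathStr) {
    return spark.read()
        .option("mergeSchema", String.valueOf(mergeSchema))
        .orc(pathStr.split(",")); // assumed: comma-separated list of paths
  }
}
```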
Force-pushed from ff794b0 to f609bff
Codecov Report

❌ Patch coverage is

Additional details and impacted files

```diff
@@             Coverage Diff              @@
##             master    #18385     +/-  ##
============================================
  Coverage     68.14%    68.14%
- Complexity    29109     29118      +9
============================================
  Files          2517      2518      +1
  Lines        141197   141221      +24
  Branches      17529    17531       +2
============================================
+ Hits          96212    96238      +26
- Misses        37068    37069       +1
+ Partials       7917     7914       -3
```
Describe the issue this Pull Request addresses
#18382: source files with heterogeneous schemas in a single Hudi Streamer batch can silently drop columns. This PR closes the FS-based-source side of that bug class. The MetadataBootstrap paths (ParquetBootstrapMetadataHandler, OrcBootstrapMetadataHandler) are deferred to a follow-up because they need partition-level schema discovery, which is architecturally bigger than the per-read knobs in this PR.

Summary and Changelog
Cloud incremental sources (S3/GCS):

1. New config `hoodie.streamer.source.cloud.data.merge.schema.enable` (since 1.2.0, advanced), default `true`. Covers both Parquet and ORC.
2. In `CloudObjectsSelectorCommon`, the format gate is widened to `parquet || orc`, and the Spark reader option `mergeSchema=true` is set before applying `SPARK_DATASOURCE_OPTIONS`, so users can still override per-format via JSON options (e.g. `{"mergeSchema":"false"}`; see the sketch after this list).
3. `TestCloudObjectsSelectorCommon` covers Parquet end-to-end (merged-by-default) plus a predicate test for the format dispatch. End-to-end ORC isn't exercised in this module because `hudi-utilities` pulls in `orc-core-nohive`, which conflicts with Spark 3.x's ORC writer; the predicate test pins the format dispatch, and the e2e ORC behaviour matches Parquet via the shared helper.

ParquetDFSSource:

4. `ParquetDFSSourceConfig.PARQUET_DFS_MERGE_SCHEMA` default flipped from `false` to `true`. Heterogeneous-schema parquet files in a single Streamer batch now get a unioned schema instead of silently dropping columns. The new primary key is `hoodie.streamer.source.parquet.dfs.merge.schema.enable`; the previous underscore-style key (`...merge_schema.enable`, since 0.15.0) is preserved as a back-compat alternative.

ORCDFSSource:

5. New `ORCDFSSourceConfig.ORC_DFS_MERGE_SCHEMA` (`hoodie.streamer.source.orc.dfs.merge.schema.enable`, since 1.2.0, advanced), default `true`. Mirrors the parquet config. Plumbed into `ORCDFSSource.fromFiles()` as `reader.option("mergeSchema", flag)`. Requires `spark.sql.orc.impl=native` (default since Spark 2.4); silently ignored under the Hive impl.

AvroDFSSource regression test:

6. `TestAvroDFSSource.testAdditiveSchemaEvolutionAcrossFiles` writes one narrow + one wider Avro file under a unique subdirectory, configures the source's reader schema to the wider one, and asserts records from the narrow file get the wider schema's default for the new field while records from the wider file preserve their value. Locks in Avro reader/writer schema-resolution behaviour end-to-end through `AvroDFSSource`.
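As a usage note for item 2, a user could disable merging per-read like this; a sketch only, and the literal option key is an assumption, so prefer `CloudSourceConfig.SPARK_DATASOURCE_OPTIONS.key()` in real code.

```java
import org.apache.hudi.common.config.TypedProperties;

public class MergeSchemaOverrideSketch {
  public static void main(String[] args) {
    TypedProperties props = new TypedProperties();
    // SPARK_DATASOURCE_OPTIONS JSON is applied after the Hudi mergeSchema
    // flag, so this per-read option overrides the default-true behaviour.
    // The key string below is illustrative, not verified against the PR.
    props.setProperty("hoodie.streamer.source.cloud.data.datasource.options",
        "{\"mergeSchema\":\"false\"}");
    System.out.println(props.getProperty(
        "hoodie.streamer.source.cloud.data.datasource.options"));
  }
}
```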
Impact

The `PARQUET_DFS_MERGE_SCHEMA` default flip affects all `ParquetDFSSource` users, not just cloud incremental.

Compatibility:

- Set the new configs to `false` to restore the previous reader behavior.
- The prior `parquet.dfs.merge_schema.enable` key continues to work via `withAlternatives`, so explicit overrides are preserved.
- `mergeSchema` can add work (footer/schema aggregation) versus a single-schema read; usually acceptable relative to correctness for heterogeneous batches.

Risk Level
Medium. Touches default behaviour for three FS-based source paths; mitigations: each format has its own kill-switch config, and `SPARK_DATASOURCE_OPTIONS` continues to override per-read.

Documentation Update
The new configs `hoodie.streamer.source.cloud.data.merge.schema.enable` and `hoodie.streamer.source.orc.dfs.merge.schema.enable` are documented in `CloudSourceConfig` and `ORCDFSSourceConfig` respectively. The flipped default is documented in `ParquetDFSSourceConfig`. Release notes should highlight the `ParquetDFSSource` default flip explicitly, since it changes behaviour for non-cloud DFS users.

Contributor's checklist

- `TestCloudObjectsSelectorCommon`, `TestParquetDFSSource`, `TestAvroDFSSource` all pass locally (14 tests, 0 failures, 0 errors).