@@ -181,4 +181,17 @@ public class CloudSourceConfig extends HoodieConfig {
.markAdvanced()
.sinceVersion("1.0.0")
.withDocumentation("Boolean value to allow coalesce alias columns with actual columns while reading from source");

public static final ConfigProperty<Boolean> CLOUD_INCREMENTAL_MERGE_SCHEMA = ConfigProperty
.key(STREAMER_CONFIG_PREFIX + "source.cloud.data.merge.schema.enable")
.defaultValue(true)
.markAdvanced()
.sinceVersion("1.2.0")
.withDocumentation("For Parquet and ORC data files in S3/GCS incremental ingestion, merge schemas across all "
+ "files in each read (Spark mergeSchema). Default true so mixed-schema batches during initial "
+ "ingest/bootstrap produce a valid unified schema. Set false to restore prior behavior. "
+ SPARK_DATASOURCE_OPTIONS.key() + " is applied after this flag and can override mergeSchema. "
+ "Note: the per-read mergeSchema option is honored by Spark's native Parquet reader and by Spark's "
+ "native ORC reader (Spark 3.0+, default ORC impl since Spark 2.4). On older runtimes the option is "
+ "silently ignored.");
}
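
For illustration, the precedence described in the new documentation can be set up from streamer properties. A minimal sketch, assuming the usual TypedProperties-based configuration path; the JSON payload format for SPARK_DATASOURCE_OPTIONS is an assumption for illustration, not taken from this PR:

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.config.CloudSourceConfig;

TypedProperties props = new TypedProperties();
// The flag is applied to the DataFrameReader first.
props.setProperty(CloudSourceConfig.CLOUD_INCREMENTAL_MERGE_SCHEMA.key(), "true");
// User datasource options are applied afterwards, so an explicit
// mergeSchema entry there (hypothetical payload) overrides the flag.
props.setProperty(CloudSourceConfig.SPARK_DATASOURCE_OPTIONS.key(), "{\"mergeSchema\": \"false\"}");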
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.utilities.config;

import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;

import javax.annotation.concurrent.Immutable;

import static org.apache.hudi.common.util.ConfigUtils.STREAMER_CONFIG_PREFIX;

/**
* ORC DFS Source Configs
*/
@Immutable
@ConfigClassProperty(name = "ORC DFS Source Configs",
groupName = ConfigGroups.Names.HUDI_STREAMER,
subGroupName = ConfigGroups.SubGroupNames.DELTA_STREAMER_SOURCE,
description = "Configurations controlling the behavior of ORC DFS source in Hudi Streamer.")
public class ORCDFSSourceConfig extends HoodieConfig {

public static final ConfigProperty<Boolean> ORC_DFS_MERGE_SCHEMA = ConfigProperty
.key(STREAMER_CONFIG_PREFIX + "source.orc.dfs.merge.schema.enable")
.defaultValue(true)
.markAdvanced()
.sinceVersion("1.2.0")
.withDocumentation("Whether to merge schema across ORC files within a single read. "
+ "Defaults to true: heterogeneous-schema source files (e.g. during bootstrap or "
+ "evolving producers) get a unioned schema instead of silently dropping columns "
+ "that exist only in some files. Requires spark.sql.orc.impl=native (default since "
+ "Spark 2.4); the option is silently ignored under spark.sql.orc.impl=hive.");
}
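
Since the option only takes effect under Spark's native ORC reader, a sketch of a session where it applies may help. The app name, master, and paths below are illustrative assumptions, not values from this PR:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder()
    .appName("orc-merge-schema-demo")
    .master("local[*]")
    // "native" is the default since Spark 2.4; under "hive" the per-read
    // mergeSchema option is silently ignored, as the documentation notes.
    .config("spark.sql.orc.impl", "native")
    .getOrCreate();
Dataset<Row> rows = spark.read()
    .option("mergeSchema", "true")
    .orc("/data/orc/part1", "/data/orc/part2");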
@@ -40,10 +40,17 @@
public class ParquetDFSSourceConfig extends HoodieConfig {

public static final ConfigProperty<Boolean> PARQUET_DFS_MERGE_SCHEMA = ConfigProperty
.key(STREAMER_CONFIG_PREFIX + "source.parquet.dfs.merge_schema.enable")
.defaultValue(false)
.withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + "source.parquet.dfs.merge_schema.enable")
.key(STREAMER_CONFIG_PREFIX + "source.parquet.dfs.merge.schema.enable")
.defaultValue(true)
.withAlternatives(
// Back-compat aliases for the previous underscore-style keys (since 0.15.0).
STREAMER_CONFIG_PREFIX + "source.parquet.dfs.merge_schema.enable",
DELTA_STREAMER_CONFIG_PREFIX + "source.parquet.dfs.merge_schema.enable")
.markAdvanced()
.sinceVersion("0.15.0")
.withDocumentation("Merge schema across parquet files within a single write");
.withDocumentation("Whether to merge schema across parquet files within a single read. "
+ "Defaults to true: heterogeneous-schema source files (e.g. during bootstrap or "
+ "evolving producers) get a unioned schema instead of silently dropping columns "
+ "that exist only in some files. Set to false to restore the previous reader "
+ "behavior (single file's schema wins).");
}
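
A sketch of the back-compat behavior the alternatives list provides, assuming getBooleanWithAltKeys falls back to the alternative keys when the primary key is absent (which is how the sources in this diff resolve the flag):

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.utilities.config.ParquetDFSSourceConfig;

import static org.apache.hudi.common.util.ConfigUtils.STREAMER_CONFIG_PREFIX;

TypedProperties props = new TypedProperties();
// Only the legacy underscore-style key is set; the new dot-style primary key is absent.
props.setProperty(STREAMER_CONFIG_PREFIX + "source.parquet.dfs.merge_schema.enable", "false");
boolean mergeSchema = ConfigUtils.getBooleanWithAltKeys(props, ParquetDFSSourceConfig.PARQUET_DFS_MERGE_SCHEMA);
// Resolves to false via the alternative key instead of the new default of true.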
@@ -22,6 +22,7 @@
import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.config.ORCDFSSourceConfig;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;

@@ -30,6 +31,8 @@
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;

/**
* DFS Source that reads ORC data.
*/
@@ -53,6 +56,7 @@ public Pair<Option<Dataset<Row>>, Checkpoint> fetchNextBatch(Option<Checkpoint>
}

private Dataset<Row> fromFiles(String pathStr) {
return sparkSession.read().orc(pathStr.split(","));
boolean mergeSchemaEnabled = getBooleanWithAltKeys(this.props, ORCDFSSourceConfig.ORC_DFS_MERGE_SCHEMA);
return sparkSession.read().option("mergeSchema", mergeSchemaEnabled).orc(pathStr.split(","));
}
}
@@ -56,7 +56,7 @@ public Pair<Option<Dataset<Row>>, Checkpoint> fetchNextBatch(Option<Checkpoint>
}

private Dataset<Row> fromFiles(String pathStr) {
boolean mergeSchemaOption = getBooleanWithAltKeys(this.props, ParquetDFSSourceConfig.PARQUET_DFS_MERGE_SCHEMA);
return sparkSession.read().option("mergeSchema", mergeSchemaOption).parquet(pathStr.split(","));
boolean mergeSchemaEnabled = getBooleanWithAltKeys(this.props, ParquetDFSSourceConfig.PARQUET_DFS_MERGE_SCHEMA);
return sparkSession.read().option("mergeSchema", mergeSchemaEnabled).parquet(pathStr.split(","));
}
}
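
Both sources now hand the flag straight to Spark's per-read mergeSchema option. For readers unfamiliar with what that option changes, a self-contained sketch follows (paths are illustrative; the same semantics apply to the ORC source above):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
// Two parquet files with overlapping but different columns.
spark.range(1).selectExpr("id", "'x' AS b").write().parquet("/tmp/demo/p1"); // schema: id, b
spark.range(1).selectExpr("id", "99 AS c").write().parquet("/tmp/demo/p2");  // schema: id, c
// With mergeSchema=true the batch gets the union schema (id, b, c), null where
// a file lacks a column; with false, a single file's footer schema wins.
Dataset<Row> merged = spark.read()
    .option("mergeSchema", "true")
    .parquet("/tmp/demo/p1", "/tmp/demo/p2");
merged.printSchema();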
@@ -71,6 +71,7 @@
import static org.apache.hudi.common.util.ConfigUtils.containsConfigProperty;
import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
import static org.apache.hudi.utilities.config.CloudSourceConfig.CLOUD_DATAFILE_EXTENSION;
import static org.apache.hudi.utilities.config.CloudSourceConfig.CLOUD_INCREMENTAL_MERGE_SCHEMA;
import static org.apache.hudi.utilities.config.CloudSourceConfig.IGNORE_RELATIVE_PATH_PREFIX;
import static org.apache.hudi.utilities.config.CloudSourceConfig.IGNORE_RELATIVE_PATH_SUBSTR;
@@ -281,7 +282,7 @@ public Option<Dataset<Row>> loadAsDataset(SparkSession spark, List<CloudObjectMe
if (isNullOrEmpty(cloudObjectMetadata)) {
return Option.empty();
}
DataFrameReader reader = spark.read().format(fileFormat);
DataFrameReader reader = applyMergeSchemaOption(spark.read().format(fileFormat), fileFormat);
String datasourceOpts = getStringWithAltKeys(properties, CloudSourceConfig.SPARK_DATASOURCE_OPTIONS, true);

StructType rowSchema = null;
@@ -546,6 +547,34 @@ private static Option<String> getPropVal(TypedProperties props, ConfigProperty<S
return Option.empty();
}

/**
* Enables Spark {@code mergeSchema} for cloud object batches of Parquet or ORC files when configured, so
* heterogeneous files in one sync round share a merged struct type. Applied before user
* {@link CloudSourceConfig#SPARK_DATASOURCE_OPTIONS} so explicit reader options can override.
*
* <p>Spark's native Parquet reader honors {@code mergeSchema} on all supported versions. Spark's native ORC
* reader honors it on Spark 3.0+ (the native ORC impl is the default since Spark 2.4); on older runtimes the
* option is silently ignored, which is harmless.
*/
private DataFrameReader applyMergeSchemaOption(DataFrameReader reader, String fileFormat) {
if (!isParquetOrOrcFileFormat(fileFormat)) {
return reader;
}
if (!getBooleanWithAltKeys(properties, CLOUD_INCREMENTAL_MERGE_SCHEMA)) {
return reader;
}
return reader.option("mergeSchema", "true");
}

// Package-private for unit testing — see TestCloudObjectsSelectorCommon.
static boolean isParquetOrOrcFileFormat(String fileFormat) {
if (fileFormat == null) {
return false;
}
String f = fileFormat.trim();
Comment thread
Contributor: 🤖 nit: could you rename f to something like trimmed or normalizedFormat? Single-letter locals make sense in tiny lambdas, but here it's a named local in a package-private method that test code calls directly, so a slightly longer name would make the reader's intent clearer at a glance.

return "parquet".equalsIgnoreCase(f) || "orc".equalsIgnoreCase(f);
}

public enum Type {
S3,
GCS
@@ -20,14 +20,28 @@

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
import org.apache.hudi.utilities.testutils.sources.AbstractDFSSourceTestBase;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;

/**
* Basic tests for {@link AvroDFSSource}.
*/
@@ -54,4 +68,93 @@ protected Source prepareDFSSource(TypedProperties props) {
protected void writeNewDataToFile(List<HoodieRecord> records, Path path) throws IOException {
Helpers.saveAvroToDFS(Helpers.toGenericRecords(records), path);
}

/**
* Regression test: when a single batch contains files with additive schema evolution
* (one file has the base schema, another has the same fields plus an extra field with a
* default), reading via {@link AvroDFSSource} configured with the wider reader schema must
* (a) not fail, (b) return all records from both files, and (c) materialize the wider field
* as the default for records from the narrow file and as the written value for records from
* the wider file. Locks in Avro reader/writer schema-resolution behavior.
*/
@Test
public void testAdditiveSchemaEvolutionAcrossFiles() throws Exception {
// Use a unique subdirectory because basePath is static and shared with
// the parent testReadingFromSource, which writes 10000+ records into dfsRoot
// and would otherwise pollute this test's read.
String additiveRoot = basePath + "/avroFilesAdditive";
fs.mkdirs(new Path(additiveRoot));

Schema narrowSchema = HoodieTestDataGenerator.AVRO_SCHEMA;
Schema widerSchema = addStringFieldWithDefault(narrowSchema, "additive_field", "DEFAULT");

// File A: narrow writer schema, no additive_field.
int narrowCount = 30;
List<GenericRecord> narrowRecords = Helpers.toGenericRecords(
dataGenerator.generateInserts("000", narrowCount));
Path pathA = new Path(additiveRoot, "narrow" + fileSuffix);
Helpers.saveAvroToDFS(narrowRecords, pathA, narrowSchema);

// File B: wider writer schema, additive_field set to a known value.
int wideCount = 20;
List<GenericRecord> wideRecords = new ArrayList<>();
for (GenericRecord narrow : Helpers.toGenericRecords(
dataGenerator.generateInserts("001", wideCount))) {
GenericRecord wide = new GenericData.Record(widerSchema);
for (Schema.Field f : narrowSchema.getFields()) {
wide.put(f.name(), narrow.get(f.name()));
}
wide.put("additive_field", "WRITTEN");
wideRecords.add(wide);
}
Path pathB = new Path(additiveRoot, "wider" + fileSuffix);
Helpers.saveAvroToDFS(wideRecords, pathB, widerSchema);

// Write the wider schema to DFS and point a fresh schema provider at it so the source's
// reader schema is wider than file A's writer schema.
Path widerSchemaFile = new Path(basePath + "/wider-source.avsc");
try (OutputStream out = fs.create(widerSchemaFile)) {
out.write(widerSchema.toString(true).getBytes(StandardCharsets.UTF_8));
}
TypedProperties props = new TypedProperties();
props.setProperty("hoodie.streamer.source.dfs.root", additiveRoot);
props.setProperty("hoodie.streamer.schemaprovider.source.schema.file", widerSchemaFile.toString());
FilebasedSchemaProvider widerProvider = new FilebasedSchemaProvider(props, jsc);
AvroDFSSource source = new AvroDFSSource(props, jsc, sparkSession, widerProvider);

SourceFormatAdapter adapter = new SourceFormatAdapter(source);
JavaRDD<GenericRecord> fetched =
adapter.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch().get();
List<GenericRecord> read = fetched.collect();

assertEquals(narrowCount + wideCount, read.size(),
"Both narrow and wider files should be read in the same batch");

long defaulted = read.stream()
.filter(r -> "DEFAULT".equals(String.valueOf(r.get("additive_field"))))
.count();
long preserved = read.stream()
.filter(r -> "WRITTEN".equals(String.valueOf(r.get("additive_field"))))
.count();
assertEquals(narrowCount, defaulted,
"Records from the narrow file should get the wider reader schema's default for additive_field");
assertEquals(wideCount, preserved,
"Records from the wider file should preserve the written value of additive_field");
}

/**
* Returns a copy of {@code base} with one extra string field appended, defaulting
* to {@code defaultValue}. The new field carries a non-null default so Avro's schema-resolution
* can fill it in for records read with this schema but written under {@code base}.
*/
private static Schema addStringFieldWithDefault(Schema base, String fieldName, String defaultValue) {
List<Schema.Field> fields = new ArrayList<>();
for (Schema.Field f : base.getFields()) {
fields.add(new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal()));
}
fields.add(new Schema.Field(fieldName, Schema.create(Schema.Type.STRING), null, defaultValue));
Schema wider = Schema.createRecord(base.getName(), base.getDoc(), base.getNamespace(), false);
wider.setFields(fields);
return wider;
}
}
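
The Avro resolution rule this test locks in can be demonstrated without Spark at all. A standalone sketch (not code from this PR) of writing a record with a narrow schema and reading it with a wider one that carries a default:

import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

Schema narrow = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"R\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}");
Schema wider = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"R\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},"
        + "{\"name\":\"additive_field\",\"type\":\"string\",\"default\":\"DEFAULT\"}]}");

GenericRecord rec = new GenericData.Record(narrow);
rec.put("id", 1);
ByteArrayOutputStream bos = new ByteArrayOutputStream();
BinaryEncoder enc = EncoderFactory.get().binaryEncoder(bos, null);
new GenericDatumWriter<GenericRecord>(narrow).write(rec, enc);
enc.flush();
// Read back with (writerSchema, readerSchema): the missing field takes the reader default.
BinaryDecoder dec = DecoderFactory.get().binaryDecoder(bos.toByteArray(), null);
GenericRecord widened = new GenericDatumReader<GenericRecord>(narrow, wider).read(null, dec);
// widened.get("additive_field") is "DEFAULT" (as an Avro Utf8).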
@@ -28,18 +28,27 @@
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class TestCloudObjectsSelectorCommon extends HoodieSparkClientTestHarness {

@@ -200,6 +209,59 @@ void loadDatasetWithNestedSchemaAndCoalesceAliases() throws IOException {
Assertions.assertEquals(expectedSchema, result.get().schema(), "output dataset schema should match source schema");
}

@Test
void parquetMixedSchemasMergedByDefault(@TempDir Path tempDir) {
String p1 = tempDir.resolve("part1").toString();
String p2 = tempDir.resolve("part2").toString();

StructType schema1 = DataTypes.createStructType(Arrays.asList(
DataTypes.createStructField("id", DataTypes.IntegerType, true),
DataTypes.createStructField("b", DataTypes.StringType, true)));
sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(1, "x")), schema1)
.write().parquet(p1);

StructType schema2 = DataTypes.createStructType(Arrays.asList(
DataTypes.createStructField("id", DataTypes.IntegerType, true),
DataTypes.createStructField("c", DataTypes.IntegerType, true)));
sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(1, 99)), schema2)
.write().parquet(p2);

CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new CloudObjectsSelectorCommon(new TypedProperties());
List<CloudObjectMetadata> input = Arrays.asList(
new CloudObjectMetadata(p1, 1L),
new CloudObjectMetadata(p2, 1L));
Option<Dataset<Row>> result = cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "parquet", Option.empty(), 1);
assertTrue(result.isPresent());
Dataset<Row> ds = result.get();
Assertions.assertEquals(2, ds.count());
Set<String> colNames = Arrays.stream(ds.schema().fields()).map(StructField::name).collect(Collectors.toSet());
assertTrue(colNames.contains("b"));
assertTrue(colNames.contains("c"));
}

/**
* Verifies that the format-gating predicate for the cloud-incremental mergeSchema option recognises
* Parquet and ORC and rejects everything else. End-to-end ORC ingestion is not exercised here because
* {@code hudi-utilities} pulls in {@code orc-core-nohive} while Spark 3.x's ORC writer expects the
* regular {@code orc-core}; that classpath conflict makes {@code Dataset.write().orc(...)} fail
* with {@code NoSuchFieldError: type} in this module's tests. The end-to-end behaviour for ORC is
* covered by Parquet's tests via the shared helper, plus this predicate test for the format dispatch.
*/
@Test
void isParquetOrOrcFileFormatRecognisesBothFormats() {
assertTrue(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("parquet"));
assertTrue(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("PARQUET"));
assertTrue(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("orc"));
assertTrue(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("ORC"));
assertTrue(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat(" parquet "));
assertTrue(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat(" orc "));
assertFalse(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("json"));
assertFalse(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("csv"));
assertFalse(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("avro"));
assertFalse(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat(""));
assertFalse(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat(null));
}

@Test
public void partitionKeyNotPresentInPath() {
List<CloudObjectMetadata> input = Collections.singletonList(new CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json", 1));