diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java index ec13e502fe31f..6ffd5ad5c7ae3 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java @@ -181,4 +181,17 @@ public class CloudSourceConfig extends HoodieConfig { .markAdvanced() .sinceVersion("1.0.0") .withDocumentation("Boolean value to allow coalesce alias columns with actual columns while reading from source"); + + public static final ConfigProperty<Boolean> CLOUD_INCREMENTAL_MERGE_SCHEMA = ConfigProperty + .key(STREAMER_CONFIG_PREFIX + "source.cloud.data.merge.schema.enable") + .defaultValue(true) + .markAdvanced() + .sinceVersion("1.2.0") + .withDocumentation("For Parquet and ORC data files in S3/GCS incremental ingestion, merge schemas across all " + + "files in each read (Spark mergeSchema). Defaults to true so mixed-schema batches during initial " + + "ingest/bootstrap produce a valid unified schema. Set to false to restore the prior behavior. " + + SPARK_DATASOURCE_OPTIONS.key() + " is applied after this flag and can override mergeSchema. " + + "Note: the per-read mergeSchema option is honored by Spark's native Parquet reader and by Spark's " + + "native ORC reader (Spark 3.0+, default ORC impl since Spark 2.4). On older runtimes the option is " + + "silently ignored."); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ORCDFSSourceConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ORCDFSSourceConfig.java new file mode 100644 index 0000000000000..b181d0257331f --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ORCDFSSourceConfig.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.hudi.utilities.config; + +import org.apache.hudi.common.config.ConfigClassProperty; +import org.apache.hudi.common.config.ConfigGroups; +import org.apache.hudi.common.config.ConfigProperty; +import org.apache.hudi.common.config.HoodieConfig; + +import javax.annotation.concurrent.Immutable; + +import static org.apache.hudi.common.util.ConfigUtils.STREAMER_CONFIG_PREFIX; + +/** + * ORC DFS Source Configs + */ +@Immutable +@ConfigClassProperty(name = "ORC DFS Source Configs", + groupName = ConfigGroups.Names.HUDI_STREAMER, + subGroupName = ConfigGroups.SubGroupNames.DELTA_STREAMER_SOURCE, + description = "Configurations controlling the behavior of ORC DFS source in Hudi Streamer.") +public class ORCDFSSourceConfig extends HoodieConfig { + + public static final ConfigProperty<Boolean> ORC_DFS_MERGE_SCHEMA = ConfigProperty + .key(STREAMER_CONFIG_PREFIX + "source.orc.dfs.merge.schema.enable") + .defaultValue(true) + .markAdvanced() + .sinceVersion("1.2.0") + .withDocumentation("Whether to merge schema across ORC files within a single read. " + + "Defaults to true: heterogeneous-schema source files (e.g. during bootstrap or " + + "evolving producers) get a unioned schema instead of silently dropping columns " + + "that exist only in some files. Honored by Spark's native ORC reader on Spark 3.0+ " + + "(spark.sql.orc.impl=native, the default since Spark 2.4); the option is silently " + + "ignored under spark.sql.orc.impl=hive or on older Spark versions."); +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ParquetDFSSourceConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ParquetDFSSourceConfig.java index a8906c9f70b0d..b35842d662829 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ParquetDFSSourceConfig.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ParquetDFSSourceConfig.java @@ -40,10 +40,17 @@ public class ParquetDFSSourceConfig extends HoodieConfig { public static final ConfigProperty<Boolean> PARQUET_DFS_MERGE_SCHEMA = ConfigProperty - .key(STREAMER_CONFIG_PREFIX + "source.parquet.dfs.merge_schema.enable") - .defaultValue(false) - .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + "source.parquet.dfs.merge_schema.enable") + .key(STREAMER_CONFIG_PREFIX + "source.parquet.dfs.merge.schema.enable") + .defaultValue(true) + .withAlternatives( + // Back-compat aliases for the previous underscore-style keys (since 0.15.0). + STREAMER_CONFIG_PREFIX + "source.parquet.dfs.merge_schema.enable", + DELTA_STREAMER_CONFIG_PREFIX + "source.parquet.dfs.merge_schema.enable") .markAdvanced() .sinceVersion("0.15.0") - .withDocumentation("Merge schema across parquet files within a single write"); + .withDocumentation("Whether to merge schema across parquet files within a single read. " + + "Defaults to true: heterogeneous-schema source files (e.g. during bootstrap or " + + "evolving producers) get a unioned schema instead of silently dropping columns " + + "that exist only in some files.
Set to false to restore the previous reader " + + "behavior (single file's schema wins)."); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ORCDFSSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ORCDFSSource.java index 46357ee5a35c9..ace9a0590bcb5 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ORCDFSSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ORCDFSSource.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.table.checkpoint.Checkpoint; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.utilities.config.ORCDFSSourceConfig; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.sources.helpers.DFSPathSelector; @@ -30,6 +31,8 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; +import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys; + /** * DFS Source that reads ORC data. */ @@ -53,6 +56,7 @@ public Pair<Option<Dataset<Row>>, Checkpoint> fetchNextBatch(Option<Checkpoint> } private Dataset<Row> fromFiles(String pathStr) { - return sparkSession.read().orc(pathStr.split(",")); + boolean mergeSchemaEnabled = getBooleanWithAltKeys(this.props, ORCDFSSourceConfig.ORC_DFS_MERGE_SCHEMA); + return sparkSession.read().option("mergeSchema", mergeSchemaEnabled).orc(pathStr.split(",")); } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java index 10ec52f6ef782..3ad5270ab2594 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java @@ -56,7 +56,7 @@ public Pair<Option<Dataset<Row>>, Checkpoint> fetchNextBatch(Option<Checkpoint> } private Dataset<Row> fromFiles(String pathStr) { - boolean mergeSchemaOption = getBooleanWithAltKeys(this.props, ParquetDFSSourceConfig.PARQUET_DFS_MERGE_SCHEMA); - return sparkSession.read().option("mergeSchema", mergeSchemaOption).parquet(pathStr.split(",")); + boolean mergeSchemaEnabled = getBooleanWithAltKeys(this.props, ParquetDFSSourceConfig.PARQUET_DFS_MERGE_SCHEMA); + return sparkSession.read().option("mergeSchema", mergeSchemaEnabled).parquet(pathStr.split(",")); } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java index 5cb52dcbaeaab..a136bb5290bf4 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java @@ -71,6 +71,7 @@ import static org.apache.hudi.common.util.ConfigUtils.containsConfigProperty; import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys; import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys; +import static org.apache.hudi.utilities.config.CloudSourceConfig.CLOUD_INCREMENTAL_MERGE_SCHEMA; import static org.apache.hudi.utilities.config.CloudSourceConfig.CLOUD_DATAFILE_EXTENSION; import static org.apache.hudi.utilities.config.CloudSourceConfig.IGNORE_RELATIVE_PATH_PREFIX; import static org.apache.hudi.utilities.config.CloudSourceConfig.IGNORE_RELATIVE_PATH_SUBSTR; @@ -281,7 +282,7 @@ public Option<Dataset<Row>> loadAsDataset(SparkSession
spark, List<CloudObjectMetadata> ... getPropVal(TypedProperties props, ConfigProperty<String> ... + + /** + * Spark's native Parquet reader honors {@code mergeSchema} on all supported versions. Spark's native ORC + * reader honors it on Spark 3.0+ (the native ORC impl is the default since Spark 2.4); on older runtimes the + * option is silently ignored, which is harmless. + */ + private DataFrameReader applyMergeSchemaOption(DataFrameReader reader, String fileFormat) { + if (!isParquetOrOrcFileFormat(fileFormat)) { + return reader; + } + if (!getBooleanWithAltKeys(properties, CLOUD_INCREMENTAL_MERGE_SCHEMA)) { + return reader; + } + return reader.option("mergeSchema", "true"); + } + + // Package-private for unit testing; see TestCloudObjectsSelectorCommon. + static boolean isParquetOrOrcFileFormat(String fileFormat) { + if (fileFormat == null) { + return false; + } + String f = fileFormat.trim(); + return "parquet".equalsIgnoreCase(f) || "orc".equalsIgnoreCase(f); + } + public enum Type { S3, GCS diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java index 89d522675ba16..aa15ca18ec6d3 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java @@ -20,14 +20,28 @@ import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; +import org.apache.hudi.utilities.streamer.SourceFormatAdapter; import org.apache.hudi.utilities.testutils.sources.AbstractDFSSourceTestBase; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.fs.Path; +import org.apache.spark.api.java.JavaRDD; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.List; +import static org.junit.jupiter.api.Assertions.assertEquals; + /** * Basic tests for {@link TestAvroDFSSource}. */ @@ -54,4 +68,93 @@ protected Source prepareDFSSource(TypedProperties props) { protected void writeNewDataToFile(List<HoodieRecord> records, Path path) throws IOException { Helpers.saveAvroToDFS(Helpers.toGenericRecords(records), path); } + + /** + * Regression test: when a single batch contains files with additive schema evolution + * (one file has the base schema, another has the same fields plus an extra field with a + * default), reading via {@link AvroDFSSource} configured with the wider reader schema must + * (a) not fail, (b) return all records from both files, and (c) materialize the wider field + * as the default for records from the narrow file and as the written value for records from + * the wider file. Locks in Avro reader/writer schema-resolution behavior. + */ + @Test + public void testAdditiveSchemaEvolutionAcrossFiles() throws Exception { + // Use a unique subdirectory because basePath is static and shared with + // the parent testReadingFromSource, which writes 10000+ records into dfsRoot + // and would otherwise pollute this test's read.
+ String additiveRoot = basePath + "/avroFilesAdditive"; + fs.mkdirs(new Path(additiveRoot)); + + Schema narrowSchema = HoodieTestDataGenerator.AVRO_SCHEMA; + Schema widerSchema = addStringFieldWithDefault(narrowSchema, "additive_field", "DEFAULT"); + + // File A: narrow writer schema, no additive_field. + int narrowCount = 30; + List<GenericRecord> narrowRecords = Helpers.toGenericRecords( + dataGenerator.generateInserts("000", narrowCount)); + Path pathA = new Path(additiveRoot, "narrow" + fileSuffix); + Helpers.saveAvroToDFS(narrowRecords, pathA, narrowSchema); + + // File B: wider writer schema, additive_field set to a known value. + int wideCount = 20; + List<GenericRecord> wideRecords = new ArrayList<>(); + for (GenericRecord narrow : Helpers.toGenericRecords( + dataGenerator.generateInserts("001", wideCount))) { + GenericRecord wide = new GenericData.Record(widerSchema); + for (Schema.Field f : narrowSchema.getFields()) { + wide.put(f.name(), narrow.get(f.name())); + } + wide.put("additive_field", "WRITTEN"); + wideRecords.add(wide); + } + Path pathB = new Path(additiveRoot, "wider" + fileSuffix); + Helpers.saveAvroToDFS(wideRecords, pathB, widerSchema); + + // Write the wider schema to DFS and point a fresh schema provider at it so the source's + // reader schema is wider than file A's writer schema. + Path widerSchemaFile = new Path(basePath + "/wider-source.avsc"); + try (OutputStream out = fs.create(widerSchemaFile)) { + out.write(widerSchema.toString(true).getBytes(StandardCharsets.UTF_8)); + } + TypedProperties props = new TypedProperties(); + props.setProperty("hoodie.streamer.source.dfs.root", additiveRoot); + props.setProperty("hoodie.streamer.schemaprovider.source.schema.file", widerSchemaFile.toString()); + FilebasedSchemaProvider widerProvider = new FilebasedSchemaProvider(props, jsc); + AvroDFSSource source = new AvroDFSSource(props, jsc, sparkSession, widerProvider); + + SourceFormatAdapter adapter = new SourceFormatAdapter(source); + JavaRDD<GenericRecord> fetched = + adapter.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch().get(); + List<GenericRecord> read = fetched.collect(); + + assertEquals(narrowCount + wideCount, read.size(), + "Both narrow and wider files should be read in the same batch"); + + long defaulted = read.stream() + .filter(r -> "DEFAULT".equals(String.valueOf(r.get("additive_field")))) + .count(); + long preserved = read.stream() + .filter(r -> "WRITTEN".equals(String.valueOf(r.get("additive_field")))) + .count(); + assertEquals(narrowCount, defaulted, + "Records from the narrow file should get the wider reader schema's default for additive_field"); + assertEquals(wideCount, preserved, + "Records from the wider file should preserve the written value of additive_field"); + } + + /** + * Returns a copy of {@code base} with one extra string field appended, defaulting + * to {@code defaultValue}. The new field has a non-null default so Avro's schema resolution + * can fill it in for records read with this schema but written under {@code base}.
+ */ + private static Schema addStringFieldWithDefault(Schema base, String fieldName, String defaultValue) { + List<Schema.Field> fields = new ArrayList<>(); + for (Schema.Field f : base.getFields()) { + fields.add(new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal())); + } + fields.add(new Schema.Field(fieldName, Schema.create(Schema.Type.STRING), null, defaultValue)); + Schema wider = Schema.createRecord(base.getName(), base.getDoc(), base.getNamespace(), false); + wider.setFields(fields); + return wider; + } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java index d25f3082d0822..e4d1e42cabef0 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java @@ -28,18 +28,27 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; import java.io.FileInputStream; import java.io.IOException; +import java.nio.file.Path; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; public class TestCloudObjectsSelectorCommon extends HoodieSparkClientTestHarness { @@ -200,6 +209,59 @@ void loadDatasetWithNestedSchemaAndCoalesceAliases() throws IOException { Assertions.assertEquals(expectedSchema, result.get().schema(), "output dataset schema should match source schema"); } + @Test + void parquetMixedSchemasMergedByDefault(@TempDir Path tempDir) { + String p1 = tempDir.resolve("part1").toString(); + String p2 = tempDir.resolve("part2").toString(); + + StructType schema1 = DataTypes.createStructType(Arrays.asList( + DataTypes.createStructField("id", DataTypes.IntegerType, true), + DataTypes.createStructField("b", DataTypes.StringType, true))); + sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(1, "x")), schema1) + .write().parquet(p1); + + StructType schema2 = DataTypes.createStructType(Arrays.asList( + DataTypes.createStructField("id", DataTypes.IntegerType, true), + DataTypes.createStructField("c", DataTypes.IntegerType, true))); + sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(1, 99)), schema2) + .write().parquet(p2); + + CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new CloudObjectsSelectorCommon(new TypedProperties()); + List<CloudObjectMetadata> input = Arrays.asList( + new CloudObjectMetadata(p1, 1L), + new CloudObjectMetadata(p2, 1L)); + Option<Dataset<Row>> result = cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "parquet", Option.empty(), 1); + assertTrue(result.isPresent()); + Dataset<Row> ds = result.get(); + Assertions.assertEquals(2, ds.count()); + Set<String> colNames = Arrays.stream(ds.schema().fields()).map(StructField::name).collect(Collectors.toSet()); + assertTrue(colNames.contains("b"));
assertTrue(colNames.contains("c")); + } + + /** + * Verifies that the format-gating predicate for the cloud-incremental mergeSchema option recognises + * Parquet and ORC and rejects everything else. End-to-end ORC ingestion is not exercised here because + * {@code hudi-utilities} pulls in {@code orc-core-nohive} while Spark 3.x's ORC writer expects the + * regular {@code orc-core}; that classpath conflict makes writing ORC fixtures via {@code Dataset.write().orc(...)} fail + * with {@code NoSuchFieldError: type} in this module's tests. The end-to-end behaviour is exercised with Parquet + * (both formats go through the same read helper), so this predicate test covers the ORC-specific format dispatch. + */ + @Test + void isParquetOrOrcFileFormatRecognisesBothFormats() { + assertTrue(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("parquet")); + assertTrue(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("PARQUET")); + assertTrue(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("orc")); + assertTrue(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("ORC")); + assertTrue(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat(" parquet ")); + assertTrue(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat(" orc ")); + assertFalse(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("json")); + assertFalse(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("csv")); + assertFalse(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("avro")); + assertFalse(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("")); + assertFalse(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat(null)); + } + @Test public void partitionKeyNotPresentInPath() { List<CloudObjectMetadata> input = Collections.singletonList(new CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json", 1));
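For context, the sketch below shows how an operator would opt out of the new merge-schema defaults from streamer properties and what the flag maps to at read time. It is illustrative only, not part of the patch: the full key strings assume STREAMER_CONFIG_PREFIX resolves to "hoodie.streamer." (as the test properties above suggest), and the class name, file paths, and the simplified flag lookup are stand-ins for the real streamer wiring, which goes through ConfigUtils.getBooleanWithAltKeys.

// Illustrative sketch only (not part of this patch).
import org.apache.hudi.common.config.TypedProperties;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class MergeSchemaFlagSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("merge-schema-flag-sketch")
        .master("local[2]")
        .getOrCreate();

    TypedProperties props = new TypedProperties();
    // Opt out of the new default for the Parquet DFS source:
    props.setProperty("hoodie.streamer.source.parquet.dfs.merge.schema.enable", "false");
    // Equivalent flag for the S3/GCS incremental sources:
    props.setProperty("hoodie.streamer.source.cloud.data.merge.schema.enable", "false");

    // The sources resolve the flag via ConfigUtils.getBooleanWithAltKeys; a plain
    // Properties lookup with the same default keeps this sketch self-contained.
    boolean mergeSchema = Boolean.parseBoolean(
        props.getProperty("hoodie.streamer.source.parquet.dfs.merge.schema.enable", "true"));

    // This is the per-read option the sources pass to Spark; no session-level
    // spark.sql.parquet.mergeSchema change is involved.
    Dataset<Row> df = spark.read()
        .option("mergeSchema", mergeSchema)
        .parquet("/tmp/merge-schema-sketch/input"); // stand-in path

    df.printSchema();
    spark.stop();
  }
}

Roughly speaking, with mergeSchema disabled Spark infers the read schema from a single footer (or the summary metadata when present), which is the behavior the opt-out and the back-compat aliases restore; with it enabled, Spark unions the schemas of all files in the batch.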