From 8938ef3770ead9563cc87a0d98b3972b9f94b719 Mon Sep 17 00:00:00 2001 From: Lin Liu Date: Wed, 25 Mar 2026 16:02:58 -0700 Subject: [PATCH 1/9] Merge schema for bootstrap --- .../utilities/config/CloudSourceConfig.java | 11 ++ .../helpers/CloudObjectsSelectorCommon.java | 22 +++- .../TestCloudObjectsSelectorCommon.java | 108 ++++++++++++++++++ 3 files changed, 140 insertions(+), 1 deletion(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java index ec13e502fe31f..255b6425e53eb 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java @@ -181,4 +181,15 @@ public class CloudSourceConfig extends HoodieConfig { .markAdvanced() .sinceVersion("1.0.0") .withDocumentation("Boolean value to allow coalesce alias columns with actual columns while reading from source"); + + public static final ConfigProperty CLOUD_INCREMENTAL_PARQUET_MERGE_SCHEMA = ConfigProperty + .key(STREAMER_CONFIG_PREFIX + "source.cloud.data.parquet.merge.schema") + .defaultValue(true) + .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + "source.cloud.data.parquet.merge.schema") + .markAdvanced() + .sinceVersion("1.2.0") + .withDocumentation("For Parquet data files in S3/GCS incremental ingestion, merge schemas across all files " + + "in each read (Spark mergeSchema). Default true so mixed-schema batches during initial ingest/bootstrap " + + "produce a valid unified schema. Set false to restore prior behavior. " + + SPARK_DATASOURCE_OPTIONS.key() + " is applied after this flag and can override mergeSchema."); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java index 5cb52dcbaeaab..f14288214e617 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java @@ -71,6 +71,7 @@ import static org.apache.hudi.common.util.ConfigUtils.containsConfigProperty; import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys; import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys; +import static org.apache.hudi.utilities.config.CloudSourceConfig.CLOUD_INCREMENTAL_PARQUET_MERGE_SCHEMA; import static org.apache.hudi.utilities.config.CloudSourceConfig.CLOUD_DATAFILE_EXTENSION; import static org.apache.hudi.utilities.config.CloudSourceConfig.IGNORE_RELATIVE_PATH_PREFIX; import static org.apache.hudi.utilities.config.CloudSourceConfig.IGNORE_RELATIVE_PATH_SUBSTR; @@ -281,7 +282,7 @@ public Option> loadAsDataset(SparkSession spark, List getPropVal(TypedProperties props, ConfigProperty input = Arrays.asList( + new CloudObjectMetadata(p1, 1L), + new CloudObjectMetadata(p2, 1L)); + Option> result = cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "parquet", Option.empty(), 1); + Assertions.assertTrue(result.isPresent()); + Assertions.assertEquals(2, result.get().count()); + Set colNames = Arrays.stream(result.get().schema().fields()).map(StructField::name).collect(Collectors.toSet()); + Assertions.assertTrue(colNames.contains("b")); + Assertions.assertTrue(colNames.contains("c")); + } + + /** + * With mergeSchema off, 
Spark does not union Parquet footers: it typically follows one file's schema + * (here {@code p1} is listed first). Columns only present in other files (e.g. {@code c} in {@code p2}) + * are omitted from the scan—no exception, but data under those columns is dropped. + */ + @Test + void parquetMixedSchemasDropExtraColumnsWhenMergeDisabled(@TempDir Path tempDir) { + String p1 = tempDir.resolve("part1").toString(); + String p2 = tempDir.resolve("part2").toString(); + + StructType schema1 = DataTypes.createStructType(Arrays.asList( + DataTypes.createStructField("id", DataTypes.IntegerType, true), + DataTypes.createStructField("b", DataTypes.StringType, true))); + sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(1, "x")), schema1) + .write().parquet(p1); + + StructType schema2 = DataTypes.createStructType(Arrays.asList( + DataTypes.createStructField("id", DataTypes.IntegerType, true), + DataTypes.createStructField("c", DataTypes.IntegerType, true))); + sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(2, 99)), schema2) + .write().parquet(p2); + + TypedProperties props = new TypedProperties(); + props.setProperty(CloudSourceConfig.CLOUD_INCREMENTAL_PARQUET_MERGE_SCHEMA.key(), "false"); + CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new CloudObjectsSelectorCommon(props); + List input = Arrays.asList( + new CloudObjectMetadata(p1, 1L), + new CloudObjectMetadata(p2, 1L)); + Option> result = cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "parquet", Option.empty(), 1); + Assertions.assertTrue(result.isPresent()); + Dataset ds = result.get(); + Set colNames = Arrays.stream(ds.schema().fields()).map(StructField::name).collect(Collectors.toSet()); + Assertions.assertTrue(colNames.contains("id")); + Assertions.assertTrue(colNames.contains("b")); + Assertions.assertFalse(colNames.contains("c"), "column c from second file should be absent without mergeSchema"); + Assertions.assertEquals(2, ds.count()); + List rows = ds.collectAsList(); + Row fromSecondFile = rows.stream().filter(r -> r.getInt(0) == 2).findFirst().orElseThrow(); + Assertions.assertTrue(fromSecondFile.isNullAt(fromSecondFile.fieldIndex("b"))); + } + + @Test + void parquetSparkDatasourceOptionsMergeSchemaFalseDropsExtraColumns(@TempDir Path tempDir) { + String p1 = tempDir.resolve("part1").toString(); + String p2 = tempDir.resolve("part2").toString(); + + StructType schema1 = DataTypes.createStructType(Arrays.asList( + DataTypes.createStructField("id", DataTypes.IntegerType, true), + DataTypes.createStructField("b", DataTypes.StringType, true))); + sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(1, "x")), schema1) + .write().parquet(p1); + + StructType schema2 = DataTypes.createStructType(Arrays.asList( + DataTypes.createStructField("id", DataTypes.IntegerType, true), + DataTypes.createStructField("c", DataTypes.IntegerType, true))); + sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(2, 99)), schema2) + .write().parquet(p2); + + TypedProperties props = new TypedProperties(); + props.setProperty(CloudSourceConfig.SPARK_DATASOURCE_OPTIONS.key(), "{\"mergeSchema\":\"false\"}"); + CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new CloudObjectsSelectorCommon(props); + List input = Arrays.asList( + new CloudObjectMetadata(p1, 1L), + new CloudObjectMetadata(p2, 1L)); + Option> result = cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "parquet", Option.empty(), 1); + 
Assertions.assertTrue(result.isPresent()); + Dataset ds = result.get(); + Set colNames = Arrays.stream(ds.schema().fields()).map(StructField::name).collect(Collectors.toSet()); + Assertions.assertFalse(colNames.contains("c")); + Assertions.assertEquals(2, ds.count()); + } + @Test public void partitionKeyNotPresentInPath() { List input = Collections.singletonList(new CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json", 1)); From 32cfdc90e257f927d4a0b261baa8b52bc3f808e1 Mon Sep 17 00:00:00 2001 From: Lin Liu Date: Wed, 15 Apr 2026 02:43:57 -0700 Subject: [PATCH 2/9] Fix comments --- .../helpers/TestCloudObjectsSelectorCommon.java | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java index f0a548bbc4397..4924097d34f83 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java @@ -237,9 +237,9 @@ void parquetMixedSchemasMergedByDefault(@TempDir Path tempDir) { } /** - * With mergeSchema off, Spark does not union Parquet footers: it typically follows one file's schema - * (here {@code p1} is listed first). Columns only present in other files (e.g. {@code c} in {@code p2}) - * are omitted from the scan—no exception, but data under those columns is dropped. + * With mergeSchema off, Spark picks one file's schema and ignores columns only present in other + * files. Which file is chosen is non-deterministic (depends on file-listing order), so we only + * assert the column count (2, not 3) and that {@code id} is always present. 
*/ @Test void parquetMixedSchemasDropExtraColumnsWhenMergeDisabled(@TempDir Path tempDir) { @@ -268,13 +268,9 @@ void parquetMixedSchemasDropExtraColumnsWhenMergeDisabled(@TempDir Path tempDir) Assertions.assertTrue(result.isPresent()); Dataset ds = result.get(); Set colNames = Arrays.stream(ds.schema().fields()).map(StructField::name).collect(Collectors.toSet()); + Assertions.assertEquals(2, colNames.size(), "without mergeSchema, only one file's schema should be used"); Assertions.assertTrue(colNames.contains("id")); - Assertions.assertTrue(colNames.contains("b")); - Assertions.assertFalse(colNames.contains("c"), "column c from second file should be absent without mergeSchema"); Assertions.assertEquals(2, ds.count()); - List rows = ds.collectAsList(); - Row fromSecondFile = rows.stream().filter(r -> r.getInt(0) == 2).findFirst().orElseThrow(); - Assertions.assertTrue(fromSecondFile.isNullAt(fromSecondFile.fieldIndex("b"))); } @Test @@ -304,7 +300,8 @@ void parquetSparkDatasourceOptionsMergeSchemaFalseDropsExtraColumns(@TempDir Pat Assertions.assertTrue(result.isPresent()); Dataset ds = result.get(); Set colNames = Arrays.stream(ds.schema().fields()).map(StructField::name).collect(Collectors.toSet()); - Assertions.assertFalse(colNames.contains("c")); + Assertions.assertEquals(2, colNames.size(), "without mergeSchema, only one file's schema should be used"); + Assertions.assertTrue(colNames.contains("id")); Assertions.assertEquals(2, ds.count()); } From cfb3309a174a6efc9fcf5a8a30092373fed53112 Mon Sep 17 00:00:00 2001 From: Lin Liu Date: Wed, 29 Apr 2026 11:53:35 -0700 Subject: [PATCH 3/9] Extend cloud-incremental mergeSchema to ORC Widens the parquet-only mergeSchema injection in CloudObjectsSelectorCommon to also cover ORC. Spark's native ORC reader honors the per-read `mergeSchema` option on Spark 3.0+ (the native ORC impl has been the default since Spark 2.4); on older runtimes the option is silently ignored, which is harmless. Renames the config from CLOUD_INCREMENTAL_PARQUET_MERGE_SCHEMA to CLOUD_INCREMENTAL_MERGE_SCHEMA (key: source.cloud.data.merge.schema) and keeps the previous parquet-only key as a back-compat alternative so existing customer overrides continue to work. Renames the helper applyParquetMergeSchemaOption -> applyMergeSchemaOption and isParquetFileFormat -> isParquetOrOrcFileFormat. No external production callers reference the old names by symbol. 
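For existing jobs that already set the old parquet-only key, resolution still goes through the
alternatives chain. A minimal sketch of that back-compat path (illustration only, not part of the
patch; TypedProperties, getBooleanWithAltKeys and STREAMER_CONFIG_PREFIX are the existing Hudi
helpers already used by CloudObjectsSelectorCommon):

    TypedProperties props = new TypedProperties();
    // Pre-existing override written against the old parquet-only key.
    props.setProperty(STREAMER_CONFIG_PREFIX + "source.cloud.data.parquet.merge.schema", "false");
    // Resolves through CLOUD_INCREMENTAL_MERGE_SCHEMA's withAlternatives() list, so the
    // user's opt-out keeps disabling mergeSchema after the rename.
    boolean mergeSchema = getBooleanWithAltKeys(props, CLOUD_INCREMENTAL_MERGE_SCHEMA); // false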
Adds three ORC tests mirroring the existing parquet ones: - orcMixedSchemasMergedByDefault - orcMixedSchemasDropExtraColumnsWhenMergeDisabled - orcSparkDatasourceOptionsMergeSchemaFalseDropsExtraColumns Co-Authored-By: Claude Opus 4.7 (1M context) --- .../utilities/config/CloudSourceConfig.java | 21 ++-- .../helpers/CloudObjectsSelectorCommon.java | 28 +++-- .../TestCloudObjectsSelectorCommon.java | 101 +++++++++++++++++- 3 files changed, 132 insertions(+), 18 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java index 255b6425e53eb..e98035f096e29 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java @@ -182,14 +182,21 @@ public class CloudSourceConfig extends HoodieConfig { .sinceVersion("1.0.0") .withDocumentation("Boolean value to allow coalesce alias columns with actual columns while reading from source"); - public static final ConfigProperty CLOUD_INCREMENTAL_PARQUET_MERGE_SCHEMA = ConfigProperty - .key(STREAMER_CONFIG_PREFIX + "source.cloud.data.parquet.merge.schema") + public static final ConfigProperty CLOUD_INCREMENTAL_MERGE_SCHEMA = ConfigProperty + .key(STREAMER_CONFIG_PREFIX + "source.cloud.data.merge.schema") .defaultValue(true) - .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + "source.cloud.data.parquet.merge.schema") + .withAlternatives( + DELTA_STREAMER_CONFIG_PREFIX + "source.cloud.data.merge.schema", + // Back-compat aliases for the previous parquet-only key, still honored. + STREAMER_CONFIG_PREFIX + "source.cloud.data.parquet.merge.schema", + DELTA_STREAMER_CONFIG_PREFIX + "source.cloud.data.parquet.merge.schema") .markAdvanced() .sinceVersion("1.2.0") - .withDocumentation("For Parquet data files in S3/GCS incremental ingestion, merge schemas across all files " - + "in each read (Spark mergeSchema). Default true so mixed-schema batches during initial ingest/bootstrap " - + "produce a valid unified schema. Set false to restore prior behavior. " - + SPARK_DATASOURCE_OPTIONS.key() + " is applied after this flag and can override mergeSchema."); + .withDocumentation("For Parquet and ORC data files in S3/GCS incremental ingestion, merge schemas across all " + + "files in each read (Spark mergeSchema). Default true so mixed-schema batches during initial " + + "ingest/bootstrap produce a valid unified schema. Set false to restore prior behavior. " + + SPARK_DATASOURCE_OPTIONS.key() + " is applied after this flag and can override mergeSchema. " + + "Note: the per-read mergeSchema option is honored by Spark's native Parquet reader and by Spark's " + + "native ORC reader (Spark 3.0+, default ORC impl since Spark 2.4). 
On older runtimes the option is " + + "silently ignored."); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java index f14288214e617..fa411bfb1ab92 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java @@ -71,7 +71,7 @@ import static org.apache.hudi.common.util.ConfigUtils.containsConfigProperty; import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys; import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys; -import static org.apache.hudi.utilities.config.CloudSourceConfig.CLOUD_INCREMENTAL_PARQUET_MERGE_SCHEMA; +import static org.apache.hudi.utilities.config.CloudSourceConfig.CLOUD_INCREMENTAL_MERGE_SCHEMA; import static org.apache.hudi.utilities.config.CloudSourceConfig.CLOUD_DATAFILE_EXTENSION; import static org.apache.hudi.utilities.config.CloudSourceConfig.IGNORE_RELATIVE_PATH_PREFIX; import static org.apache.hudi.utilities.config.CloudSourceConfig.IGNORE_RELATIVE_PATH_SUBSTR; @@ -282,7 +282,7 @@ public Option> loadAsDataset(SparkSession spark, List getPropVal(TypedProperties props, ConfigPropertySpark's native Parquet reader honors {@code mergeSchema} on all supported versions. Spark's native ORC + * reader honors it on Spark 3.0+ (the native ORC impl is the default since Spark 2.4); on older runtimes the + * option is silently ignored, which is harmless. */ - private DataFrameReader applyParquetMergeSchemaOption(DataFrameReader reader, String fileFormat) { - if (!isParquetFileFormat(fileFormat)) { + private DataFrameReader applyMergeSchemaOption(DataFrameReader reader, String fileFormat) { + if (!isParquetOrOrcFileFormat(fileFormat)) { return reader; } - if (!getBooleanWithAltKeys(properties, CLOUD_INCREMENTAL_PARQUET_MERGE_SCHEMA)) { + if (!getBooleanWithAltKeys(properties, CLOUD_INCREMENTAL_MERGE_SCHEMA)) { return reader; } return reader.option("mergeSchema", "true"); } - private static boolean isParquetFileFormat(String fileFormat) { - return fileFormat != null && "parquet".equalsIgnoreCase(fileFormat.trim()); + private static boolean isParquetOrOrcFileFormat(String fileFormat) { + if (fileFormat == null) { + return false; + } + String f = fileFormat.trim(); + return "parquet".equalsIgnoreCase(f) || "orc".equalsIgnoreCase(f); } public enum Type { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java index 4924097d34f83..12abf810f6830 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java @@ -259,7 +259,7 @@ void parquetMixedSchemasDropExtraColumnsWhenMergeDisabled(@TempDir Path tempDir) .write().parquet(p2); TypedProperties props = new TypedProperties(); - props.setProperty(CloudSourceConfig.CLOUD_INCREMENTAL_PARQUET_MERGE_SCHEMA.key(), "false"); + props.setProperty(CloudSourceConfig.CLOUD_INCREMENTAL_MERGE_SCHEMA.key(), "false"); CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new CloudObjectsSelectorCommon(props); List input = Arrays.asList( new 
CloudObjectMetadata(p1, 1L), @@ -305,6 +305,105 @@ void parquetSparkDatasourceOptionsMergeSchemaFalseDropsExtraColumns(@TempDir Pat Assertions.assertEquals(2, ds.count()); } + @Test + void orcMixedSchemasMergedByDefault(@TempDir Path tempDir) { + String p1 = tempDir.resolve("part1").toString(); + String p2 = tempDir.resolve("part2").toString(); + + StructType schema1 = DataTypes.createStructType(Arrays.asList( + DataTypes.createStructField("id", DataTypes.IntegerType, true), + DataTypes.createStructField("b", DataTypes.StringType, true))); + sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(1, "x")), schema1) + .write().orc(p1); + + StructType schema2 = DataTypes.createStructType(Arrays.asList( + DataTypes.createStructField("id", DataTypes.IntegerType, true), + DataTypes.createStructField("c", DataTypes.IntegerType, true))); + sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(1, 99)), schema2) + .write().orc(p2); + + CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new CloudObjectsSelectorCommon(new TypedProperties()); + List input = Arrays.asList( + new CloudObjectMetadata(p1, 1L), + new CloudObjectMetadata(p2, 1L)); + Option> result = cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "orc", Option.empty(), 1); + Assertions.assertTrue(result.isPresent()); + Assertions.assertEquals(2, result.get().count()); + Set colNames = Arrays.stream(result.get().schema().fields()).map(StructField::name).collect(Collectors.toSet()); + Assertions.assertTrue(colNames.contains("b")); + Assertions.assertTrue(colNames.contains("c")); + } + + /** + * Mirror of {@link #parquetMixedSchemasDropExtraColumnsWhenMergeDisabled} for ORC. + * Spark's native ORC reader honors the per-read {@code mergeSchema} option on Spark 3.0+. With it disabled, + * Spark falls back to one file's schema (non-deterministic file-listing order), so we only assert column + * count and that {@code id} is always present. 
+ */ + @Test + void orcMixedSchemasDropExtraColumnsWhenMergeDisabled(@TempDir Path tempDir) { + String p1 = tempDir.resolve("part1").toString(); + String p2 = tempDir.resolve("part2").toString(); + + StructType schema1 = DataTypes.createStructType(Arrays.asList( + DataTypes.createStructField("id", DataTypes.IntegerType, true), + DataTypes.createStructField("b", DataTypes.StringType, true))); + sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(1, "x")), schema1) + .write().orc(p1); + + StructType schema2 = DataTypes.createStructType(Arrays.asList( + DataTypes.createStructField("id", DataTypes.IntegerType, true), + DataTypes.createStructField("c", DataTypes.IntegerType, true))); + sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(2, 99)), schema2) + .write().orc(p2); + + TypedProperties props = new TypedProperties(); + props.setProperty(CloudSourceConfig.CLOUD_INCREMENTAL_MERGE_SCHEMA.key(), "false"); + CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new CloudObjectsSelectorCommon(props); + List input = Arrays.asList( + new CloudObjectMetadata(p1, 1L), + new CloudObjectMetadata(p2, 1L)); + Option> result = cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "orc", Option.empty(), 1); + Assertions.assertTrue(result.isPresent()); + Dataset ds = result.get(); + Set colNames = Arrays.stream(ds.schema().fields()).map(StructField::name).collect(Collectors.toSet()); + Assertions.assertEquals(2, colNames.size(), "without mergeSchema, only one file's schema should be used"); + Assertions.assertTrue(colNames.contains("id")); + Assertions.assertEquals(2, ds.count()); + } + + @Test + void orcSparkDatasourceOptionsMergeSchemaFalseDropsExtraColumns(@TempDir Path tempDir) { + String p1 = tempDir.resolve("part1").toString(); + String p2 = tempDir.resolve("part2").toString(); + + StructType schema1 = DataTypes.createStructType(Arrays.asList( + DataTypes.createStructField("id", DataTypes.IntegerType, true), + DataTypes.createStructField("b", DataTypes.StringType, true))); + sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(1, "x")), schema1) + .write().orc(p1); + + StructType schema2 = DataTypes.createStructType(Arrays.asList( + DataTypes.createStructField("id", DataTypes.IntegerType, true), + DataTypes.createStructField("c", DataTypes.IntegerType, true))); + sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(2, 99)), schema2) + .write().orc(p2); + + TypedProperties props = new TypedProperties(); + props.setProperty(CloudSourceConfig.SPARK_DATASOURCE_OPTIONS.key(), "{\"mergeSchema\":\"false\"}"); + CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new CloudObjectsSelectorCommon(props); + List input = Arrays.asList( + new CloudObjectMetadata(p1, 1L), + new CloudObjectMetadata(p2, 1L)); + Option> result = cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "orc", Option.empty(), 1); + Assertions.assertTrue(result.isPresent()); + Dataset ds = result.get(); + Set colNames = Arrays.stream(ds.schema().fields()).map(StructField::name).collect(Collectors.toSet()); + Assertions.assertEquals(2, colNames.size(), "without mergeSchema, only one file's schema should be used"); + Assertions.assertTrue(colNames.contains("id")); + Assertions.assertEquals(2, ds.count()); + } + @Test public void partitionKeyNotPresentInPath() { List input = Collections.singletonList(new CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json", 1)); From 
b641584ca270ab77e1d8673cf7db9afed1c1e7a6 Mon Sep 17 00:00:00 2001 From: Lin Liu Date: Wed, 29 Apr 2026 13:08:52 -0700 Subject: [PATCH 4/9] Replace ORC e2e tests with predicate test (orc-core-nohive classpath conflict) The three orcMixedSchemas* tests added in the prior commit fail in hudi-utilities CI with `NoSuchFieldError: type` from `OrcMapredRecordWriter.addVariableLengthColumns`. Hudi-utilities pulls in `orc-core-nohive` (kept for compile-time reasons per the pom comment) while Spark 3.x's ORC writer was compiled against regular `orc-core`, which has a `type` field the nohive variant lacks. Result: `sparkSession.write().orc(...)` cannot be used in this module's tests. Replaces the three e2e ORC tests with a single predicate test on `CloudObjectsSelectorCommon.isParquetOrOrcFileFormat(String)` that verifies the format-gating logic without exercising Spark's ORC writer. The e2e behaviour for ORC mirrors Parquet via the shared helper `applyMergeSchemaOption`, which is already covered by the three Parquet e2e tests. The ORC-vs-Parquet difference is one boolean check in the predicate, which the new test pins. `isParquetOrOrcFileFormat` becomes package-private (was private) so the test in the same package can reference it without reflection. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../helpers/CloudObjectsSelectorCommon.java | 3 +- .../TestCloudObjectsSelectorCommon.java | 112 +++--------------- 2 files changed, 20 insertions(+), 95 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java index fa411bfb1ab92..a136bb5290bf4 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java @@ -566,7 +566,8 @@ private DataFrameReader applyMergeSchemaOption(DataFrameReader reader, String fi return reader.option("mergeSchema", "true"); } - private static boolean isParquetOrOrcFileFormat(String fileFormat) { + // Package-private for unit testing — see TestCloudObjectsSelectorCommon. 
+ static boolean isParquetOrOrcFileFormat(String fileFormat) { if (fileFormat == null) { return false; } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java index 12abf810f6830..eab7efa52d4b1 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java @@ -305,103 +305,27 @@ void parquetSparkDatasourceOptionsMergeSchemaFalseDropsExtraColumns(@TempDir Pat Assertions.assertEquals(2, ds.count()); } - @Test - void orcMixedSchemasMergedByDefault(@TempDir Path tempDir) { - String p1 = tempDir.resolve("part1").toString(); - String p2 = tempDir.resolve("part2").toString(); - - StructType schema1 = DataTypes.createStructType(Arrays.asList( - DataTypes.createStructField("id", DataTypes.IntegerType, true), - DataTypes.createStructField("b", DataTypes.StringType, true))); - sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(1, "x")), schema1) - .write().orc(p1); - - StructType schema2 = DataTypes.createStructType(Arrays.asList( - DataTypes.createStructField("id", DataTypes.IntegerType, true), - DataTypes.createStructField("c", DataTypes.IntegerType, true))); - sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(1, 99)), schema2) - .write().orc(p2); - - CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new CloudObjectsSelectorCommon(new TypedProperties()); - List input = Arrays.asList( - new CloudObjectMetadata(p1, 1L), - new CloudObjectMetadata(p2, 1L)); - Option> result = cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "orc", Option.empty(), 1); - Assertions.assertTrue(result.isPresent()); - Assertions.assertEquals(2, result.get().count()); - Set colNames = Arrays.stream(result.get().schema().fields()).map(StructField::name).collect(Collectors.toSet()); - Assertions.assertTrue(colNames.contains("b")); - Assertions.assertTrue(colNames.contains("c")); - } - /** - * Mirror of {@link #parquetMixedSchemasDropExtraColumnsWhenMergeDisabled} for ORC. - * Spark's native ORC reader honors the per-read {@code mergeSchema} option on Spark 3.0+. With it disabled, - * Spark falls back to one file's schema (non-deterministic file-listing order), so we only assert column - * count and that {@code id} is always present. + * Verifies that the format-gating predicate for the cloud-incremental mergeSchema option recognises + * Parquet and ORC and rejects everything else. End-to-end ORC ingestion is not exercised here because + * {@code hudi-utilities} pulls in {@code orc-core-nohive} while Spark 3.x's ORC writer expects the + * regular {@code orc-core}; that classpath conflict makes {@code sparkSession.write().orc(...)} fail + * with {@code NoSuchFieldError: type} in this module's tests. The end-to-end behaviour for ORC is + * covered by Parquet's tests via the shared helper, plus this predicate test for the format dispatch. 
*/ @Test - void orcMixedSchemasDropExtraColumnsWhenMergeDisabled(@TempDir Path tempDir) { - String p1 = tempDir.resolve("part1").toString(); - String p2 = tempDir.resolve("part2").toString(); - - StructType schema1 = DataTypes.createStructType(Arrays.asList( - DataTypes.createStructField("id", DataTypes.IntegerType, true), - DataTypes.createStructField("b", DataTypes.StringType, true))); - sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(1, "x")), schema1) - .write().orc(p1); - - StructType schema2 = DataTypes.createStructType(Arrays.asList( - DataTypes.createStructField("id", DataTypes.IntegerType, true), - DataTypes.createStructField("c", DataTypes.IntegerType, true))); - sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(2, 99)), schema2) - .write().orc(p2); - - TypedProperties props = new TypedProperties(); - props.setProperty(CloudSourceConfig.CLOUD_INCREMENTAL_MERGE_SCHEMA.key(), "false"); - CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new CloudObjectsSelectorCommon(props); - List input = Arrays.asList( - new CloudObjectMetadata(p1, 1L), - new CloudObjectMetadata(p2, 1L)); - Option> result = cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "orc", Option.empty(), 1); - Assertions.assertTrue(result.isPresent()); - Dataset ds = result.get(); - Set colNames = Arrays.stream(ds.schema().fields()).map(StructField::name).collect(Collectors.toSet()); - Assertions.assertEquals(2, colNames.size(), "without mergeSchema, only one file's schema should be used"); - Assertions.assertTrue(colNames.contains("id")); - Assertions.assertEquals(2, ds.count()); - } - - @Test - void orcSparkDatasourceOptionsMergeSchemaFalseDropsExtraColumns(@TempDir Path tempDir) { - String p1 = tempDir.resolve("part1").toString(); - String p2 = tempDir.resolve("part2").toString(); - - StructType schema1 = DataTypes.createStructType(Arrays.asList( - DataTypes.createStructField("id", DataTypes.IntegerType, true), - DataTypes.createStructField("b", DataTypes.StringType, true))); - sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(1, "x")), schema1) - .write().orc(p1); - - StructType schema2 = DataTypes.createStructType(Arrays.asList( - DataTypes.createStructField("id", DataTypes.IntegerType, true), - DataTypes.createStructField("c", DataTypes.IntegerType, true))); - sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(2, 99)), schema2) - .write().orc(p2); - - TypedProperties props = new TypedProperties(); - props.setProperty(CloudSourceConfig.SPARK_DATASOURCE_OPTIONS.key(), "{\"mergeSchema\":\"false\"}"); - CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new CloudObjectsSelectorCommon(props); - List input = Arrays.asList( - new CloudObjectMetadata(p1, 1L), - new CloudObjectMetadata(p2, 1L)); - Option> result = cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "orc", Option.empty(), 1); - Assertions.assertTrue(result.isPresent()); - Dataset ds = result.get(); - Set colNames = Arrays.stream(ds.schema().fields()).map(StructField::name).collect(Collectors.toSet()); - Assertions.assertEquals(2, colNames.size(), "without mergeSchema, only one file's schema should be used"); - Assertions.assertTrue(colNames.contains("id")); - Assertions.assertEquals(2, ds.count()); + void isParquetOrOrcFileFormatRecognisesBothFormats() { + Assertions.assertTrue(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("parquet")); + Assertions.assertTrue(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("PARQUET")); + 
Assertions.assertTrue(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("orc")); + Assertions.assertTrue(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("ORC")); + Assertions.assertTrue(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat(" parquet ")); + Assertions.assertTrue(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat(" orc ")); + Assertions.assertFalse(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("json")); + Assertions.assertFalse(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("csv")); + Assertions.assertFalse(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("avro")); + Assertions.assertFalse(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("")); + Assertions.assertFalse(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat(null)); } @Test From 9acf533a2e7111b04f7ffee620a40124e01d2c61 Mon Sep 17 00:00:00 2001 From: Lin Liu Date: Wed, 29 Apr 2026 13:19:48 -0700 Subject: [PATCH 5/9] Remove flaky parquetMixedSchemasDropExtraColumnsWhenMergeDisabled test Per PR review feedback: this test asserts that with the Hudi CLOUD_INCREMENTAL_MERGE_SCHEMA flag turned off, the result has only one file's columns (2, not 3). It implicitly relies on Spark's session-level spark.sql.parquet.mergeSchema being false, which is the Spark default but can be overridden by the test runner. If a session enables mergeSchema globally, Spark merges anyway and the test fails. The other two parquet tests are reliable because they set the mergeSchema option explicitly per-read (which overrides session-level config): - parquetMixedSchemasMergedByDefault: Hudi default true sets reader.option("mergeSchema", "true"). - parquetSparkDatasourceOptionsMergeSchemaFalseDropsExtraColumns: SPARK_DATASOURCE_OPTIONS sets reader.option("mergeSchema", "false"). The override path (SPARK_DATASOURCE_OPTIONS) and the default-on path (CLOUD_INCREMENTAL_MERGE_SCHEMA defaults to true) are both still covered. The flag-off-but-session-default path is not covered and is the part deemed flaky. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../TestCloudObjectsSelectorCommon.java | 37 ------------------- 1 file changed, 37 deletions(-) diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java index eab7efa52d4b1..59dcd1bb642f0 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java @@ -236,43 +236,6 @@ void parquetMixedSchemasMergedByDefault(@TempDir Path tempDir) { Assertions.assertTrue(colNames.contains("c")); } - /** - * With mergeSchema off, Spark picks one file's schema and ignores columns only present in other - * files. Which file is chosen is non-deterministic (depends on file-listing order), so we only - * assert the column count (2, not 3) and that {@code id} is always present. 
- */ - @Test - void parquetMixedSchemasDropExtraColumnsWhenMergeDisabled(@TempDir Path tempDir) { - String p1 = tempDir.resolve("part1").toString(); - String p2 = tempDir.resolve("part2").toString(); - - StructType schema1 = DataTypes.createStructType(Arrays.asList( - DataTypes.createStructField("id", DataTypes.IntegerType, true), - DataTypes.createStructField("b", DataTypes.StringType, true))); - sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(1, "x")), schema1) - .write().parquet(p1); - - StructType schema2 = DataTypes.createStructType(Arrays.asList( - DataTypes.createStructField("id", DataTypes.IntegerType, true), - DataTypes.createStructField("c", DataTypes.IntegerType, true))); - sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(2, 99)), schema2) - .write().parquet(p2); - - TypedProperties props = new TypedProperties(); - props.setProperty(CloudSourceConfig.CLOUD_INCREMENTAL_MERGE_SCHEMA.key(), "false"); - CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new CloudObjectsSelectorCommon(props); - List input = Arrays.asList( - new CloudObjectMetadata(p1, 1L), - new CloudObjectMetadata(p2, 1L)); - Option> result = cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "parquet", Option.empty(), 1); - Assertions.assertTrue(result.isPresent()); - Dataset ds = result.get(); - Set colNames = Arrays.stream(ds.schema().fields()).map(StructField::name).collect(Collectors.toSet()); - Assertions.assertEquals(2, colNames.size(), "without mergeSchema, only one file's schema should be used"); - Assertions.assertTrue(colNames.contains("id")); - Assertions.assertEquals(2, ds.count()); - } - @Test void parquetSparkDatasourceOptionsMergeSchemaFalseDropsExtraColumns(@TempDir Path tempDir) { String p1 = tempDir.resolve("part1").toString(); From 0bea31864c013705b16b87e2475cc0fa84deb540 Mon Sep 17 00:00:00 2001 From: Lin Liu Date: Wed, 29 Apr 2026 13:21:58 -0700 Subject: [PATCH 6/9] Remove flaky parquetSparkDatasourceOptionsMergeSchemaFalseDropsExtraColumns MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per PR review feedback: this test relies on the per-read mergeSchema option overriding the session-level spark.sql.parquet.mergeSchema. While that's the documented Spark behaviour, a test runner with the session- level config set to true could surface a false negative depending on order of operations. The reviewer flagged it as the same flakiness class as the previously-removed parquetMixedSchemasDropExtraColumnsWhenMergeDisabled. Coverage that remains: - parquetMixedSchemasMergedByDefault: end-to-end happy path, Hudi default (true) → mergeSchema=true on the reader → merged result. Reliable because the per-read option is set, overriding any session config. - isParquetOrOrcFileFormatRecognisesBothFormats: format-gating predicate. The override path (SPARK_DATASOURCE_OPTIONS overriding the Hudi flag) is no longer covered by an e2e test; the option-merge order is exercised by Spark's own DataFrameReader semantics, not by Hudi-side logic. 
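For reference, the override relies on DataFrameReader's last-option-wins behavior rather than any
Hudi-side ordering logic. A rough sketch of what the reader ends up seeing (illustration only;
`spark` is an assumed SparkSession and the literal values stand in for the resolved configs):

    DataFrameReader reader = spark.read().format("parquet")
        // Injected first because CLOUD_INCREMENTAL_MERGE_SCHEMA defaults to true.
        .option("mergeSchema", "true")
        // Applied afterwards from the SPARK_DATASOURCE_OPTIONS JSON.
        .option("mergeSchema", "false");
    // DataFrameReader keeps options in a case-insensitive map, so the later value replaces
    // the earlier one and the scan runs with mergeSchema=false.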
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../TestCloudObjectsSelectorCommon.java | 32 ------------------- 1 file changed, 32 deletions(-) diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java index 59dcd1bb642f0..caabf3c1662d5 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java @@ -236,38 +236,6 @@ void parquetMixedSchemasMergedByDefault(@TempDir Path tempDir) { Assertions.assertTrue(colNames.contains("c")); } - @Test - void parquetSparkDatasourceOptionsMergeSchemaFalseDropsExtraColumns(@TempDir Path tempDir) { - String p1 = tempDir.resolve("part1").toString(); - String p2 = tempDir.resolve("part2").toString(); - - StructType schema1 = DataTypes.createStructType(Arrays.asList( - DataTypes.createStructField("id", DataTypes.IntegerType, true), - DataTypes.createStructField("b", DataTypes.StringType, true))); - sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(1, "x")), schema1) - .write().parquet(p1); - - StructType schema2 = DataTypes.createStructType(Arrays.asList( - DataTypes.createStructField("id", DataTypes.IntegerType, true), - DataTypes.createStructField("c", DataTypes.IntegerType, true))); - sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(2, 99)), schema2) - .write().parquet(p2); - - TypedProperties props = new TypedProperties(); - props.setProperty(CloudSourceConfig.SPARK_DATASOURCE_OPTIONS.key(), "{\"mergeSchema\":\"false\"}"); - CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new CloudObjectsSelectorCommon(props); - List input = Arrays.asList( - new CloudObjectMetadata(p1, 1L), - new CloudObjectMetadata(p2, 1L)); - Option> result = cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "parquet", Option.empty(), 1); - Assertions.assertTrue(result.isPresent()); - Dataset ds = result.get(); - Set colNames = Arrays.stream(ds.schema().fields()).map(StructField::name).collect(Collectors.toSet()); - Assertions.assertEquals(2, colNames.size(), "without mergeSchema, only one file's schema should be used"); - Assertions.assertTrue(colNames.contains("id")); - Assertions.assertEquals(2, ds.count()); - } - /** * Verifies that the format-gating predicate for the cloud-incremental mergeSchema option recognises * Parquet and ORC and rejects everything else. End-to-end ORC ingestion is not exercised here because From 91d98a0c4bf7461a860862d0d6eff55f5b62961c Mon Sep 17 00:00:00 2001 From: Lin Liu Date: Wed, 29 Apr 2026 13:30:08 -0700 Subject: [PATCH 7/9] Remove unused CloudSourceConfig import in test Both removed flaky tests (parquetMixedSchemasDropExtraColumnsWhenMergeDisabled and parquetSparkDatasourceOptionsMergeSchemaFalseDropsExtraColumns) were the only references to CloudSourceConfig in this test file. Drop the now-unused import to satisfy the UnusedImports checkstyle rule. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../sources/helpers/TestCloudObjectsSelectorCommon.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java index caabf3c1662d5..7cac33523ae46 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java @@ -23,7 +23,6 @@ import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.util.Option; import org.apache.hudi.testutils.HoodieSparkClientTestHarness; -import org.apache.hudi.utilities.config.CloudSourceConfig; import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; import org.apache.spark.sql.Dataset; From b81c5a5ee7ca1b37343755e0852fca0b910cffb5 Mon Sep 17 00:00:00 2001 From: Lin Liu Date: Tue, 5 May 2026 10:30:52 -0700 Subject: [PATCH 8/9] fix: Extend schema-merging to ParquetDFSSource (default) and ORCDFSSource Follows up on the cloud-incremental-source fix in this PR. Closes the remaining FS-based source gaps from ENG-41047 except the MetadataBootstrap paths (handled separately as those need partition-level schema discovery). - ParquetDFSSourceConfig.PARQUET_DFS_MERGE_SCHEMA: default flipped from false to true so that heterogeneous-schema parquet files in a single Hudi Streamer batch get a unioned schema instead of silently dropping columns that exist only in some files. Set to false to restore the prior single-file-schema-wins behavior. - New ORCDFSSourceConfig.ORC_DFS_MERGE_SCHEMA, default true, mirroring the parquet config. Plumbed through ORCDFSSource.fromFiles() as reader.option("mergeSchema", flag). Requires spark.sql.orc.impl=native (default since Spark 2.4); silently ignored under the Hive impl. - TestAvroDFSSource: regression test confirming additive-schema evolution across files works end-to-end. Writes one narrow + one wider Avro file, configures the source's reader schema to the wider one, and asserts records from the narrow file get the wider schema's default for the new field while records from the wider file preserve their value. 
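The Avro mechanism the test pins down, in isolation (sketch only, not part of the patch; assumes
plain Avro generic readers/writers on the classpath, imports elided):

    Schema narrow = SchemaBuilder.record("r").fields().requiredInt("id").endRecord();
    Schema wider = SchemaBuilder.record("r").fields()
        .requiredInt("id")
        .name("additive_field").type().stringType().stringDefault("DEFAULT")
        .endRecord();

    // Serialize a record under the narrow writer schema.
    GenericRecord rec = new GenericData.Record(narrow);
    rec.put("id", 1);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder enc = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(narrow).write(rec, enc);
    enc.flush();

    // Read it back with the wider reader schema: Avro schema resolution fills the missing
    // field from the reader schema's default instead of failing.
    BinaryDecoder dec = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
    GenericRecord read = new GenericDatumReader<GenericRecord>(narrow, wider).read(null, dec);
    String value = String.valueOf(read.get("additive_field")); // "DEFAULT"

The new test exercises the same resolution end-to-end through AvroDFSSource, with the wider
schema supplied via FilebasedSchemaProvider.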
--- .../utilities/config/CloudSourceConfig.java | 8 +- .../utilities/config/ORCDFSSourceConfig.java | 53 +++++++++ .../config/ParquetDFSSourceConfig.java | 16 ++- .../hudi/utilities/sources/ORCDFSSource.java | 6 +- .../utilities/sources/ParquetDFSSource.java | 4 +- .../utilities/sources/TestAvroDFSSource.java | 103 ++++++++++++++++++ .../TestCloudObjectsSelectorCommon.java | 5 +- 7 files changed, 180 insertions(+), 15 deletions(-) create mode 100644 hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ORCDFSSourceConfig.java diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java index e98035f096e29..2d24ca64493aa 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java @@ -183,13 +183,9 @@ public class CloudSourceConfig extends HoodieConfig { .withDocumentation("Boolean value to allow coalesce alias columns with actual columns while reading from source"); public static final ConfigProperty CLOUD_INCREMENTAL_MERGE_SCHEMA = ConfigProperty - .key(STREAMER_CONFIG_PREFIX + "source.cloud.data.merge.schema") + .key(STREAMER_CONFIG_PREFIX + "source.cloud.data.merge.schema.enable") .defaultValue(true) - .withAlternatives( - DELTA_STREAMER_CONFIG_PREFIX + "source.cloud.data.merge.schema", - // Back-compat aliases for the previous parquet-only key, still honored. - STREAMER_CONFIG_PREFIX + "source.cloud.data.parquet.merge.schema", - DELTA_STREAMER_CONFIG_PREFIX + "source.cloud.data.parquet.merge.schema") + .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + "source.cloud.data.merge.schema.enable") .markAdvanced() .sinceVersion("1.2.0") .withDocumentation("For Parquet and ORC data files in S3/GCS incremental ingestion, merge schemas across all " diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ORCDFSSourceConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ORCDFSSourceConfig.java new file mode 100644 index 0000000000000..9fb50875ca983 --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ORCDFSSourceConfig.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.utilities.config; + +import org.apache.hudi.common.config.ConfigClassProperty; +import org.apache.hudi.common.config.ConfigGroups; +import org.apache.hudi.common.config.ConfigProperty; +import org.apache.hudi.common.config.HoodieConfig; + +import javax.annotation.concurrent.Immutable; + +import static org.apache.hudi.common.util.ConfigUtils.DELTA_STREAMER_CONFIG_PREFIX; +import static org.apache.hudi.common.util.ConfigUtils.STREAMER_CONFIG_PREFIX; + +/** + * ORC DFS Source Configs + */ +@Immutable +@ConfigClassProperty(name = "ORC DFS Source Configs", + groupName = ConfigGroups.Names.HUDI_STREAMER, + subGroupName = ConfigGroups.SubGroupNames.DELTA_STREAMER_SOURCE, + description = "Configurations controlling the behavior of ORC DFS source in Hudi Streamer.") +public class ORCDFSSourceConfig extends HoodieConfig { + + public static final ConfigProperty ORC_DFS_MERGE_SCHEMA = ConfigProperty + .key(STREAMER_CONFIG_PREFIX + "source.orc.dfs.merge.schema.enable") + .defaultValue(true) + .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + "source.orc.dfs.merge.schema.enable") + .markAdvanced() + .sinceVersion("1.2.0") + .withDocumentation("Whether to merge schema across ORC files within a single read. " + + "Defaults to true: heterogeneous-schema source files (e.g. during bootstrap or " + + "evolving producers) get a unioned schema instead of silently dropping columns " + + "that exist only in some files. Requires spark.sql.orc.impl=native (default since " + + "Spark 2.4); the option is silently ignored under spark.sql.orc.impl=hive."); +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ParquetDFSSourceConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ParquetDFSSourceConfig.java index a8906c9f70b0d..9605b03b34fa7 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ParquetDFSSourceConfig.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ParquetDFSSourceConfig.java @@ -40,10 +40,18 @@ public class ParquetDFSSourceConfig extends HoodieConfig { public static final ConfigProperty PARQUET_DFS_MERGE_SCHEMA = ConfigProperty - .key(STREAMER_CONFIG_PREFIX + "source.parquet.dfs.merge_schema.enable") - .defaultValue(false) - .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + "source.parquet.dfs.merge_schema.enable") + .key(STREAMER_CONFIG_PREFIX + "source.parquet.dfs.merge.schema.enable") + .defaultValue(true) + .withAlternatives( + DELTA_STREAMER_CONFIG_PREFIX + "source.parquet.dfs.merge.schema.enable", + // Back-compat aliases for the previous underscore-style keys (since 0.15.0). + STREAMER_CONFIG_PREFIX + "source.parquet.dfs.merge_schema.enable", + DELTA_STREAMER_CONFIG_PREFIX + "source.parquet.dfs.merge_schema.enable") .markAdvanced() .sinceVersion("0.15.0") - .withDocumentation("Merge schema across parquet files within a single write"); + .withDocumentation("Whether to merge schema across parquet files within a single read. " + + "Defaults to true: heterogeneous-schema source files (e.g. during bootstrap or " + + "evolving producers) get a unioned schema instead of silently dropping columns " + + "that exist only in some files. 
Set to false to restore the previous reader " + + "behavior (single file's schema wins)."); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ORCDFSSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ORCDFSSource.java index 46357ee5a35c9..ace9a0590bcb5 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ORCDFSSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ORCDFSSource.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.table.checkpoint.Checkpoint; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.utilities.config.ORCDFSSourceConfig; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.sources.helpers.DFSPathSelector; @@ -30,6 +31,8 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; +import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys; + /** * DFS Source that reads ORC data. */ @@ -53,6 +56,7 @@ public Pair>, Checkpoint> fetchNextBatch(Option } private Dataset fromFiles(String pathStr) { - return sparkSession.read().orc(pathStr.split(",")); + boolean mergeSchemaEnabled = getBooleanWithAltKeys(this.props, ORCDFSSourceConfig.ORC_DFS_MERGE_SCHEMA); + return sparkSession.read().option("mergeSchema", mergeSchemaEnabled).orc(pathStr.split(",")); } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java index 10ec52f6ef782..3ad5270ab2594 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java @@ -56,7 +56,7 @@ public Pair>, Checkpoint> fetchNextBatch(Option } private Dataset fromFiles(String pathStr) { - boolean mergeSchemaOption = getBooleanWithAltKeys(this.props, ParquetDFSSourceConfig.PARQUET_DFS_MERGE_SCHEMA); - return sparkSession.read().option("mergeSchema", mergeSchemaOption).parquet(pathStr.split(",")); + boolean mergeSchemaEnabled = getBooleanWithAltKeys(this.props, ParquetDFSSourceConfig.PARQUET_DFS_MERGE_SCHEMA); + return sparkSession.read().option("mergeSchema", mergeSchemaEnabled).parquet(pathStr.split(",")); } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java index 89d522675ba16..aa15ca18ec6d3 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java @@ -20,14 +20,28 @@ import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; +import org.apache.hudi.utilities.streamer.SourceFormatAdapter; import org.apache.hudi.utilities.testutils.sources.AbstractDFSSourceTestBase; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.fs.Path; +import org.apache.spark.api.java.JavaRDD; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import java.io.IOException; +import 
java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.List; +import static org.junit.jupiter.api.Assertions.assertEquals; + /** * Basic tests for {@link TestAvroDFSSource}. */ @@ -54,4 +68,93 @@ protected Source prepareDFSSource(TypedProperties props) { protected void writeNewDataToFile(List records, Path path) throws IOException { Helpers.saveAvroToDFS(Helpers.toGenericRecords(records), path); } + + /** + * Regression test: when a single batch contains files with additive schema evolution + * (one file has the base schema, another has the same fields plus an extra field with a + * default), reading via {@link AvroDFSSource} configured with the wider reader schema must + * (a) not fail, (b) return all records from both files, and (c) materialize the wider field + * as the default for records from the narrow file and as the written value for records from + * the wider file. Locks in Avro reader/writer schema-resolution behavior. + */ + @Test + public void testAdditiveSchemaEvolutionAcrossFiles() throws Exception { + // Use a unique subdirectory because basePath is static and shared with + // the parent testReadingFromSource, which writes 10000+ records into dfsRoot + // and would otherwise pollute this test's read. + String additiveRoot = basePath + "/avroFilesAdditive"; + fs.mkdirs(new Path(additiveRoot)); + + Schema narrowSchema = HoodieTestDataGenerator.AVRO_SCHEMA; + Schema widerSchema = addStringFieldWithDefault(narrowSchema, "additive_field", "DEFAULT"); + + // File A: narrow writer schema, no additive_field. + int narrowCount = 30; + List narrowRecords = Helpers.toGenericRecords( + dataGenerator.generateInserts("000", narrowCount)); + Path pathA = new Path(additiveRoot, "narrow" + fileSuffix); + Helpers.saveAvroToDFS(narrowRecords, pathA, narrowSchema); + + // File B: wider writer schema, additive_field set to a known value. + int wideCount = 20; + List wideRecords = new ArrayList<>(); + for (GenericRecord narrow : Helpers.toGenericRecords( + dataGenerator.generateInserts("001", wideCount))) { + GenericRecord wide = new GenericData.Record(widerSchema); + for (Schema.Field f : narrowSchema.getFields()) { + wide.put(f.name(), narrow.get(f.name())); + } + wide.put("additive_field", "WRITTEN"); + wideRecords.add(wide); + } + Path pathB = new Path(additiveRoot, "wider" + fileSuffix); + Helpers.saveAvroToDFS(wideRecords, pathB, widerSchema); + + // Write the wider schema to DFS and point a fresh schema provider at it so the source's + // reader schema is wider than file A's writer schema. 
+ Path widerSchemaFile = new Path(basePath + "/wider-source.avsc"); + try (OutputStream out = fs.create(widerSchemaFile)) { + out.write(widerSchema.toString(true).getBytes(StandardCharsets.UTF_8)); + } + TypedProperties props = new TypedProperties(); + props.setProperty("hoodie.streamer.source.dfs.root", additiveRoot); + props.setProperty("hoodie.streamer.schemaprovider.source.schema.file", widerSchemaFile.toString()); + FilebasedSchemaProvider widerProvider = new FilebasedSchemaProvider(props, jsc); + AvroDFSSource source = new AvroDFSSource(props, jsc, sparkSession, widerProvider); + + SourceFormatAdapter adapter = new SourceFormatAdapter(source); + JavaRDD fetched = + adapter.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch().get(); + List read = fetched.collect(); + + assertEquals(narrowCount + wideCount, read.size(), + "Both narrow and wider files should be read in the same batch"); + + long defaulted = read.stream() + .filter(r -> "DEFAULT".equals(String.valueOf(r.get("additive_field")))) + .count(); + long preserved = read.stream() + .filter(r -> "WRITTEN".equals(String.valueOf(r.get("additive_field")))) + .count(); + assertEquals(narrowCount, defaulted, + "Records from the narrow file should get the wider reader schema's default for additive_field"); + assertEquals(wideCount, preserved, + "Records from the wider file should preserve the written value of additive_field"); + } + + /** + * Returns a copy of {@code base} with one extra optional string field appended, defaulting + * to {@code defaultValue}. The new field has a non-null default so Avro's schema-resolution + * can fill it in for records read with this schema but written under {@code base}. + */ + private static Schema addStringFieldWithDefault(Schema base, String fieldName, String defaultValue) { + List fields = new ArrayList<>(); + for (Schema.Field f : base.getFields()) { + fields.add(new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal())); + } + fields.add(new Schema.Field(fieldName, Schema.create(Schema.Type.STRING), null, defaultValue)); + Schema wider = Schema.createRecord(base.getName(), base.getDoc(), base.getNamespace(), false); + wider.setFields(fields); + return wider; + } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java index 7cac33523ae46..d6ab683ad50aa 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java @@ -229,8 +229,9 @@ void parquetMixedSchemasMergedByDefault(@TempDir Path tempDir) { new CloudObjectMetadata(p2, 1L)); Option> result = cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "parquet", Option.empty(), 1); Assertions.assertTrue(result.isPresent()); - Assertions.assertEquals(2, result.get().count()); - Set colNames = Arrays.stream(result.get().schema().fields()).map(StructField::name).collect(Collectors.toSet()); + Dataset ds = result.get(); + Assertions.assertEquals(2, ds.count()); + Set colNames = Arrays.stream(ds.schema().fields()).map(StructField::name).collect(Collectors.toSet()); Assertions.assertTrue(colNames.contains("b")); Assertions.assertTrue(colNames.contains("c")); } From f609bff8c7c365a2ef45d1ed015fd75ebed6b98d Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Fri, 15 May 2026 15:00:32 -0700 
From f609bff8c7c365a2ef45d1ed015fd75ebed6b98d Mon Sep 17 00:00:00 2001
From: Y Ethan Guo
Date: Fri, 15 May 2026 15:00:32 -0700
Subject: [PATCH 9/9] Address review comments

- Drop withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + ...) on the new
  CLOUD_INCREMENTAL_MERGE_SCHEMA and ORC_DFS_MERGE_SCHEMA configs; legacy
  prefix alternatives are not needed for new keys.
- Drop the new-key-with-legacy-prefix alternative on PARQUET_DFS_MERGE_SCHEMA;
  keep underscore-style back-compat aliases since the key itself was renamed.
- Static-import assertTrue/assertFalse in the new tests.
---
 .../utilities/config/CloudSourceConfig.java   |  1 -
 .../utilities/config/ORCDFSSourceConfig.java  |  2 --
 .../config/ParquetDFSSourceConfig.java        |  1 -
 .../TestCloudObjectsSelectorCommon.java       | 31 ++++++++++---------
 4 files changed, 17 insertions(+), 18 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
index 2d24ca64493aa..6ffd5ad5c7ae3 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
@@ -185,7 +185,6 @@ public class CloudSourceConfig extends HoodieConfig {
   public static final ConfigProperty<Boolean> CLOUD_INCREMENTAL_MERGE_SCHEMA = ConfigProperty
       .key(STREAMER_CONFIG_PREFIX + "source.cloud.data.merge.schema.enable")
       .defaultValue(true)
-      .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + "source.cloud.data.merge.schema.enable")
       .markAdvanced()
       .sinceVersion("1.2.0")
       .withDocumentation("For Parquet and ORC data files in S3/GCS incremental ingestion, merge schemas across all "
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ORCDFSSourceConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ORCDFSSourceConfig.java
index 9fb50875ca983..b181d0257331f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ORCDFSSourceConfig.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ORCDFSSourceConfig.java
@@ -26,7 +26,6 @@
 import javax.annotation.concurrent.Immutable;
 
-import static org.apache.hudi.common.util.ConfigUtils.DELTA_STREAMER_CONFIG_PREFIX;
 import static org.apache.hudi.common.util.ConfigUtils.STREAMER_CONFIG_PREFIX;
 
 /**
@@ -42,7 +41,6 @@ public class ORCDFSSourceConfig extends HoodieConfig {
   public static final ConfigProperty<Boolean> ORC_DFS_MERGE_SCHEMA = ConfigProperty
       .key(STREAMER_CONFIG_PREFIX + "source.orc.dfs.merge.schema.enable")
       .defaultValue(true)
-      .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + "source.orc.dfs.merge.schema.enable")
       .markAdvanced()
       .sinceVersion("1.2.0")
       .withDocumentation("Whether to merge schema across ORC files within a single read. "
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ParquetDFSSourceConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ParquetDFSSourceConfig.java
index 9605b03b34fa7..b35842d662829 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ParquetDFSSourceConfig.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ParquetDFSSourceConfig.java
@@ -43,7 +43,6 @@ public class ParquetDFSSourceConfig extends HoodieConfig {
       .key(STREAMER_CONFIG_PREFIX + "source.parquet.dfs.merge.schema.enable")
       .defaultValue(true)
       .withAlternatives(
-          DELTA_STREAMER_CONFIG_PREFIX + "source.parquet.dfs.merge.schema.enable",
           // Back-compat aliases for the previous underscore-style keys (since 0.15.0).
           STREAMER_CONFIG_PREFIX + "source.parquet.dfs.merge_schema.enable",
           DELTA_STREAMER_CONFIG_PREFIX + "source.parquet.dfs.merge_schema.enable")
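
The net effect of the alias changes described in this commit message: a job that still sets the pre-rename underscore-style key keeps its behavior, while the brand-new cloud/ORC keys answer only to their hoodie.streamer names. An illustrative check (the class name below is made up for the example; the resolution helper is the same ConfigUtils.getBooleanWithAltKeys these sources already use):

    import org.apache.hudi.common.config.TypedProperties;

    import static org.apache.hudi.common.util.ConfigUtils.DELTA_STREAMER_CONFIG_PREFIX;
    import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
    import static org.apache.hudi.utilities.config.ParquetDFSSourceConfig.PARQUET_DFS_MERGE_SCHEMA;

    public class MergeSchemaAliasCheck {
      public static void main(String[] args) {
        TypedProperties props = new TypedProperties();
        // A pipeline still using the pre-rename underscore-style key under the legacy prefix.
        props.setProperty(DELTA_STREAMER_CONFIG_PREFIX + "source.parquet.dfs.merge_schema.enable", "false");
        // Resolves through the kept back-compat aliases, so this prints false rather than the default (true).
        System.out.println(getBooleanWithAltKeys(props, PARQUET_DFS_MERGE_SCHEMA));
      }
    }
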
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
index d6ab683ad50aa..e4d1e42cabef0 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
@@ -47,6 +47,9 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
 public class TestCloudObjectsSelectorCommon extends HoodieSparkClientTestHarness {
 
   @BeforeEach
@@ -228,12 +231,12 @@ void parquetMixedSchemasMergedByDefault(@TempDir Path tempDir) {
         new CloudObjectMetadata(p1, 1L),
         new CloudObjectMetadata(p2, 1L));
     Option<Dataset<Row>> result = cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "parquet", Option.empty(), 1);
-    Assertions.assertTrue(result.isPresent());
+    assertTrue(result.isPresent());
     Dataset<Row> ds = result.get();
     Assertions.assertEquals(2, ds.count());
     Set<String> colNames = Arrays.stream(ds.schema().fields()).map(StructField::name).collect(Collectors.toSet());
-    Assertions.assertTrue(colNames.contains("b"));
-    Assertions.assertTrue(colNames.contains("c"));
+    assertTrue(colNames.contains("b"));
+    assertTrue(colNames.contains("c"));
   }
 
   /**
@@ -246,17 +249,17 @@ void parquetMixedSchemasMergedByDefault(@TempDir Path tempDir) {
    */
   @Test
   void isParquetOrOrcFileFormatRecognisesBothFormats() {
-    Assertions.assertTrue(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("parquet"));
-    Assertions.assertTrue(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("PARQUET"));
-    Assertions.assertTrue(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("orc"));
-    Assertions.assertTrue(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("ORC"));
-    Assertions.assertTrue(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat(" parquet "));
-    Assertions.assertTrue(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat(" orc "));
-    Assertions.assertFalse(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("json"));
-    Assertions.assertFalse(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("csv"));
-    Assertions.assertFalse(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("avro"));
-    Assertions.assertFalse(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat(""));
-    Assertions.assertFalse(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat(null));
+    assertTrue(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("parquet"));
+    assertTrue(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("PARQUET"));
+    assertTrue(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("orc"));
+    assertTrue(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("ORC"));
+    assertTrue(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat(" parquet "));
+    assertTrue(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat(" orc "));
+    assertFalse(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("json"));
+    assertFalse(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("csv"));
+    assertFalse(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("avro"));
+    assertFalse(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat(""));
+    assertFalse(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat(null));
   }
 
   @Test