From 19796f14caa364afe4cd15a0d45e4a42dac8ba9d Mon Sep 17 00:00:00 2001 From: Lin Liu Date: Wed, 28 Jan 2026 11:42:56 -0800 Subject: [PATCH 01/27] Pass schema from option instead of global configuration for thread safety --- .../org/apache/hudi/IncrementalRelation.scala | 12 +++--- .../LegacyHoodieParquetFileFormat.scala | 41 +++++++++---------- 2 files changed, 26 insertions(+), 27 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala index ee23fe85c7632..d855a02e10b41 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala @@ -203,11 +203,6 @@ class IncrementalRelation(val sqlContext: SQLContext, sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, SerDeHelper.toJson(internalSchema)) sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, metaClient.getBasePath) sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST, validCommits) - // Pass table Avro schema to LegacyHoodieParquetFileFormat to preserve correct logical types - if (tableAvroSchema.getType != Schema.Type.NULL) { - LegacyHoodieParquetFileFormat.setTableAvroSchemaInConf( - sqlContext.sparkContext.hadoopConfiguration, tableAvroSchema) - } val formatClassName = metaClient.getTableConfig.getBaseFileFormat match { case HoodieFileFormat.PARQUET => LegacyHoodieParquetFileFormat.FILE_FORMAT_ID case HoodieFileFormat.ORC => "orc" @@ -273,7 +268,12 @@ class IncrementalRelation(val sqlContext: SQLContext, if (regularFileIdToFullPath.nonEmpty) { try { - df = df.union(sqlContext.read.options(sOpts) + val optionsWithSchema = if (tableAvroSchema.getType != Schema.Type.NULL) { + sOpts + (LegacyHoodieParquetFileFormat.HOODIE_TABLE_AVRO_SCHEMA -> tableAvroSchema.toString) + } else { + sOpts + } + df = df.union(sqlContext.read.options(optionsWithSchema) .schema(usedSchema).format(formatClassName) // Setting time to the END_INSTANT_TIME, to avoid pathFilter filter out files incorrectly. .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key(), endInstantTime) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/LegacyHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/LegacyHoodieParquetFileFormat.scala index 5ecdc8ae476e5..e3c579190dbba 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/LegacyHoodieParquetFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/LegacyHoodieParquetFileFormat.scala @@ -41,8 +41,7 @@ class LegacyHoodieParquetFileFormat extends ParquetFileFormat with SparkAdapterS /** * Try to get table Avro schema from hadoopConf. - * Callers (e.g., IncrementalRelation, HoodieBaseRelation) should set the schema - * using LegacyHoodieParquetFileFormat.setTableAvroSchemaInConf() before reading. + * This is used as a fallback when schema is not provided via options map. * * @return Some(schema) if found in hadoopConf, None otherwise (falls back to StructType conversion) */ @@ -85,12 +84,22 @@ class LegacyHoodieParquetFileFormat extends ParquetFileFormat with SparkAdapterS options.getOrElse(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key, DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.defaultValue.toString).toBoolean - // Try to get schema from hadoopConf (set by callers like IncrementalRelation) - val avroSchema = getTableAvroSchemaFromConf(hadoopConf).getOrElse { - // Fallback to converting StructType to Avro schema - val fullTableSchema = StructType(dataSchema.fields ++ partitionSchema.fields) - AvroConversionUtils.convertStructTypeToAvroSchema(fullTableSchema, dataSchema.typeName) - } + val avroSchema = options.get(LegacyHoodieParquetFileFormat.HOODIE_TABLE_AVRO_SCHEMA) + .map(s => { + try { + Some(new Schema.Parser().parse(s)) + } catch { + case e: Exception => + logWarning(s"Failed to parse table Avro schema from options: ${e.getMessage}") + None + } + }) + .flatten + .orElse(getTableAvroSchemaFromConf(hadoopConf)) + .getOrElse { + val fullTableSchema = StructType(dataSchema.fields ++ partitionSchema.fields) + AvroConversionUtils.convertStructTypeToAvroSchema(fullTableSchema, dataSchema.typeName) + } sparkAdapter .createLegacyHoodieParquetFileFormat(shouldExtractPartitionValuesFromPartitionPath, avroSchema).get @@ -102,19 +111,9 @@ object LegacyHoodieParquetFileFormat { val FILE_FORMAT_ID = "hoodie-parquet" /** - * Configuration key for passing table Avro schema through hadoopConf. - * If set, this schema will be used instead of fetching from storage. + * Configuration key for passing table Avro schema. + * Schema can be passed through options map (preferred, thread-safe) or hadoopConf (fallback). + * This preserves the correct logical types (e.g., timestampMillis vs timestampMicros). */ val HOODIE_TABLE_AVRO_SCHEMA = "hoodie.table.avro.schema" - - /** - * Helper method to set table Avro schema in hadoopConf. - * This allows callers to pass the schema through hadoopConf to avoid fetching from storage. - * - * @param hadoopConf The Hadoop configuration to set the schema in - * @param avroSchema The Avro schema to set - */ - def setTableAvroSchemaInConf(hadoopConf: Configuration, avroSchema: Schema): Unit = { - hadoopConf.set(HOODIE_TABLE_AVRO_SCHEMA, avroSchema.toString) - } } From 35a299542b6eb54b0371c74c6ca2e040f5b7c8f7 Mon Sep 17 00:00:00 2001 From: Lin Liu Date: Wed, 28 Jan 2026 14:54:58 -0800 Subject: [PATCH 02/27] Address partial comments --- .../parquet/schema/AvroSchemaRepair.java | 15 ----- .../apache/hudi/BaseHoodieTableFileIndex.java | 5 -- .../org/apache/hudi/avro/AvroSchemaCache.java | 2 - .../org/apache/hudi/avro/AvroSchemaUtils.java | 4 -- .../hudi/common/util/DateTimeUtils.java | 6 -- .../apache/hudi/ColumnStatsIndexSupport.scala | 56 ------------------- 6 files changed, 88 deletions(-) diff --git a/hudi-common/src/avro/java/org/apache/parquet/schema/AvroSchemaRepair.java b/hudi-common/src/avro/java/org/apache/parquet/schema/AvroSchemaRepair.java index 75f880ae3f955..78908a40ae52e 100644 --- a/hudi-common/src/avro/java/org/apache/parquet/schema/AvroSchemaRepair.java +++ b/hudi-common/src/avro/java/org/apache/parquet/schema/AvroSchemaRepair.java @@ -32,7 +32,6 @@ import java.util.List; public class AvroSchemaRepair { - public static boolean isLocalTimestampSupported = isLocalTimestampMillisSupported(); public static Schema repairLogicalTypes(Schema fileSchema, Schema tableSchema) { Schema repairedSchema = repairAvroSchema(fileSchema, tableSchema); @@ -242,18 +241,4 @@ public static boolean hasTimestampMillisField(Schema tableSchema) { && (tableSchema.getLogicalType() instanceof LogicalTypes.TimestampMillis || tableSchema.getLogicalType() instanceof LogicalTypes.LocalTimestampMillis); } } - - /** - * Check if LogicalTypes.LocalTimestampMillis is supported in the current Avro version - * - * @return true if LocalTimestampMillis is available, false otherwise - */ - public static boolean isLocalTimestampMillisSupported() { - try { - return Arrays.stream(LogicalTypes.class.getDeclaredClasses()) - .anyMatch(c -> c.getSimpleName().equals("LocalTimestampMillis")); - } catch (Exception e) { - return false; - } - } } diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java index 6808ae1528279..824a94abab4bd 100644 --- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java +++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java @@ -479,11 +479,6 @@ protected boolean shouldReadAsPartitionedTable() { return (partitionColumns.length > 0 && canParsePartitionValues()) || HoodieTableMetadata.isMetadataTable(basePath.toString()); } - protected PartitionPath convertToPartitionPath(String partitionPath) { - Object[] partitionColumnValues = parsePartitionColumnValues(partitionColumns, partitionPath); - return new PartitionPath(partitionPath, partitionColumnValues); - } - private static long fileSliceSize(FileSlice fileSlice) { long logFileSize = fileSlice.getLogFiles().map(HoodieLogFile::getFileSize) .filter(s -> s > 0) diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaCache.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaCache.java index b679ea3c8d508..cecc420813c3b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaCache.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaCache.java @@ -31,7 +31,6 @@ */ public class AvroSchemaCache { - // Ensure that there is only one variable instance of the same schema within an entire JVM lifetime private static final LoadingCache SCHEMA_CACHE = Caffeine.newBuilder().weakValues().maximumSize(1024).build(k -> k); @@ -43,5 +42,4 @@ public class AvroSchemaCache { public static Schema intern(Schema schema) { return SCHEMA_CACHE.get(schema); } - } diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java index c01af1b0644a2..c8fea10818c4a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java @@ -238,10 +238,6 @@ public static Option findNestedFieldSchema(Schema schema, String fieldNa return Option.of(getNonNullTypeFromUnion(schema)); } - public static Option findNestedFieldType(Schema schema, String fieldName) { - return findNestedFieldSchema(schema, fieldName).map(Schema::getType); - } - /** * Appends provided new fields at the end of the given schema * diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java index 99efa89fa0542..aa26db8272eca 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java @@ -52,12 +52,6 @@ public static Instant microsToInstant(long microsFromEpoch) { return Instant.ofEpochSecond(epochSeconds, nanoAdjustment); } - public static Instant nanosToInstant(long nanosFromEpoch) { - long epochSeconds = nanosFromEpoch / (1_000_000_000L); - long nanoAdjustment = nanosFromEpoch % (1_000_000_000L); - return Instant.ofEpochSecond(epochSeconds, nanoAdjustment); - } - /** * Converts provided {@link Instant} to microseconds (from epoch) */ diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala index 5fc0d0d6a6945..970cec0b79f85 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala @@ -146,62 +146,6 @@ class ColumnStatsIndexSupport(spark: SparkSession, } } - /** - * Loads view of the Column Stats Index in a transposed format where single row coalesces every columns' - * statistics for a single file, returning it as [[DataFrame]] - * - * Please check out scala-doc of the [[transpose]] method explaining this view in more details - */ - def loadTransposed[T](targetColumns: Seq[String], - shouldReadInMemory: Boolean, - prunedPartitions: Option[Set[String]] = None, - prunedFileNamesOpt: Option[Set[String]] = None)(block: DataFrame => T): T = { - cachedColumnStatsIndexViews.get(targetColumns) match { - case Some(cachedDF) => - block(cachedDF) - case None => - val colStatsRecords: HoodieData[HoodieMetadataColumnStats] = prunedFileNamesOpt match { - case Some(prunedFileNames) => - val filterFunction = new SerializableFunction[HoodieMetadataColumnStats, java.lang.Boolean] { - override def apply(r: HoodieMetadataColumnStats): java.lang.Boolean = { - prunedFileNames.contains(r.getFileName) - } - } - loadColumnStatsIndexRecords(targetColumns, shouldReadInMemory).filter(filterFunction) - case None => - loadColumnStatsIndexRecords(targetColumns, shouldReadInMemory) - } - - withPersistedData(colStatsRecords, StorageLevel.MEMORY_ONLY) { - val (transposedRows, indexSchema) = transpose(colStatsRecords, targetColumns) - val df = if (shouldReadInMemory) { - // NOTE: This will instantiate a [[Dataset]] backed by [[LocalRelation]] holding all of the rows - // of the transposed table in memory, facilitating execution of the subsequently chained operations - // on it locally (on the driver; all such operations are actually going to be performed by Spark's - // Optimizer) - HoodieUnsafeUtils.createDataFrameFromRows(spark, transposedRows.collectAsList().asScala.toSeq, indexSchema) - } else { - val rdd = HoodieJavaRDD.getJavaRDD(transposedRows) - spark.createDataFrame(rdd, indexSchema) - } - - if (allowCaching) { - cachedColumnStatsIndexViews.put(targetColumns, df) - // NOTE: Instead of collecting the rows from the index and hold them in memory, we instead rely - // on Spark as (potentially distributed) cache managing data lifecycle, while we simply keep - // the referenced to persisted [[DataFrame]] instance - df.persist(StorageLevel.MEMORY_ONLY) - - block(df) - } else { - withPersistedDataset(df) { - block(df) - } - } - } - } - } - /** * Loads a view of the Column Stats Index in a raw format, returning it as [[DataFrame]] * From 6b97dce011c46f337c80c59132ba331374d7a402 Mon Sep 17 00:00:00 2001 From: Lin Liu Date: Thu, 29 Jan 2026 12:15:16 -0800 Subject: [PATCH 03/27] Address more comments --- .../io/storage/HoodieSparkParquetReader.java | 12 +++++- .../apache/spark/sql/hudi/SparkAdapter.scala | 4 +- .../org/apache/hudi/avro/AvroSchemaUtils.java | 38 ++++++++++++++++-- .../org/apache/hudi/avro/HoodieAvroUtils.java | 37 +++++++++--------- .../log/AbstractHoodieLogRecordReader.java | 22 ++++++++--- .../common/table/log/HoodieLogFileReader.java | 25 ++++++------ .../table/log/HoodieLogFormatReader.java | 15 ++++--- .../table/log/block/HoodieAvroDataBlock.java | 9 ++++- .../table/log/block/HoodieCDCDataBlock.java | 2 +- .../convert/AvroInternalSchemaConverter.java | 39 ++----------------- .../functional/TestHoodieLogFormat.java | 2 +- .../spark/sql/adapter/Spark2Adapter.scala | 3 +- .../spark/sql/adapter/BaseSpark3Adapter.scala | 3 +- .../spark/sql/adapter/Spark3_4Adapter.scala | 7 +++- 14 files changed, 126 insertions(+), 92 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java index 45b0093b8ffd7..1f3e3865c3a40 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java @@ -65,6 +65,10 @@ public class HoodieSparkParquetReader implements HoodieSparkFileReader { private final BaseFileUtils parquetUtils; private List readerIterators = new ArrayList<>(); public static final String ENABLE_LOGICAL_TIMESTAMP_REPAIR = "spark.hudi.logicalTimestampField.repair.enable"; + private final StoragePath path; + private final HoodieStorage storage; + private final FileFormatUtils parquetUtils; + private List readerIterators = new ArrayList<>(); private Option fileSchemaOption = Option.empty(); private Option structTypeOption = Option.empty(); private Option schemaOption = Option.empty(); @@ -126,6 +130,9 @@ private ClosableIterator getInternalRowIterator(Schema readerSchema if (requestedSchema == null) { requestedSchema = readerSchema; } + // Set configuration for timestamp_millis type repair. + storage.getConf().set(ENABLE_LOGICAL_TIMESTAMP_REPAIR, Boolean.toString(AvroSchemaUtils.hasTimestampMillisField(readerSchema))); + MessageType fileSchema = getFileSchema(); Schema nonNullSchema = AvroSchemaUtils.getNonNullTypeFromUnion(requestedSchema); Option messageSchema = Option.of(getAvroSchemaConverter(conf).convert(nonNullSchema)); @@ -136,8 +143,9 @@ private ClosableIterator getInternalRowIterator(Schema readerSchema conf.set(SQLConf.PARQUET_BINARY_AS_STRING().key(), SQLConf.get().getConf(SQLConf.PARQUET_BINARY_AS_STRING()).toString()); conf.set(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), SQLConf.get().getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP()).toString()); ParquetReader reader = ParquetReader.builder( - (ReadSupport) SparkAdapterSupport$.MODULE$.sparkAdapter().getParquetReadSupport(messageSchema), - new Path(path.toUri())).withConf(conf) + (ReadSupport) SparkAdapterSupport$.MODULE$.sparkAdapter().getParquetReadSupport(storage, messageSchema), + new Path(path.toUri())) + .withConf(storage.getConf().unwrapAs(Configuration.class)) .build(); ParquetReaderIterator parquetReaderIterator = new ParquetReaderIterator<>(reader); readerIterators.add(parquetReaderIterator); diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala index e8058aa1f2248..606546a58e387 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala @@ -59,7 +59,9 @@ trait SparkAdapter extends Serializable { def isTimestampNTZType(dataType: DataType): Boolean - def getParquetReadSupport(messageSchema: org.apache.hudi.common.util.Option[MessageType]): org.apache.parquet.hadoop.api.ReadSupport[_] + def getParquetReadSupport(storage: HoodieStorage, + messageSchema: org.apache.hudi.common.util.Option[MessageType]): + org.apache.parquet.hadoop.api.ReadSupport[_] def repairSchemaIfSpecified(shouldRepair: Boolean, fileSchema: MessageType, diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java index c8fea10818c4a..7f4cf8c42502b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java @@ -43,6 +43,17 @@ */ public class AvroSchemaUtils { + private static final Class AVRO_SCHEMA_REPAIR_CLASS; + static { + Class clazz = null; + try { + clazz = Class.forName("org.apache.parquet.schema.AvroSchemaRepair"); + } catch (ClassNotFoundException e) { + // AvroSchemaRepair not on classpath (e.g. when parquet schema not available) + } + AVRO_SCHEMA_REPAIR_CLASS = clazz; + } + private AvroSchemaUtils() {} /** @@ -395,13 +406,32 @@ public static void checkSchemaCompatible( } public static Schema getRepairedSchema(Schema writerSchema, Schema readerSchema) { + if (AVRO_SCHEMA_REPAIR_CLASS == null) { + return writerSchema; + } try { - Class avroSchemaRepairClass = Class.forName("org.apache.parquet.schema.AvroSchemaRepair"); - Method repairMethod = avroSchemaRepairClass.getMethod("repairLogicalTypes", Schema.class, Schema.class); + Method repairMethod = + AVRO_SCHEMA_REPAIR_CLASS.getMethod("repairLogicalTypes", Schema.class, Schema.class); return (Schema) repairMethod.invoke(null, writerSchema, readerSchema); - } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { - // Fallback if class/method not available + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { return writerSchema; } } + + /** + * Returns true if the given Avro schema contains any timestamp-millis logical type. + * Used to decide whether to enable logical timestamp field repair when reading log blocks. + */ + public static boolean hasTimestampMillisField(Schema schema) { + if (schema == null || AVRO_SCHEMA_REPAIR_CLASS == null) { + return false; + } + try { + Method hasTimestampMillisFieldMethod = + AVRO_SCHEMA_REPAIR_CLASS.getMethod("hasTimestampMillisField", Schema.class); + return (Boolean) hasTimestampMillisFieldMethod.invoke(null, schema); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { + return false; + } + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index 3e51d826a9ca6..7221ed9f00b2c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -161,6 +161,19 @@ public class HoodieAvroUtils { private static final Pattern INVALID_AVRO_FIRST_CHAR_IN_NAMES_PATTERN = Pattern.compile("[^A-Za-z_]"); private static final String MASK_FOR_INVALID_CHARS_IN_NAMES = "__"; + // Cached Avro logical type classes (null if not available, e.g., Avro 1.8.2). + // Loaded once at class init to avoid repeated Class.forName() and reflection cost per record. + private static final Class LOCAL_TIMESTAMP_MILLIS_CLASS = loadClassSafe("org.apache.avro.LogicalTypes$LocalTimestampMillis"); + private static final Class LOCAL_TIMESTAMP_MICROS_CLASS = loadClassSafe("org.apache.avro.LogicalTypes$LocalTimestampMicros"); + + private static Class loadClassSafe(String name) { + try { + return Class.forName(name); + } catch (ClassNotFoundException e) { + return null; + } + } + // All metadata fields are optional strings. public static final Schema METADATA_FIELD_SCHEMA = createNullableSchema(Schema.Type.STRING); @@ -1406,34 +1419,22 @@ public static Comparable unwrapAvroValueWrapper(Object avroValueWrapper) { * Checks if a logical type is an instance of LocalTimestampMillis using reflection. * Returns false if the class doesn't exist (e.g., in Avro 1.8.2). */ - private static boolean isLocalTimestampMillis(LogicalType logicalType) { - if (logicalType == null) { - return false; - } - try { - Class localTimestampMillisClass = Class.forName("org.apache.avro.LogicalTypes$LocalTimestampMillis"); - return localTimestampMillisClass.isInstance(logicalType); - } catch (ClassNotFoundException e) { - // Class doesn't exist (e.g., Avro 1.8.2) + public static boolean isLocalTimestampMillis(LogicalType logicalType) { + if (logicalType == null || LOCAL_TIMESTAMP_MILLIS_CLASS == null) { return false; } + return LOCAL_TIMESTAMP_MILLIS_CLASS.isInstance(logicalType); } /** * Checks if a logical type is an instance of LocalTimestampMicros using reflection. * Returns false if the class doesn't exist (e.g., in Avro 1.8.2). */ - private static boolean isLocalTimestampMicros(LogicalType logicalType) { - if (logicalType == null) { - return false; - } - try { - Class localTimestampMicrosClass = Class.forName("org.apache.avro.LogicalTypes$LocalTimestampMicros"); - return localTimestampMicrosClass.isInstance(logicalType); - } catch (ClassNotFoundException e) { - // Class doesn't exist (e.g., Avro 1.8.2) + public static boolean isLocalTimestampMicros(LogicalType logicalType) { + if (logicalType == null || LOCAL_TIMESTAMP_MICROS_CLASS == null) { return false; } + return LOCAL_TIMESTAMP_MICROS_CLASS.isInstance(logicalType); } private static Object convertDefaultValueForAvroCompatibility(Object defaultValue) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java index 3678efe786252..55c54842a0a4e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java @@ -18,6 +18,7 @@ package org.apache.hudi.common.table.log; +import org.apache.hudi.avro.AvroSchemaUtils; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.DeleteRecord; import org.apache.hudi.common.model.HoodieLogFile; @@ -148,6 +149,8 @@ public abstract class AbstractHoodieLogRecordReader { private final List validBlockInstants = new ArrayList<>(); // Use scanV2 method. private final boolean enableOptimizedLogBlocksScan; + // Enable logical timestamp field repair for Avro log blocks (computed once from reader schema). + private final boolean enableLogicalTimestampFieldRepair; protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List logFilePaths, Schema readerSchema, String latestInstantTime, boolean readBlocksLazily, @@ -182,6 +185,7 @@ protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List keySpecOpt) { HoodieTimeline inflightInstantsTimeline = commitsTimeline.filterInflights(); try { // Iterate over the paths - logFormatReaderWrapper = new HoodieLogFormatReader(fs, - logFilePaths.stream().map(logFile -> new HoodieLogFile(new CachingPath(logFile))).collect(Collectors.toList()), - readerSchema, true, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema); + logFormatReaderWrapper = new HoodieLogFormatReader(storage, + logFilePaths.stream() + .map(filePath -> new HoodieLogFile(new StoragePath(filePath))) + .collect(Collectors.toList()), + readerSchema, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema, + enableLogicalTimestampFieldRepair); Set scannedLogFiles = new HashSet<>(); while (logFormatReaderWrapper.hasNext()) { @@ -553,9 +560,12 @@ private void scanInternalV2(Option keySpecOption, boolean skipProcessin HoodieTimeline inflightInstantsTimeline = commitsTimeline.filterInflights(); try { // Iterate over the paths - logFormatReaderWrapper = new HoodieLogFormatReader(fs, - logFilePaths.stream().map(logFile -> new HoodieLogFile(new CachingPath(logFile))).collect(Collectors.toList()), - readerSchema, true, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema); + logFormatReaderWrapper = new HoodieLogFormatReader(storage, + logFilePaths.stream() + .map(logFile -> new HoodieLogFile(new StoragePath(logFile))) + .collect(Collectors.toList()), + readerSchema, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema, + enableLogicalTimestampFieldRepair); /** * Scanning log blocks and placing the compacted blocks at the right place require two traversals. diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java index cf21ef5f42c81..ab6d763345fb0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.fs.TimedFSDataInputStream; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.avro.AvroSchemaUtils; import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; import org.apache.hudi.common.table.log.block.HoodieCDCDataBlock; import org.apache.hudi.common.table.log.block.HoodieCommandBlock; @@ -87,6 +88,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader { private long lastReverseLogFilePosition; private final boolean reverseReader; private final boolean enableRecordLookups; + private final boolean enableLogicalTimestampFieldRepair; private boolean closed = false; private FSDataInputStream inputStream; @@ -101,17 +103,17 @@ public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSc HoodieRecord.RECORD_KEY_METADATA_FIELD); } - public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize, - boolean readBlockLazily, boolean reverseReader, boolean enableRecordLookups, - String keyField) throws IOException { - this(fs, logFile, readerSchema, bufferSize, readBlockLazily, reverseReader, enableRecordLookups, keyField, InternalSchema.getEmptyInternalSchema()); + public HoodieLogFileReader(HoodieStorage storage, HoodieLogFile logFile, Schema readerSchema, int bufferSize, boolean reverseReader, + boolean enableRecordLookups, String keyField) throws IOException { + this(storage, logFile, readerSchema, bufferSize, reverseReader, enableRecordLookups, keyField, + InternalSchema.getEmptyInternalSchema(), + readerSchema != null && AvroSchemaUtils.hasTimestampMillisField(readerSchema)); } - public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize, - boolean readBlockLazily, boolean reverseReader, boolean enableRecordLookups, - String keyField, InternalSchema internalSchema) throws IOException { - this.fs = fs; - this.hadoopConf = fs.getConf(); + public HoodieLogFileReader(HoodieStorage storage, HoodieLogFile logFile, Schema readerSchema, int bufferSize, boolean reverseReader, + boolean enableRecordLookups, String keyField, InternalSchema internalSchema, + boolean enableLogicalTimestampFieldRepair) throws IOException { + this.storage = storage; // NOTE: We repackage {@code HoodieLogFile} here to make sure that the provided path // is prefixed with an appropriate scheme given that we're not propagating the FS // further @@ -125,6 +127,7 @@ public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSc this.enableRecordLookups = enableRecordLookups; this.keyField = keyField; this.internalSchema = internalSchema == null ? InternalSchema.getEmptyInternalSchema() : internalSchema; + this.enableLogicalTimestampFieldRepair = enableLogicalTimestampFieldRepair; if (this.reverseReader) { this.reverseLogFilePosition = this.lastReverseLogFilePosition = this.logFile.getFileSize(); } @@ -200,8 +203,8 @@ private HoodieLogBlock readBlock() throws IOException { if (nextBlockVersion.getVersion() == HoodieLogFormatVersion.DEFAULT_VERSION) { return HoodieAvroDataBlock.getBlock(content.get(), readerSchema, internalSchema); } else { - return new HoodieAvroDataBlock(() -> getFSDataInputStream(fs, this.logFile, bufferSize), content, readBlockLazily, logBlockContentLoc, - getTargetReaderSchemaForBlock(), header, footer, keyField); + return new HoodieAvroDataBlock(() -> getDataInputStream(storage, this.logFile, bufferSize), content, true, logBlockContentLoc, + getTargetReaderSchemaForBlock(), header, footer, keyField, enableLogicalTimestampFieldRepair); } case HFILE_DATA_BLOCK: diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java index 955f5485ed459..ca90e8b1e06a1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java @@ -45,12 +45,14 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader { private final String recordKeyField; private final boolean enableInlineReading; private final int bufferSize; + private final boolean enableLogicalTimestampFieldRepair; private static final Logger LOG = LoggerFactory.getLogger(HoodieLogFormatReader.class); HoodieLogFormatReader(FileSystem fs, List logFiles, Schema readerSchema, boolean readBlocksLazily, boolean reverseLogReader, int bufferSize, boolean enableRecordLookups, - String recordKeyField, InternalSchema internalSchema) throws IOException { + String recordKeyField, InternalSchema internalSchema, + boolean enableLogicalTimestampFieldRepair) throws IOException { this.logFiles = logFiles; this.fs = fs; this.readerSchema = readerSchema; @@ -59,10 +61,11 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader { this.recordKeyField = recordKeyField; this.enableInlineReading = enableRecordLookups; this.internalSchema = internalSchema == null ? InternalSchema.getEmptyInternalSchema() : internalSchema; - if (logFiles.size() > 0) { + this.enableLogicalTimestampFieldRepair = enableLogicalTimestampFieldRepair; + if (!logFiles.isEmpty()) { HoodieLogFile nextLogFile = logFiles.remove(0); - this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false, - enableRecordLookups, recordKeyField, internalSchema); + this.currentReader = new HoodieLogFileReader(storage, nextLogFile, readerSchema, bufferSize, false, + enableRecordLookups, recordKeyField, internalSchema, enableLogicalTimestampFieldRepair); } } @@ -86,8 +89,8 @@ public boolean hasNext() { try { HoodieLogFile nextLogFile = logFiles.remove(0); this.currentReader.close(); - this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false, - enableInlineReading, recordKeyField, internalSchema); + this.currentReader = new HoodieLogFileReader(storage, nextLogFile, readerSchema, bufferSize, false, + enableInlineReading, recordKeyField, internalSchema, enableLogicalTimestampFieldRepair); } catch (IOException io) { throw new HoodieIOException("unable to initialize read with log file ", io); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java index 6f04583c0f191..d7ec98758fbbc 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java @@ -73,6 +73,7 @@ public class HoodieAvroDataBlock extends HoodieDataBlock { private final ThreadLocal encoderCache = new ThreadLocal<>(); + private final boolean enableLogicalTimestampFieldRepair; public HoodieAvroDataBlock(Supplier inputStreamSupplier, Option content, @@ -81,8 +82,10 @@ public HoodieAvroDataBlock(Supplier inputStreamSupplier, Option readerSchema, Map header, Map footer, - String keyField) { + String keyField, + boolean enableLogicalTimestampFieldRepair) { super(content, inputStreamSupplier, readBlockLazily, Option.of(logBlockContentLocation), readerSchema, header, footer, keyField, false); + this.enableLogicalTimestampFieldRepair = enableLogicalTimestampFieldRepair; } public HoodieAvroDataBlock(@Nonnull List records, @@ -90,6 +93,7 @@ public HoodieAvroDataBlock(@Nonnull List records, @Nonnull String keyField ) { super(records, header, new HashMap<>(), keyField); + this.enableLogicalTimestampFieldRepair = false; } @Override @@ -143,7 +147,7 @@ protected ClosableIterator> deserializeRecords(byte[] conten checkState(this.readerSchema != null, "Reader's schema has to be non-null"); checkArgument(type != HoodieRecordType.SPARK, "Not support read avro to spark record"); // TODO AvroSparkReader need - RecordIterator iterator = RecordIterator.getInstance(this, content, true); + RecordIterator iterator = RecordIterator.getInstance(this, content, enableLogicalTimestampFieldRepair); return new CloseableMappingIterator<>(iterator, data -> (HoodieRecord) new HoodieAvroIndexedRecord(data)); } @@ -234,6 +238,7 @@ public IndexedRecord next() { @Deprecated public HoodieAvroDataBlock(List records, Schema schema) { super(records, Collections.singletonMap(HeaderMetadataType.SCHEMA, schema.toString()), new HashMap<>(), HoodieRecord.RECORD_KEY_METADATA_FIELD); + this.enableLogicalTimestampFieldRepair = false; } public static HoodieAvroDataBlock getBlock(byte[] content, Schema readerSchema) throws IOException { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java index 8f2cd8c644786..76527fc2e874b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java @@ -43,7 +43,7 @@ public HoodieCDCDataBlock( Map header, String keyField) { super(inputStreamSupplier, content, readBlockLazily, logBlockContentLocation, - Option.of(readerSchema), header, new HashMap<>(), keyField); + Option.of(readerSchema), header, new HashMap<>(), keyField, false); } public HoodieCDCDataBlock(List records, diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java index ac14ea9e5c6e8..aa2a64e11d770 100644 --- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java +++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java @@ -18,6 +18,7 @@ package org.apache.hudi.internal.schema.convert; +import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.internal.schema.HoodieSchemaException; import org.apache.hudi.internal.schema.InternalSchema; @@ -239,9 +240,9 @@ private static Type visitAvroPrimitiveToBuildInternalType(Schema primitive) { return Types.TimestampMillisType.get(); } else if (logical instanceof LogicalTypes.TimestampMicros) { return Types.TimestampType.get(); - } else if (isLocalTimestampMillis(logical)) { + } else if (HoodieAvroUtils.isLocalTimestampMillis(logical)) { return Types.LocalTimestampMillisType.get(); - } else if (isLocalTimestampMicros(logical)) { + } else if (HoodieAvroUtils.isLocalTimestampMicros(logical)) { return Types.LocalTimestampMicrosType.get(); } else if (LogicalTypes.uuid().getName().equals(name)) { return Types.UUIDType.get(); @@ -497,40 +498,6 @@ private static int computeMinBytesForPrecision(int precision) { return numBytes; } - /** - * Checks if a logical type is an instance of LocalTimestampMillis using reflection. - * Returns false if the class doesn't exist (e.g., in Avro 1.8.2). - */ - private static boolean isLocalTimestampMillis(LogicalType logicalType) { - if (logicalType == null) { - return false; - } - try { - Class localTimestampMillisClass = Class.forName("org.apache.avro.LogicalTypes$LocalTimestampMillis"); - return localTimestampMillisClass.isInstance(logicalType); - } catch (ClassNotFoundException e) { - // Class doesn't exist (e.g., Avro 1.8.2) - return false; - } - } - - /** - * Checks if a logical type is an instance of LocalTimestampMicros using reflection. - * Returns false if the class doesn't exist (e.g., in Avro 1.8.2). - */ - private static boolean isLocalTimestampMicros(LogicalType logicalType) { - if (logicalType == null) { - return false; - } - try { - Class localTimestampMicrosClass = Class.forName("org.apache.avro.LogicalTypes$LocalTimestampMicros"); - return localTimestampMicrosClass.isInstance(logicalType); - } catch (ClassNotFoundException e) { - // Class doesn't exist (e.g., Avro 1.8.2) - return false; - } - } - /** * Creates a LocalTimestampMicros schema using reflection. * Returns null if the class doesn't exist (e.g., in Avro 1.8.2). diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java index 601f83101c9b7..129bb5287736e 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java @@ -425,7 +425,7 @@ public void testHugeLogFileWrite() throws IOException, URISyntaxException, Inter byte[] dataBlockContentBytes = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header).getContentBytes(); HoodieLogBlock.HoodieLogBlockContentLocation logBlockContentLoc = new HoodieLogBlock.HoodieLogBlockContentLocation(new Configuration(), null, 0, dataBlockContentBytes.length, 0); HoodieDataBlock reusableDataBlock = new HoodieAvroDataBlock(null, Option.ofNullable(dataBlockContentBytes), false, - logBlockContentLoc, Option.ofNullable(getSimpleSchema()), header, new HashMap<>(), HoodieRecord.RECORD_KEY_METADATA_FIELD); + logBlockContentLoc, Option.ofNullable(getSimpleSchema()), header, new HashMap<>(), HoodieRecord.RECORD_KEY_METADATA_FIELD, false); long writtenSize = 0; int logBlockWrittenNum = 0; while (writtenSize < Integer.MAX_VALUE) { diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala index 47f2ce46c7bf9..88ae565513a61 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala @@ -87,7 +87,8 @@ class Spark2Adapter extends SparkAdapter { dataType.getClass.getSimpleName.startsWith("TimestampNTZType") } - override def getParquetReadSupport(messageScheme: org.apache.hudi.common.util.Option[MessageType]): + override def getParquetReadSupport(storage HoodieStorage, + messageScheme: org.apache.hudi.common.util.Option[MessageType]): org.apache.parquet.hadoop.api.ReadSupport[_] = { // ParquetReadSupport is package-private in Spark 2.4, so we use reflection to instantiate it val clazz = Class.forName("org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport") diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala index 17de9e00fea45..1463a791b715c 100644 --- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala @@ -114,7 +114,8 @@ abstract class BaseSpark3Adapter extends SparkAdapter with Logging { fileSchema } - override def getParquetReadSupport(messageSchema: org.apache.hudi.common.util.Option[MessageType]): ParquetReadSupport = { + override def getParquetReadSupport(storage: HoodieStorage, + messageSchema: org.apache.hudi.common.util.Option[MessageType]): ParquetReadSupport = { new ParquetReadSupport() } diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala index b7f1e69d60888..e84919ef437ef 100644 --- a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala @@ -81,11 +81,14 @@ class Spark3_4Adapter extends BaseSpark3Adapter { dataType == DataTypes.TimestampNTZType } - override def getParquetReadSupport(messageSchema: org.apache.hudi.common.util.Option[MessageType]): ParquetReadSupport = { + override def getParquetReadSupport(storage: HoodieStorage, + messageSchema: org.apache.hudi.common.util.Option[MessageType]): ParquetReadSupport = { + val enableTimestampFieldRepair = storage.getConf.getBoolean( + HoodieSparkParquetReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, true) new HoodieParquetReadSupport( Option.empty[ZoneId], enableVectorizedReader = true, - enableTimestampFieldRepair = true, + enableTimestampFieldRepair, RebaseDateTime.RebaseSpec(LegacyBehaviorPolicy.withName("CORRECTED")), RebaseDateTime.RebaseSpec(LegacyBehaviorPolicy.withName("LEGACY")), messageSchema From 5d472f29e2bc353d31dfeef4fefaad24e64be154 Mon Sep 17 00:00:00 2001 From: Pavithran Ravichandiran Date: Thu, 5 Feb 2026 11:32:54 -0800 Subject: [PATCH 04/27] =?UTF-8?q?changing=20code=20parts=20around=20Hoodie?= =?UTF-8?q?Storage=20(introduced=20in=200.15.0.)=20to=20use=C2=A0=20FileSy?= =?UTF-8?q?stem=20and=20hadoop=20Configuration=20changes=20(0.14=20equival?= =?UTF-8?q?ent)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../io/storage/HoodieSparkParquetReader.java | 14 ++++------- .../apache/spark/sql/hudi/SparkAdapter.scala | 2 +- .../log/AbstractHoodieLogRecordReader.java | 16 +++++++------ .../common/table/log/HoodieLogFileReader.java | 23 ++++++++----------- .../table/log/HoodieLogFormatReader.java | 4 ++-- .../spark/sql/adapter/Spark2Adapter.scala | 2 +- .../spark/sql/adapter/BaseSpark3Adapter.scala | 2 +- .../spark/sql/adapter/Spark3_4Adapter.scala | 4 ++-- 8 files changed, 31 insertions(+), 36 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java index 1f3e3865c3a40..c312327397684 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java @@ -65,10 +65,6 @@ public class HoodieSparkParquetReader implements HoodieSparkFileReader { private final BaseFileUtils parquetUtils; private List readerIterators = new ArrayList<>(); public static final String ENABLE_LOGICAL_TIMESTAMP_REPAIR = "spark.hudi.logicalTimestampField.repair.enable"; - private final StoragePath path; - private final HoodieStorage storage; - private final FileFormatUtils parquetUtils; - private List readerIterators = new ArrayList<>(); private Option fileSchemaOption = Option.empty(); private Option structTypeOption = Option.empty(); private Option schemaOption = Option.empty(); @@ -77,7 +73,7 @@ public HoodieSparkParquetReader(Configuration conf, Path path) { this.path = path; this.conf = new Configuration(conf); // Avoid adding record in list element when convert parquet schema to avro schema - conf.set(ADD_LIST_ELEMENT_RECORDS, "false"); + this.conf.set(ADD_LIST_ELEMENT_RECORDS, "false"); this.parquetUtils = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET); } @@ -131,7 +127,7 @@ private ClosableIterator getInternalRowIterator(Schema readerSchema requestedSchema = readerSchema; } // Set configuration for timestamp_millis type repair. - storage.getConf().set(ENABLE_LOGICAL_TIMESTAMP_REPAIR, Boolean.toString(AvroSchemaUtils.hasTimestampMillisField(readerSchema))); + conf.set(ENABLE_LOGICAL_TIMESTAMP_REPAIR, Boolean.toString(AvroSchemaUtils.hasTimestampMillisField(readerSchema))); MessageType fileSchema = getFileSchema(); Schema nonNullSchema = AvroSchemaUtils.getNonNullTypeFromUnion(requestedSchema); @@ -143,9 +139,9 @@ private ClosableIterator getInternalRowIterator(Schema readerSchema conf.set(SQLConf.PARQUET_BINARY_AS_STRING().key(), SQLConf.get().getConf(SQLConf.PARQUET_BINARY_AS_STRING()).toString()); conf.set(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), SQLConf.get().getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP()).toString()); ParquetReader reader = ParquetReader.builder( - (ReadSupport) SparkAdapterSupport$.MODULE$.sparkAdapter().getParquetReadSupport(storage, messageSchema), - new Path(path.toUri())) - .withConf(storage.getConf().unwrapAs(Configuration.class)) + (ReadSupport) SparkAdapterSupport$.MODULE$.sparkAdapter().getParquetReadSupport(conf, messageSchema), + path) + .withConf(conf) .build(); ParquetReaderIterator parquetReaderIterator = new ParquetReaderIterator<>(reader); readerIterators.add(parquetReaderIterator); diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala index 606546a58e387..eb0e3b4901c67 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala @@ -59,7 +59,7 @@ trait SparkAdapter extends Serializable { def isTimestampNTZType(dataType: DataType): Boolean - def getParquetReadSupport(storage: HoodieStorage, + def getParquetReadSupport(conf: Configuration, messageSchema: org.apache.hudi.common.util.Option[MessageType]): org.apache.parquet.hadoop.api.ReadSupport[_] diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java index 55c54842a0a4e..7222f334e55b8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java @@ -41,7 +41,6 @@ import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.hadoop.CachingPath; import org.apache.hudi.internal.schema.InternalSchema; import org.apache.hudi.internal.schema.action.InternalSchemaMerger; import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter; @@ -112,6 +111,8 @@ public abstract class AbstractHoodieLogRecordReader { private final TypedProperties payloadProps; // Log File Paths protected final List logFilePaths; + // Read block lazily - when true, reads block content on demand + private final boolean readBlocksLazily; // Reverse reader - Not implemented yet (NA -> Why do we need ?) // but present here for plumbing for future implementation private final boolean reverseReader; @@ -177,6 +178,7 @@ protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List keySpecOpt) { HoodieTimeline inflightInstantsTimeline = commitsTimeline.filterInflights(); try { // Iterate over the paths - logFormatReaderWrapper = new HoodieLogFormatReader(storage, + logFormatReaderWrapper = new HoodieLogFormatReader(fs, logFilePaths.stream() - .map(filePath -> new HoodieLogFile(new StoragePath(filePath))) + .map(filePath -> new HoodieLogFile(filePath)) .collect(Collectors.toList()), - readerSchema, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema, + readerSchema, readBlocksLazily, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema, enableLogicalTimestampFieldRepair); Set scannedLogFiles = new HashSet<>(); @@ -560,11 +562,11 @@ private void scanInternalV2(Option keySpecOption, boolean skipProcessin HoodieTimeline inflightInstantsTimeline = commitsTimeline.filterInflights(); try { // Iterate over the paths - logFormatReaderWrapper = new HoodieLogFormatReader(storage, + logFormatReaderWrapper = new HoodieLogFormatReader(fs, logFilePaths.stream() - .map(logFile -> new HoodieLogFile(new StoragePath(logFile))) + .map(logFile -> new HoodieLogFile(logFile)) .collect(Collectors.toList()), - readerSchema, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema, + readerSchema, readBlocksLazily, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema, enableLogicalTimestampFieldRepair); /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java index ab6d763345fb0..15cd90f844031 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java @@ -25,7 +25,6 @@ import org.apache.hudi.common.fs.TimedFSDataInputStream; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.avro.AvroSchemaUtils; import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; import org.apache.hudi.common.table.log.block.HoodieCDCDataBlock; import org.apache.hudi.common.table.log.block.HoodieCommandBlock; @@ -100,20 +99,18 @@ public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSc public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize, boolean readBlockLazily, boolean reverseReader) throws IOException { this(fs, logFile, readerSchema, bufferSize, readBlockLazily, reverseReader, false, - HoodieRecord.RECORD_KEY_METADATA_FIELD); + HoodieRecord.RECORD_KEY_METADATA_FIELD, InternalSchema.getEmptyInternalSchema(), false); } - public HoodieLogFileReader(HoodieStorage storage, HoodieLogFile logFile, Schema readerSchema, int bufferSize, boolean reverseReader, - boolean enableRecordLookups, String keyField) throws IOException { - this(storage, logFile, readerSchema, bufferSize, reverseReader, enableRecordLookups, keyField, - InternalSchema.getEmptyInternalSchema(), - readerSchema != null && AvroSchemaUtils.hasTimestampMillisField(readerSchema)); - } - - public HoodieLogFileReader(HoodieStorage storage, HoodieLogFile logFile, Schema readerSchema, int bufferSize, boolean reverseReader, - boolean enableRecordLookups, String keyField, InternalSchema internalSchema, + /** + * Constructor with full options for use by HoodieLogFormatReader (FileSystem-based, no storage abstraction). + */ + public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize, + boolean readBlockLazily, boolean reverseReader, boolean enableRecordLookups, + String keyField, InternalSchema internalSchema, boolean enableLogicalTimestampFieldRepair) throws IOException { - this.storage = storage; + this.fs = fs; + this.hadoopConf = fs.getConf(); // NOTE: We repackage {@code HoodieLogFile} here to make sure that the provided path // is prefixed with an appropriate scheme given that we're not propagating the FS // further @@ -203,7 +200,7 @@ private HoodieLogBlock readBlock() throws IOException { if (nextBlockVersion.getVersion() == HoodieLogFormatVersion.DEFAULT_VERSION) { return HoodieAvroDataBlock.getBlock(content.get(), readerSchema, internalSchema); } else { - return new HoodieAvroDataBlock(() -> getDataInputStream(storage, this.logFile, bufferSize), content, true, logBlockContentLoc, + return new HoodieAvroDataBlock(() -> getFSDataInputStream(fs, this.logFile, bufferSize), content, true, logBlockContentLoc, getTargetReaderSchemaForBlock(), header, footer, keyField, enableLogicalTimestampFieldRepair); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java index ca90e8b1e06a1..ed27e503fb891 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java @@ -64,7 +64,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader { this.enableLogicalTimestampFieldRepair = enableLogicalTimestampFieldRepair; if (!logFiles.isEmpty()) { HoodieLogFile nextLogFile = logFiles.remove(0); - this.currentReader = new HoodieLogFileReader(storage, nextLogFile, readerSchema, bufferSize, false, + this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false, enableRecordLookups, recordKeyField, internalSchema, enableLogicalTimestampFieldRepair); } } @@ -89,7 +89,7 @@ public boolean hasNext() { try { HoodieLogFile nextLogFile = logFiles.remove(0); this.currentReader.close(); - this.currentReader = new HoodieLogFileReader(storage, nextLogFile, readerSchema, bufferSize, false, + this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false, enableInlineReading, recordKeyField, internalSchema, enableLogicalTimestampFieldRepair); } catch (IOException io) { throw new HoodieIOException("unable to initialize read with log file ", io); diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala index 88ae565513a61..77d77322d16d1 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala @@ -87,7 +87,7 @@ class Spark2Adapter extends SparkAdapter { dataType.getClass.getSimpleName.startsWith("TimestampNTZType") } - override def getParquetReadSupport(storage HoodieStorage, + override def getParquetReadSupport(conf: Configuration, messageScheme: org.apache.hudi.common.util.Option[MessageType]): org.apache.parquet.hadoop.api.ReadSupport[_] = { // ParquetReadSupport is package-private in Spark 2.4, so we use reflection to instantiate it diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala index 1463a791b715c..502ba9d477cbe 100644 --- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala @@ -114,7 +114,7 @@ abstract class BaseSpark3Adapter extends SparkAdapter with Logging { fileSchema } - override def getParquetReadSupport(storage: HoodieStorage, + override def getParquetReadSupport(conf: Configuration, messageSchema: org.apache.hudi.common.util.Option[MessageType]): ParquetReadSupport = { new ParquetReadSupport() } diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala index e84919ef437ef..df1273cc96a12 100644 --- a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala @@ -81,9 +81,9 @@ class Spark3_4Adapter extends BaseSpark3Adapter { dataType == DataTypes.TimestampNTZType } - override def getParquetReadSupport(storage: HoodieStorage, + override def getParquetReadSupport(conf: Configuration, messageSchema: org.apache.hudi.common.util.Option[MessageType]): ParquetReadSupport = { - val enableTimestampFieldRepair = storage.getConf.getBoolean( + val enableTimestampFieldRepair = conf.getBoolean( HoodieSparkParquetReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, true) new HoodieParquetReadSupport( Option.empty[ZoneId], From eee51bef3a1c344b4f62242daa10165b586ab778 Mon Sep 17 00:00:00 2001 From: Lin Liu Date: Thu, 29 Jan 2026 17:32:48 -0800 Subject: [PATCH 05/27] Addressed comments --- .../org/apache/hudi/HoodieFileIndex.scala | 56 +++++++++++-------- .../org/apache/hudi/TestHoodieFileIndex.scala | 48 +++++++++++++++- 2 files changed, 81 insertions(+), 23 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala index 124291f1bef0d..293c2552d4457 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala @@ -354,32 +354,12 @@ case class HoodieFileIndex(spark: SparkSession, // Identify timestamp-millis columns from the Avro schema to skip from filter translation // (even if they're in the index, they may have been indexed before the fix and should not be used for filtering) - val timestampMillisColumns = scala.collection.mutable.Set[String]() - try { - val avroSchema = rawAvroSchema - if (avroSchema.getType == org.apache.avro.Schema.Type.RECORD) { - avroSchema.getFields.asScala.foreach { field => - val fieldSchema = AvroSchemaUtils.getNonNullTypeFromUnion(field.schema()) - if (fieldSchema.getType == org.apache.avro.Schema.Type.LONG) { - val logicalType = fieldSchema.getLogicalType - if (logicalType != null) { - val logicalTypeName = logicalType.getName - if (logicalTypeName == "timestamp-millis" || logicalTypeName == "local-timestamp-millis") { - timestampMillisColumns.add(field.name()) - } - } - } - } - } - } catch { - case e: Exception => - logDebug(s"Failed to identify timestamp-millis columns from Avro schema: ${e.getMessage}") - } + val timestampMillisColumns = HoodieFileIndex.getTimestampMillisColumns(rawAvroSchema) columnStatsIndex.loadTransposed(queryReferencedColumns, shouldReadInMemory) { transposedColStatsDF => val indexSchema = transposedColStatsDF.schema val indexFilter = - queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexSchema, timestampMillisColumns.toSet)) + queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexSchema, timestampMillisColumns)) .reduce(And) val allIndexedFileNames = @@ -472,6 +452,38 @@ object HoodieFileIndex extends Logging { schema.fieldNames.filter { colName => refs.exists(r => resolver.apply(colName, r.name)) } } + /** + * Identifies timestamp-millis columns from the Avro schema. These columns are excluded from + * column-stats filter translation (e.g. they may have been indexed before a fix and should + * not be used for filtering). + * + * @param avroSchema the table's Avro schema + * @return set of field names whose type is timestamp-millis or local-timestamp-millis + */ + def getTimestampMillisColumns(avroSchema: org.apache.avro.Schema): Set[String] = { + val timestampMillisColumns = scala.collection.mutable.Set[String]() + try { + if (avroSchema.getType == org.apache.avro.Schema.Type.RECORD) { + avroSchema.getFields.asScala.foreach { field => + val fieldSchema = AvroSchemaUtils.getNonNullTypeFromUnion(field.schema()) + if (fieldSchema.getType == org.apache.avro.Schema.Type.LONG) { + val logicalType = fieldSchema.getLogicalType + if (logicalType != null) { + val logicalTypeName = logicalType.getName + if (logicalTypeName == "timestamp-millis" || logicalTypeName == "local-timestamp-millis") { + timestampMillisColumns.add(field.name()) + } + } + } + } + } + } catch { + case e: Exception => + logDebug(s"Failed to identify timestamp-millis columns from Avro schema: ${e.getMessage}") + } + timestampMillisColumns.toSet + } + def getConfigProperties(spark: SparkSession, options: Map[String, String]) = { val sqlConf: SQLConf = spark.sessionState.conf val properties = TypedProperties.fromMap(options.filter(p => p._2 != null).asJava) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala index 803702addb489..978248ed0cb5f 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala @@ -43,13 +43,15 @@ import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.TimestampType import org.apache.hudi.metadata.HoodieTableMetadata import org.apache.hudi.testutils.HoodieSparkClientTestBase import org.apache.hudi.util.JFunction + +import org.apache.avro.Schema import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, GreaterThanOrEqual, LessThan, Literal} import org.apache.spark.sql.execution.datasources.{NoopCache, PartitionDirectory} import org.apache.spark.sql.functions.{lit, struct} import org.apache.spark.sql.hudi.HoodieSparkSessionExtension import org.apache.spark.sql.types.{IntegerType, StringType} -import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} +import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} import org.junit.jupiter.api.{BeforeEach, Test} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.{Arguments, CsvSource, MethodSource, ValueSource} @@ -788,6 +790,50 @@ class TestHoodieFileIndex extends HoodieSparkClientTestBase with ScalaAssertionS } } + @Test + def testGetTimestampMillisColumns(): Unit = { + // RECORD with timestamp-millis and local-timestamp-millis -> both returned + val recordWithTimestampMillisSchema = new Schema.Parser().parse( + """ + |{ + | "type": "record", + | "name": "TestRecord", + | "fields": [ + | {"name": "ts_millis", "type": {"type": "long", "logicalType": "timestamp-millis"}}, + | {"name": "ts_local_millis", "type": {"type": "long", "logicalType": "local-timestamp-millis"}}, + | {"name": "plain_long", "type": "long"}, + | {"name": "name", "type": "string"} + | ] + |} + |""".stripMargin) + val resultWithTimestampMillis = HoodieFileIndex.getTimestampMillisColumns(recordWithTimestampMillisSchema) + assertEquals(2, resultWithTimestampMillis.size) + assertTrue(resultWithTimestampMillis.contains("ts_millis")) + assertTrue(resultWithTimestampMillis.contains("ts_local_millis")) + assertFalse(resultWithTimestampMillis.contains("plain_long")) + assertFalse(resultWithTimestampMillis.contains("name")) + + // RECORD with only plain long and string -> empty set + val recordWithoutTimestampMillisSchema = new Schema.Parser().parse( + """ + |{ + | "type": "record", + | "name": "PlainRecord", + | "fields": [ + | {"name": "id", "type": "long"}, + | {"name": "name", "type": "string"} + | ] + |} + |""".stripMargin) + val resultPlain = HoodieFileIndex.getTimestampMillisColumns(recordWithoutTimestampMillisSchema) + assertTrue(resultPlain.isEmpty) + + // Non-RECORD schema -> empty set + val stringSchema = Schema.create(Schema.Type.STRING) + val resultNonRecord = HoodieFileIndex.getTimestampMillisColumns(stringSchema) + assertTrue(resultNonRecord.isEmpty) + } + private def attribute(partition: String): AttributeReference = { AttributeReference(partition, StringType, true)() } From ba8c90ec8e012f4912ee5d100a993cc66415635a Mon Sep 17 00:00:00 2001 From: Lin Liu Date: Thu, 29 Jan 2026 23:02:03 -0800 Subject: [PATCH 06/27] address wiring comments --- .../org/apache/hudi/HoodieBaseRelation.scala | 10 ++- .../org/apache/hudi/IncrementalRelation.scala | 10 +++ .../LegacyHoodieParquetFileFormat.scala | 69 +++++++++++++--- .../org/apache/hudi/TestHoodieFileIndex.scala | 82 ++++++++++--------- ...Spark34LegacyHoodieParquetFileFormat.scala | 3 - 5 files changed, 117 insertions(+), 57 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index 0d0b595730a2b..32f6c96851f6a 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -26,7 +26,7 @@ import org.apache.hadoop.mapred.JobConf import org.apache.hudi.AvroConversionUtils.getAvroSchemaWithDefaults import org.apache.hudi.HoodieBaseRelation._ import org.apache.hudi.HoodieConversionUtils.toScalaOption -import org.apache.hudi.avro.HoodieAvroUtils +import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils} import org.apache.hudi.client.utils.SparkInternalSchemaConverter import org.apache.hudi.common.config.{ConfigProperty, HoodieMetadataConfig, SerializableConfiguration} import org.apache.hudi.common.fs.FSUtils @@ -47,6 +47,7 @@ import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper} import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema} import org.apache.hudi.io.storage.HoodieAvroHFileReader +import org.apache.hudi.io.storage.HoodieSparkParquetReader import org.apache.hudi.metadata.HoodieTableMetadata import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex import org.apache.spark.internal.Logging @@ -108,7 +109,12 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, protected def tableName: String = metaClient.getTableConfig.getTableName - protected lazy val conf: Configuration = new Configuration(sqlContext.sparkContext.hadoopConfiguration) + protected lazy val conf: Configuration = { + val c = new Configuration(sqlContext.sparkContext.hadoopConfiguration) + c.set(HoodieSparkParquetReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, + AvroSchemaUtils.hasTimestampMillisField(tableAvroSchema).toString) + c + } protected lazy val jobConf = new JobConf(conf) protected lazy val tableConfig: HoodieTableConfig = metaClient.getTableConfig diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala index d855a02e10b41..1a2dc403d2aac 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala @@ -19,6 +19,7 @@ package org.apache.hudi import org.apache.avro.Schema import org.apache.hadoop.fs.{GlobPattern, Path} +import org.apache.hudi.avro.AvroSchemaUtils import org.apache.hudi.DataSourceReadOptions.INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME import org.apache.hudi.HoodieBaseRelation.isSchemaEvolutionEnabledOnRead import org.apache.hudi.HoodieSparkConfUtils.getHollowCommitHandling @@ -35,6 +36,7 @@ import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.exception.{HoodieException, HoodieIncrementalPathNotFoundException} import org.apache.hudi.internal.schema.InternalSchema import org.apache.hudi.internal.schema.utils.SerDeHelper +import org.apache.hudi.io.storage.HoodieSparkParquetReader import org.apache.hudi.table.HoodieSparkTable import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.rdd.RDD @@ -203,6 +205,14 @@ class IncrementalRelation(val sqlContext: SQLContext, sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, SerDeHelper.toJson(internalSchema)) sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, metaClient.getBasePath) sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST, validCommits) + // Pass table Avro schema to hadoopConf so supportBatch() can find it (supportBatch does not receive options). + if (tableAvroSchema.getType != Schema.Type.NULL) { + LegacyHoodieParquetFileFormat.setTableAvroSchemaInConf( + sqlContext.sparkContext.hadoopConfiguration, tableAvroSchema) + sqlContext.sparkContext.hadoopConfiguration.set( + HoodieSparkParquetReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, + AvroSchemaUtils.hasTimestampMillisField(tableAvroSchema).toString) + } val formatClassName = metaClient.getTableConfig.getBaseFileFormat match { case HoodieFileFormat.PARQUET => LegacyHoodieParquetFileFormat.FILE_FORMAT_ID case HoodieFileFormat.ORC => "orc" diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/LegacyHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/LegacyHoodieParquetFileFormat.scala index e3c579190dbba..4b9e0fc5c369d 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/LegacyHoodieParquetFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/LegacyHoodieParquetFileFormat.scala @@ -25,7 +25,10 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.datasources.PartitionedFile -import org.apache.spark.sql.execution.datasources.parquet.LegacyHoodieParquetFileFormat.FILE_FORMAT_ID +import org.apache.spark.sql.vectorized.ColumnarBatch + +import scala.collection.JavaConverters._ +import org.apache.spark.sql.execution.datasources.parquet.LegacyHoodieParquetFileFormat.{FILE_FORMAT_ID, HOODIE_TABLE_AVRO_SCHEMA} import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType @@ -41,9 +44,12 @@ class LegacyHoodieParquetFileFormat extends ParquetFileFormat with SparkAdapterS /** * Try to get table Avro schema from hadoopConf. - * This is used as a fallback when schema is not provided via options map. + * Callers (e.g., IncrementalRelation) should set the schema using + * LegacyHoodieParquetFileFormat.setTableAvroSchemaInConf() before reading so that + * supportBatch() can find it (supportBatch does not receive the relation's options). + * This is also used as a fallback in buildReaderWithPartitionValues when schema is not in options. * - * @return Some(schema) if found in hadoopConf, None otherwise (falls back to StructType conversion) + * @return Some(schema) if found in hadoopConf, None otherwise */ private def getTableAvroSchemaFromConf(hadoopConf: Configuration): Option[Schema] = { val schemaStr = hadoopConf.get(LegacyHoodieParquetFileFormat.HOODIE_TABLE_AVRO_SCHEMA) @@ -62,15 +68,29 @@ class LegacyHoodieParquetFileFormat extends ParquetFileFormat with SparkAdapterS } } + /** + * Returns whether batch/columnar read is supported. + * + * Where the second parameter `schema` comes from: Spark's FileSourceScanExec (or + * DataSourceScanExec) calls FileFormat.supportBatch(session, schema). The schema is typically + * the relation's read schema (HadoopFsRelation.dataSchema or the scan's required schema). + * During plan canonicalization (e.g. makeCopy), Spark can pass a schema derived from the + * plan's output attributes, which may have anonymized names (e.g. all "none"). + * + * We must NOT use that schema for Avro conversion: it can be wrong and cause + * AvroRuntimeException (duplicate field "none"). We only use the table Avro schema from + * hadoopConf; if not set, we return false (disable batch). + */ override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = { - // Try to get schema from hadoopConf (set by callers like IncrementalRelation) - val avroSchema = getTableAvroSchemaFromConf(sparkSession.sessionState.newHadoopConf()).getOrElse { - // Fallback to converting StructType to Avro schema - AvroConversionUtils.convertStructTypeToAvroSchema(schema, schema.typeName) + getTableAvroSchemaFromConf(sparkSession.sessionState.newHadoopConf()) match { + case Some(avroSchema) => + sparkAdapter + .createLegacyHoodieParquetFileFormat(true, avroSchema).get.supportBatch(sparkSession, schema) + case None => + // Table Avro schema not in hadoopConf (supportBatch does not receive options). + // Do not use the passed-in schema for conversion - it can be wrong (canonicalized "none" names). + false } - - sparkAdapter - .createLegacyHoodieParquetFileFormat(true, avroSchema).get.supportBatch(sparkSession, schema) } override def buildReaderWithPartitionValues(sparkSession: SparkSession, @@ -101,9 +121,26 @@ class LegacyHoodieParquetFileFormat extends ParquetFileFormat with SparkAdapterS AvroConversionUtils.convertStructTypeToAvroSchema(fullTableSchema, dataSchema.typeName) } - sparkAdapter + val delegateReader = sparkAdapter .createLegacyHoodieParquetFileFormat(shouldExtractPartitionValuesFromPartitionPath, avroSchema).get .buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf) + + val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields) + val useBatchPlan = supportBatch(sparkSession, resultSchema) + if (useBatchPlan) { + // Spark built a columnar plan and expects Iterator[ColumnarBatch]. Pass through as-is. + (file: PartitionedFile) => delegateReader(file) + } else { + // Spark built a row-based plan. The delegate may still return ColumnarBatch when it + // enables vectorized read. Convert any batch to rows so we always yield InternalRow. + (file: PartitionedFile) => { + val iter = delegateReader(file).asInstanceOf[Iterator[Any]] + iter.flatMap { + case r: InternalRow => Seq(r) + case b: ColumnarBatch => b.rowIterator().asScala + } + } + } } } @@ -112,8 +149,16 @@ object LegacyHoodieParquetFileFormat { /** * Configuration key for passing table Avro schema. - * Schema can be passed through options map (preferred, thread-safe) or hadoopConf (fallback). + * Schema can be passed through options map (buildReader) or hadoopConf (supportBatch + fallback). * This preserves the correct logical types (e.g., timestampMillis vs timestampMicros). */ val HOODIE_TABLE_AVRO_SCHEMA = "hoodie.table.avro.schema" + + /** + * Helper method to set table Avro schema in hadoopConf. + * Required for supportBatch() which only sees hadoopConf, not the relation's options. + */ + def setTableAvroSchemaInConf(hadoopConf: Configuration, avroSchema: Schema): Unit = { + hadoopConf.set(HOODIE_TABLE_AVRO_SCHEMA, avroSchema.toString) + } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala index 978248ed0cb5f..a71067ec344a2 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala @@ -792,46 +792,48 @@ class TestHoodieFileIndex extends HoodieSparkClientTestBase with ScalaAssertionS @Test def testGetTimestampMillisColumns(): Unit = { - // RECORD with timestamp-millis and local-timestamp-millis -> both returned - val recordWithTimestampMillisSchema = new Schema.Parser().parse( - """ - |{ - | "type": "record", - | "name": "TestRecord", - | "fields": [ - | {"name": "ts_millis", "type": {"type": "long", "logicalType": "timestamp-millis"}}, - | {"name": "ts_local_millis", "type": {"type": "long", "logicalType": "local-timestamp-millis"}}, - | {"name": "plain_long", "type": "long"}, - | {"name": "name", "type": "string"} - | ] - |} - |""".stripMargin) - val resultWithTimestampMillis = HoodieFileIndex.getTimestampMillisColumns(recordWithTimestampMillisSchema) - assertEquals(2, resultWithTimestampMillis.size) - assertTrue(resultWithTimestampMillis.contains("ts_millis")) - assertTrue(resultWithTimestampMillis.contains("ts_local_millis")) - assertFalse(resultWithTimestampMillis.contains("plain_long")) - assertFalse(resultWithTimestampMillis.contains("name")) - - // RECORD with only plain long and string -> empty set - val recordWithoutTimestampMillisSchema = new Schema.Parser().parse( - """ - |{ - | "type": "record", - | "name": "PlainRecord", - | "fields": [ - | {"name": "id", "type": "long"}, - | {"name": "name", "type": "string"} - | ] - |} - |""".stripMargin) - val resultPlain = HoodieFileIndex.getTimestampMillisColumns(recordWithoutTimestampMillisSchema) - assertTrue(resultPlain.isEmpty) - - // Non-RECORD schema -> empty set - val stringSchema = Schema.create(Schema.Type.STRING) - val resultNonRecord = HoodieFileIndex.getTimestampMillisColumns(stringSchema) - assertTrue(resultNonRecord.isEmpty) + if (HoodieSparkUtils.gteqSpark3_4) { + // RECORD with timestamp-millis and local-timestamp-millis -> both returned + val recordWithTimestampMillisSchema = new Schema.Parser().parse( + """ + |{ + | "type": "record", + | "name": "TestRecord", + | "fields": [ + | {"name": "ts_millis", "type": {"type": "long", "logicalType": "timestamp-millis"}}, + | {"name": "ts_local_millis", "type": {"type": "long", "logicalType": "local-timestamp-millis"}}, + | {"name": "plain_long", "type": "long"}, + | {"name": "name", "type": "string"} + | ] + |} + |""".stripMargin) + val resultWithTimestampMillis = HoodieFileIndex.getTimestampMillisColumns(recordWithTimestampMillisSchema) + assertEquals(2, resultWithTimestampMillis.size) + assertTrue(resultWithTimestampMillis.contains("ts_millis")) + assertTrue(resultWithTimestampMillis.contains("ts_local_millis")) + assertFalse(resultWithTimestampMillis.contains("plain_long")) + assertFalse(resultWithTimestampMillis.contains("name")) + + // RECORD with only plain long and string -> empty set + val recordWithoutTimestampMillisSchema = new Schema.Parser().parse( + """ + |{ + | "type": "record", + | "name": "PlainRecord", + | "fields": [ + | {"name": "id", "type": "long"}, + | {"name": "name", "type": "string"} + | ] + |} + |""".stripMargin) + val resultPlain = HoodieFileIndex.getTimestampMillisColumns(recordWithoutTimestampMillisSchema) + assertTrue(resultPlain.isEmpty) + + // Non-RECORD schema -> empty set + val stringSchema = Schema.create(Schema.Type.STRING) + val resultNonRecord = HoodieFileIndex.getTimestampMillisColumns(stringSchema) + assertTrue(resultNonRecord.isEmpty) + } } private def attribute(partition: String): AttributeReference = { diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala index fd8e52ea1e92c..6beb33e57fc65 100644 --- a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala @@ -31,7 +31,6 @@ import org.apache.hudi.common.util.collection.Pair import org.apache.hudi.internal.schema.InternalSchema import org.apache.hudi.internal.schema.action.InternalSchemaMerger import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper} -import org.apache.hudi.io.storage.HoodieSparkParquetReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR import org.apache.hudi.SparkAdapterSupport.sparkAdapter import org.apache.hudi.common.table.ParquetTableSchemaResolver import org.apache.parquet.filter2.compat.FilterCompat @@ -151,8 +150,6 @@ class Spark34LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu pruneInternalSchema(internalSchemaStr, requiredSchema) hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, prunedInternalSchemaStr) } - hadoopConf.set(ENABLE_LOGICAL_TIMESTAMP_REPAIR, hasTimestampMillisFieldInTableSchema.toString) - val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) From 435bac295a34a7ac28b60925d095fef986f2278c Mon Sep 17 00:00:00 2001 From: Lokesh Jain Date: Mon, 2 Feb 2026 16:48:02 +0530 Subject: [PATCH 07/27] Fix hive related tests --- .../hudi/hive/testutils/HiveTestService.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestService.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestService.java index e22fefa436803..4f7ce62baa922 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestService.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestService.java @@ -359,25 +359,25 @@ private HiveMetaStore.HMSHandler createHMSHandler(HiveConf conf) throws IOExcept throw new IOException("Failed to create HMSHandler using (String, Configuration, boolean) constructor", e); } - // Try Hive 3.x constructor with HiveConf (2 parameters: String, HiveConf) + // Try Hive 2.x constructor (3 parameters: String, HiveConf, boolean) try { - Constructor constructor = hmsHandlerClass.getConstructor(String.class, HiveConf.class); - return (HiveMetaStore.HMSHandler) constructor.newInstance(handlerName, conf); + Constructor constructor = hmsHandlerClass.getConstructor(String.class, HiveConf.class, boolean.class); + return (HiveMetaStore.HMSHandler) constructor.newInstance(handlerName, conf, false); } catch (NoSuchMethodException e) { // Continue to next option } catch (Exception e) { - throw new IOException("Failed to create HMSHandler using (String, HiveConf) constructor", e); + throw new IOException("Failed to create HMSHandler using (String, HiveConf, boolean) constructor", e); } - // Try Hive 2.x constructor (3 parameters: String, HiveConf, boolean) + // Try Hive 2.x constructor with HiveConf (2 parameters: String, HiveConf) try { - Constructor constructor = hmsHandlerClass.getConstructor(String.class, HiveConf.class, boolean.class); - return (HiveMetaStore.HMSHandler) constructor.newInstance(handlerName, conf, false); + Constructor constructor = hmsHandlerClass.getConstructor(String.class, HiveConf.class); + return (HiveMetaStore.HMSHandler) constructor.newInstance(handlerName, conf); } catch (NoSuchMethodException e) { throw new IOException("Failed to create HMSHandler. No compatible constructor found. " + "Available constructors: " + java.util.Arrays.toString(hmsHandlerClass.getConstructors()), e); } catch (Exception e) { - throw new IOException("Failed to create HMSHandler using (String, HiveConf, boolean) constructor", e); + throw new IOException("Failed to create HMSHandler using (String, HiveConf) constructor", e); } } From 7be4046be08e6822c021de1d0003550347b5088b Mon Sep 17 00:00:00 2001 From: Lokesh Jain Date: Tue, 3 Feb 2026 17:47:52 +0530 Subject: [PATCH 08/27] Fix tests --- .../ChainedSchemaPostProcessor.java | 2 +- .../TestHoodieDeltaStreamer.java | 29 ++++++++++--------- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/ChainedSchemaPostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/ChainedSchemaPostProcessor.java index b5cb01d72a69a..0295e80bed8be 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/ChainedSchemaPostProcessor.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/ChainedSchemaPostProcessor.java @@ -48,6 +48,6 @@ public Schema processSchema(Schema schema) { for (SchemaPostProcessor processor : processors) { targetSchema = processor.processSchema(targetSchema); } - return schema; + return targetSchema; } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java index 7b21ef0880239..56cf5fa128e3a 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java @@ -838,7 +838,7 @@ void testCOWLogicalRepair(String tableVersion, String recordType, String operati sparkSession.conf().set("spark.sql.session.timeZone", "UTC"); sparkSession.conf().set("spark.sql.parquet.enableVectorizedReader", "false"); Dataset df = sparkSession.read().format("hudi").load(tableBasePath); - assertDataframe(df, 16, 16); + assertDataframe(df, 15, 15, true); if ("CLUSTER".equals(operation)) { // after we cluster, the raw parquet should be correct @@ -859,7 +859,7 @@ void testCOWLogicalRepair(String tableVersion, String recordType, String operati // Read raw parquet files Dataset rawParquetDf = sparkSession.read().parquet(baseFilePaths.toArray(new String[0])); - assertDataframe(rawParquetDf, 15, 15); + assertDataframe(rawParquetDf, 15, 15, false); } } finally { sparkSession.conf().set("spark.sql.session.timeZone", prevTimezone); @@ -958,7 +958,7 @@ void testMORLogicalRepair(String tableVersion, String recordType, String operati sparkSession.conf().set("spark.sql.session.timeZone", "UTC"); Dataset df = sparkSession.read().format("hudi").load(tableBasePath); - assertDataframe(df, 12, 14); + assertDataframe(df, 12, 14, false); metaClient = HoodieTableMetaClient.builder() .setConf(hadoopConf) @@ -978,7 +978,7 @@ void testMORLogicalRepair(String tableVersion, String recordType, String operati // Read raw parquet files Dataset rawParquetDf = sparkSession.read().parquet(baseFilePaths.toArray(new String[0])); - assertDataframe(rawParquetDf, 12, 14); + assertDataframe(rawParquetDf, 12, 14, false); } else if ("COMPACT".equals(operation)) { // after compaction some files should be ok // Validate raw parquet files @@ -994,7 +994,7 @@ void testMORLogicalRepair(String tableVersion, String recordType, String operati // only read the compacted ones, the others are still incorrect .filter(path -> path.contains(latestInstant.get().getTimestamp())) .toArray(String[]::new)); - assertDataframe(rawParquetDf, 8, 8); + assertDataframe(rawParquetDf, 8, 8, false); } } finally { sparkSession.conf().set("spark.sql.session.timeZone", prevTimezone); @@ -1006,23 +1006,24 @@ void testMORLogicalRepair(String tableVersion, String recordType, String operati }); } - public static void assertDataframe(Dataset df, int above, int below) { + public static void assertDataframe(Dataset df, int above, int below, boolean isCOWSnapshotQuery) { List rows = df.collectAsList(); assertEquals(above + below, rows.size()); + int indexAdjustment = isCOWSnapshotQuery ? 0 : 1; for (Row row : rows) { String val = row.getString(6); int hash = val.hashCode(); if ((hash & 1) == 0) { - assertEquals("2020-01-01T00:00:00.001Z", row.getTimestamp(15).toInstant().toString()); - assertEquals("2020-06-01T12:00:00.000001Z", row.getTimestamp(16).toInstant().toString()); - assertEquals("2015-05-20T12:34:56.001", row.get(17).toString()); - assertEquals("2017-07-07T07:07:07.000001", row.get(18).toString()); + assertEquals("2020-01-01T00:00:00.001Z", row.getTimestamp(14 + indexAdjustment).toInstant().toString()); + assertEquals("2020-06-01T12:00:00.000001Z", row.getTimestamp(15 + indexAdjustment).toInstant().toString()); + assertEquals("2015-05-20T12:34:56.001", row.get(16 + indexAdjustment).toString()); + assertEquals("2017-07-07T07:07:07.000001", row.get(17 + indexAdjustment).toString()); } else { - assertEquals("2019-12-31T23:59:59.999Z", row.getTimestamp(15).toInstant().toString()); - assertEquals("2020-06-01T11:59:59.999999Z", row.getTimestamp(16).toInstant().toString()); - assertEquals("2015-05-20T12:34:55.999", row.get(17).toString()); - assertEquals("2017-07-07T07:07:06.999999", row.get(18).toString()); + assertEquals("2019-12-31T23:59:59.999Z", row.getTimestamp(14 + indexAdjustment).toInstant().toString()); + assertEquals("2020-06-01T11:59:59.999999Z", row.getTimestamp(15 + indexAdjustment).toInstant().toString()); + assertEquals("2015-05-20T12:34:55.999", row.get(16 + indexAdjustment).toString()); + assertEquals("2017-07-07T07:07:06.999999", row.get(17 + indexAdjustment).toString()); } } From 94526f4b46520fe359fa41a7143f7127019c8ef4 Mon Sep 17 00:00:00 2001 From: Lokesh Jain Date: Wed, 4 Feb 2026 08:15:05 +0530 Subject: [PATCH 09/27] Fix tests --- .../functional/TestExternalPathHandling.java | 3 +++ .../common/util/ExternalFilePathUtil.java | 21 +++++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java index 0785f9eea76d9..c92b81b8680eb 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java @@ -77,6 +77,8 @@ public class TestExternalPathHandling extends HoodieClientTestBase { private static final String FIELD_1 = "field1"; private static final String FIELD_2 = "field2"; + private static final String TEST_SCHEMA = "{\"type\":\"record\",\"name\":\"TestRecord\",\"namespace\":\"org.apache.hudi.test\"," + + "\"fields\":[{\"name\":\"field1\",\"type\":\"int\"},{\"name\":\"field2\",\"type\":[\"null\",\"string\"],\"default\":null}]}"; private HoodieWriteConfig writeConfig; @ParameterizedTest @@ -89,6 +91,7 @@ public void testFlow(FileIdAndNameGenerator fileIdAndNameGenerator, List .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(INMEMORY).build()) .withPath(metaClient.getBasePathV2().toString()) .withEmbeddedTimelineServerEnabled(false) + .withSchema(TEST_SCHEMA) .withMetadataConfig(HoodieMetadataConfig.newBuilder() .withMaxNumDeltaCommitsBeforeCompaction(2) .enable(true) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ExternalFilePathUtil.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ExternalFilePathUtil.java index 223ae8abc42b9..50032af9feffb 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ExternalFilePathUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ExternalFilePathUtil.java @@ -43,4 +43,25 @@ public static String appendCommitTimeAndExternalFileMarker(String filePath, Stri public static boolean isExternallyCreatedFile(String fileName) { return fileName.endsWith(EXTERNAL_FILE_SUFFIX); } + + /** + * Removes the appended commit time and external file marker from the file path, returning the original file path. + * + * @param filePath The file path with commit time and external file marker appended + */ + public static String removeCommitTimeAndExternalFileMarker(String filePath) { + if (!isExternallyCreatedFile(filePath)) { + return filePath; + } + // Remove the suffix + String pathWithoutSuffix = filePath.substring(0, filePath.length() - EXTERNAL_FILE_SUFFIX.length()); + // Find the last underscore which separates the commit time + int lastUnderscoreIndex = pathWithoutSuffix.lastIndexOf('_'); + if (lastUnderscoreIndex == -1) { + // No underscore found, return as is + return filePath; + } + // Return the path without the commit time and suffix + return pathWithoutSuffix.substring(0, lastUnderscoreIndex); + } } From d3b78978af6a019a806572d5a0358825780702ba Mon Sep 17 00:00:00 2001 From: Lokesh Jain Date: Wed, 4 Feb 2026 19:32:01 +0530 Subject: [PATCH 10/27] Fix tests --- hudi-utilities/pom.xml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml index cb28e3a3ec264..b9e40ef001a2e 100644 --- a/hudi-utilities/pom.xml +++ b/hudi-utilities/pom.xml @@ -439,6 +439,13 @@ + + org.apache.hive + hive-storage-api + ${hive.storage.version} + test + + ${hive.groupid} hive-jdbc From ac14313ca63c54456e3a49c7f99a77df7a446a3a Mon Sep 17 00:00:00 2001 From: Lokesh Jain Date: Thu, 5 Feb 2026 00:20:57 +0530 Subject: [PATCH 11/27] Fix test and address review comments --- .../apache/hudi/io/storage/HoodieSparkParquetReader.java | 6 +++--- .../common/table/log/AbstractHoodieLogRecordReader.java | 4 +++- .../java/org/apache/hudi/io/storage/HoodieFileReader.java | 2 ++ .../src/main/scala/org/apache/hudi/HoodieBaseRelation.scala | 4 ++-- .../main/scala/org/apache/hudi/IncrementalRelation.scala | 4 ++-- pom.xml | 2 ++ 6 files changed, 14 insertions(+), 8 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java index c312327397684..c836fff4138fe 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java @@ -64,7 +64,6 @@ public class HoodieSparkParquetReader implements HoodieSparkFileReader { private final Configuration conf; private final BaseFileUtils parquetUtils; private List readerIterators = new ArrayList<>(); - public static final String ENABLE_LOGICAL_TIMESTAMP_REPAIR = "spark.hudi.logicalTimestampField.repair.enable"; private Option fileSchemaOption = Option.empty(); private Option structTypeOption = Option.empty(); private Option schemaOption = Option.empty(); @@ -127,8 +126,9 @@ private ClosableIterator getInternalRowIterator(Schema readerSchema requestedSchema = readerSchema; } // Set configuration for timestamp_millis type repair. - conf.set(ENABLE_LOGICAL_TIMESTAMP_REPAIR, Boolean.toString(AvroSchemaUtils.hasTimestampMillisField(readerSchema))); - + if (!storage.getConf().contains(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR)) { + storage.getConf().set(ENABLE_LOGICAL_TIMESTAMP_REPAIR, Boolean.toString(AvroSchemaUtils.hasTimestampMillisField(readerSchema))); + } MessageType fileSchema = getFileSchema(); Schema nonNullSchema = AvroSchemaUtils.getNonNullTypeFromUnion(requestedSchema); Option messageSchema = Option.of(getAvroSchemaConverter(conf).convert(nonNullSchema)); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java index 7222f334e55b8..ecca93fa487b4 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java @@ -44,6 +44,7 @@ import org.apache.hudi.internal.schema.InternalSchema; import org.apache.hudi.internal.schema.action.InternalSchemaMerger; import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter; +import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.avro.Schema; import org.apache.hadoop.fs.FileSystem; @@ -187,7 +188,8 @@ protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List readerSchema != null && AvroSchemaUtils.hasTimestampMillisField(readerSchema)); if (keyFieldOverride.isPresent()) { // NOTE: This branch specifically is leveraged handling Metadata Table diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java index 00fff9a220c64..b6a2b8d60a074 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java @@ -41,6 +41,8 @@ */ public interface HoodieFileReader extends AutoCloseable { + String ENABLE_LOGICAL_TIMESTAMP_REPAIR = "spark.hudi.logicalTimestampField.repair.enable"; + String[] readMinMaxRecordKeys(); BloomFilter readBloomFilter(); diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index 32f6c96851f6a..f09d3ae3c9602 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -47,7 +47,7 @@ import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper} import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema} import org.apache.hudi.io.storage.HoodieAvroHFileReader -import org.apache.hudi.io.storage.HoodieSparkParquetReader +import org.apache.hudi.io.storage.HoodieFileReader import org.apache.hudi.metadata.HoodieTableMetadata import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex import org.apache.spark.internal.Logging @@ -111,7 +111,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, protected lazy val conf: Configuration = { val c = new Configuration(sqlContext.sparkContext.hadoopConfiguration) - c.set(HoodieSparkParquetReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, + c.set(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, AvroSchemaUtils.hasTimestampMillisField(tableAvroSchema).toString) c } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala index 1a2dc403d2aac..3790839e88755 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala @@ -36,7 +36,7 @@ import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.exception.{HoodieException, HoodieIncrementalPathNotFoundException} import org.apache.hudi.internal.schema.InternalSchema import org.apache.hudi.internal.schema.utils.SerDeHelper -import org.apache.hudi.io.storage.HoodieSparkParquetReader +import org.apache.hudi.io.storage.HoodieFileReader import org.apache.hudi.table.HoodieSparkTable import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.rdd.RDD @@ -210,7 +210,7 @@ class IncrementalRelation(val sqlContext: SQLContext, LegacyHoodieParquetFileFormat.setTableAvroSchemaInConf( sqlContext.sparkContext.hadoopConfiguration, tableAvroSchema) sqlContext.sparkContext.hadoopConfiguration.set( - HoodieSparkParquetReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, + HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, AvroSchemaUtils.hasTimestampMillisField(tableAvroSchema).toString) } val formatClassName = metaClient.getTableConfig.getBaseFileFormat match { diff --git a/pom.xml b/pom.xml index 9f6e1eef7bb8a..c83d55f1b8dd9 100644 --- a/pom.xml +++ b/pom.xml @@ -2348,6 +2348,8 @@ hudi-spark2-common 2.0.0 + + 2.6.0 1.10.1 1.6.0 1.8.2 From bd20879e299130081dd0a41db76ab19c42737d34 Mon Sep 17 00:00:00 2001 From: Pavithran Ravichandiran Date: Thu, 5 Feb 2026 14:19:11 -0800 Subject: [PATCH 12/27] Cherry pick bug fixes --- .../apache/hudi/io/storage/HoodieSparkParquetReader.java | 6 +++--- .../common/table/log/AbstractHoodieLogRecordReader.java | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java index c836fff4138fe..0747ad321fdc4 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java @@ -125,9 +125,9 @@ private ClosableIterator getInternalRowIterator(Schema readerSchema if (requestedSchema == null) { requestedSchema = readerSchema; } - // Set configuration for timestamp_millis type repair. - if (!storage.getConf().contains(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR)) { - storage.getConf().set(ENABLE_LOGICAL_TIMESTAMP_REPAIR, Boolean.toString(AvroSchemaUtils.hasTimestampMillisField(readerSchema))); + // Set configuration for timestamp_millis type repair (only when not already set). + if (conf.get(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR) == null) { + conf.set(ENABLE_LOGICAL_TIMESTAMP_REPAIR, Boolean.toString(AvroSchemaUtils.hasTimestampMillisField(readerSchema))); } MessageType fileSchema = getFileSchema(); Schema nonNullSchema = AvroSchemaUtils.getNonNullTypeFromUnion(requestedSchema); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java index ecca93fa487b4..10fe69984abad 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java @@ -188,8 +188,8 @@ protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List readerSchema != null && AvroSchemaUtils.hasTimestampMillisField(readerSchema)); + this.enableLogicalTimestampFieldRepair = fs.getConf().getBoolean(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, + readerSchema != null && AvroSchemaUtils.hasTimestampMillisField(readerSchema)); if (keyFieldOverride.isPresent()) { // NOTE: This branch specifically is leveraged handling Metadata Table From 94dff70dd32b658b50347b310c8bae134a80f782 Mon Sep 17 00:00:00 2001 From: Lokesh Jain Date: Thu, 5 Feb 2026 02:19:47 +0530 Subject: [PATCH 13/27] Fix build --- pom.xml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index c83d55f1b8dd9..684aa62b967e9 100644 --- a/pom.xml +++ b/pom.xml @@ -118,7 +118,9 @@ 2.9.9 2.10.2 org.apache.hive - 2.3.1 + 2.3.4 + + 2.6.0 1.10.1 1.8.2 0.273 @@ -2348,8 +2350,6 @@ hudi-spark2-common 2.0.0 - - 2.6.0 1.10.1 1.6.0 1.8.2 From d17b0e86dfd0d79a6b8b0762841cae7884f3b1d5 Mon Sep 17 00:00:00 2001 From: Lokesh Jain Date: Thu, 5 Feb 2026 11:34:05 +0530 Subject: [PATCH 14/27] Fix build --- .../org/apache/spark/sql/adapter/Spark3_4Adapter.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala index df1273cc96a12..76ed34e25d236 100644 --- a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.adapter import org.apache.hudi.avro.AvroSchemaUtils import org.apache.hudi.{Spark34HoodieFileScanRDD, SparkAdapterSupport$} -import org.apache.hudi.io.storage.HoodieSparkParquetReader +import org.apache.hudi.io.storage.HoodieFileReader import org.apache.avro.Schema import org.apache.hadoop.conf.Configuration @@ -84,7 +84,7 @@ class Spark3_4Adapter extends BaseSpark3Adapter { override def getParquetReadSupport(conf: Configuration, messageSchema: org.apache.hudi.common.util.Option[MessageType]): ParquetReadSupport = { val enableTimestampFieldRepair = conf.getBoolean( - HoodieSparkParquetReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, true) + HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, true) new HoodieParquetReadSupport( Option.empty[ZoneId], enableVectorizedReader = true, @@ -169,7 +169,7 @@ class Spark3_4Adapter extends BaseSpark3Adapter { val nonNullRequestedSchema = AvroSchemaUtils.getNonNullTypeFromUnion(requestedSchema) val cachedRequestedSchema = HoodieInternalRowUtils.getCachedSchema(nonNullRequestedSchema) val requestedSchemaInMessageType = org.apache.hudi.common.util.Option.of(getAvroSchemaConverter(conf).convert(nonNullRequestedSchema)) - val enableTimestampFieldRepair = conf.getBoolean(HoodieSparkParquetReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, true) + val enableTimestampFieldRepair = conf.getBoolean(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, true) val repairedRequestedSchema = repairSchemaIfSpecified(enableTimestampFieldRepair, fileSchema, requestedSchemaInMessageType) val repairedRequestedStructType = new ParquetToSparkSchemaConverter(conf).convert(repairedRequestedSchema) val evolution = new SparkBasicSchemaEvolution(repairedRequestedStructType, cachedRequestedSchema, SQLConf.get.sessionLocalTimeZone) From 808d4a42adf95c20e339d5cc2b990bde816c5a62 Mon Sep 17 00:00:00 2001 From: Pavithran Ravichandiran Date: Thu, 5 Feb 2026 23:08:35 -0800 Subject: [PATCH 15/27] Revert "Fix tests" This reverts commit d3b78978af6a019a806572d5a0358825780702ba. --- hudi-utilities/pom.xml | 7 ------- 1 file changed, 7 deletions(-) diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml index b9e40ef001a2e..cb28e3a3ec264 100644 --- a/hudi-utilities/pom.xml +++ b/hudi-utilities/pom.xml @@ -439,13 +439,6 @@ - - org.apache.hive - hive-storage-api - ${hive.storage.version} - test - - ${hive.groupid} hive-jdbc From 4f3563a296ed1b6bbc79260e75acffba983dccac Mon Sep 17 00:00:00 2001 From: Pavithran Ravichandiran Date: Thu, 5 Feb 2026 23:09:05 -0800 Subject: [PATCH 16/27] Revert "Fix build" This reverts commit 94dff70dd32b658b50347b310c8bae134a80f782. --- pom.xml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index 684aa62b967e9..c83d55f1b8dd9 100644 --- a/pom.xml +++ b/pom.xml @@ -118,9 +118,7 @@ 2.9.9 2.10.2 org.apache.hive - 2.3.4 - - 2.6.0 + 2.3.1 1.10.1 1.8.2 0.273 @@ -2350,6 +2348,8 @@ hudi-spark2-common 2.0.0 + + 2.6.0 1.10.1 1.6.0 1.8.2 From 4e5ed98ec1b874f26b5f684b7f5dad290bb7b000 Mon Sep 17 00:00:00 2001 From: Pavithran Ravichandiran Date: Fri, 6 Feb 2026 11:56:08 -0800 Subject: [PATCH 17/27] Fix build issues - 2.11 depending on 2.12 version --- hudi-utilities/pom.xml | 6 ------ 1 file changed, 6 deletions(-) diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml index cb28e3a3ec264..6375b0c985fcd 100644 --- a/hudi-utilities/pom.xml +++ b/hudi-utilities/pom.xml @@ -542,11 +542,5 @@ ${thrift.version} test - - org.apache.hudi - hudi-spark-common_2.12 - 0.14.2-SNAPSHOT - test - From cf421c8f05905adc90de730badd57881c49f291d Mon Sep 17 00:00:00 2001 From: Pavithran Ravichandiran Date: Fri, 6 Feb 2026 13:16:57 -0800 Subject: [PATCH 18/27] Fix build issues - Column stream explicty collection.2 --- .../apache/hudi/common/util/ParquetUtils.java | 18 +++++++++++------- .../hudi/metadata/HoodieTableMetadataUtil.java | 7 ++++--- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java index e848b166d0ecf..c15ea5fbc34ec 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java @@ -312,10 +312,12 @@ public List> readRangeFromParquetMetadata( Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName); // Collect stats from all individual Parquet blocks - Map>> columnToStatsListMap = - (Map>>) metadata.getBlocks().stream().sequential() - .flatMap(blockMetaData -> - blockMetaData.getColumns().stream() + // NOTE: Explicit cast on inner stream helps Java with type inference + @SuppressWarnings("unchecked") + Stream> blockStream = metadata.getBlocks().stream().sequential() + .flatMap(blockMetaData -> { + Stream> columnStream = + (Stream>) (Stream) blockMetaData.getColumns().stream() .filter(f -> cols.contains(f.getPath().toDotString())) .map(columnChunkMetaData -> { Statistics stats = columnChunkMetaData.getStatistics(); @@ -335,9 +337,11 @@ public List> readRangeFromParquetMetadata( columnChunkMetaData.getValueCount(), columnChunkMetaData.getTotalSize(), columnChunkMetaData.getTotalUncompressedSize()); - }) - ) - .collect(groupingByCollector); + }); + return columnStream; + }); + + Map>> columnToStatsListMap = blockStream.collect(groupingByCollector); // Combine those into file-level statistics // NOTE: Inlining this var makes javac (1.8) upset (due to its inability to infer diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index c34f0309db5d7..4ad0d3126f9aa 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -241,7 +241,7 @@ class ColumnStats { Collector, ?, Map>> collector = Collectors.toMap(colRangeMetadata -> colRangeMetadata.getColumnName(), Function.identity()); - return (Map>) targetFields.stream() + Stream> stream = targetFields.stream() .map(field -> { ColumnStats colStats = allColumnStats.get(field.name()); return HoodieColumnRangeMetadata.create( @@ -257,8 +257,9 @@ class ColumnStats { 0, 0 ); - }) - .collect(collector); + }); + + return stream.collect(collector); } /** From 2f0ae3bc61268985bc55f9bbddd55f095fd1bfa0 Mon Sep 17 00:00:00 2001 From: Pavithran Ravichandiran Date: Fri, 6 Feb 2026 13:54:08 -0800 Subject: [PATCH 19/27] Fix build issues - Use projection schema instead of repairFileSchema --- .../org/apache/hudi/io/storage/HoodieAvroParquetReader.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java index 275ca3ea738de..6619c7f771b89 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java @@ -165,7 +165,7 @@ private ClosableIterator getIndexedRecordIteratorInternal(Schema Option promotedSchema = Option.empty(); if (!renamedColumns.isPresent() || HoodieAvroUtils.recordNeedsRewriteForExtendedAvroTypePromotion(repairedFileSchema, schema)) { AvroReadSupport.setAvroReadSchema(conf, repairedFileSchema); - AvroReadSupport.setRequestedProjection(conf, repairedFileSchema); + AvroReadSupport.setRequestedProjection(conf, schema); promotedSchema = Option.of(schema); } else { AvroReadSupport.setAvroReadSchema(conf, schema); From a10b077e3bb4ffe228809da6cb0878c0e4f92237 Mon Sep 17 00:00:00 2001 From: Pavithran Ravichandiran Date: Fri, 6 Feb 2026 16:06:36 -0800 Subject: [PATCH 20/27] Fix build issues - remove spark3.5 profiles from github workflow bot --- .github/workflows/bot.yml | 104 -------------------------------------- 1 file changed, 104 deletions(-) diff --git a/.github/workflows/bot.yml b/.github/workflows/bot.yml index 5c851b8041c34..13104d0a55fec 100644 --- a/.github/workflows/bot.yml +++ b/.github/workflows/bot.yml @@ -264,110 +264,6 @@ jobs: export PATH="$JAVA_HOME/bin:$PATH" mvn test -Pfunctional-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -Dtest=skipJavaTests -DfailIfNoTests=false -pl "$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS - test-spark-java11-17-java-tests: - runs-on: ubuntu-latest - strategy: - matrix: - include: - - scalaProfile: "scala-2.12" - sparkProfile: "spark3.5" - sparkModules: "hudi-spark-datasource/hudi-spark3.5.x" - - scalaProfile: "scala-2.13" - sparkProfile: "spark3.5" - sparkModules: "hudi-spark-datasource/hudi-spark3.5.x" - - steps: - - uses: actions/checkout@v3 - - name: Set up JDK 11 - uses: actions/setup-java@v3 - with: - java-version: '11' - distribution: 'temurin' - architecture: x64 - cache: maven - - name: Build Project - env: - SCALA_PROFILE: ${{ matrix.scalaProfile }} - SPARK_PROFILE: ${{ matrix.sparkProfile }} - run: - mvn clean install -T 2 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -DskipTests=true $MVN_ARGS -am -pl "hudi-examples/hudi-examples-spark,hudi-common,$SPARK_COMMON_MODULES,$SPARK_MODULES" - - name: Set up JDK 17 - uses: actions/setup-java@v3 - with: - java-version: '17' - distribution: 'temurin' - architecture: x64 - cache: maven - - name: Quickstart Test - env: - SCALA_PROFILE: ${{ matrix.scalaProfile }} - SPARK_PROFILE: ${{ matrix.sparkProfile }} - run: - mvn test -Punit-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -DwildcardSuites=skipScalaTests -DfailIfNoTests=false -pl hudi-examples/hudi-examples-spark $MVN_ARGS - - name: Java UT - Common & Spark - env: - SCALA_PROFILE: ${{ matrix.scalaProfile }} - SPARK_PROFILE: ${{ matrix.sparkProfile }} - SPARK_MODULES: ${{ matrix.sparkModules }} - run: - mvn test -Punit-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -DwildcardSuites=skipScalaTests -DfailIfNoTests=false -pl "hudi-common,$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS - - name: Java FT - Spark - env: - SCALA_PROFILE: ${{ matrix.scalaProfile }} - SPARK_PROFILE: ${{ matrix.sparkProfile }} - SPARK_MODULES: ${{ matrix.sparkModules }} - run: - mvn test -Pfunctional-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -pl "$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS - - test-spark-java11-17-scala-tests: - runs-on: ubuntu-latest - strategy: - matrix: - include: - - scalaProfile: "scala-2.12" - sparkProfile: "spark3.5" - sparkModules: "hudi-spark-datasource/hudi-spark3.5.x" - - scalaProfile: "scala-2.13" - sparkProfile: "spark3.5" - sparkModules: "hudi-spark-datasource/hudi-spark3.5.x" - - steps: - - uses: actions/checkout@v3 - - name: Set up JDK 11 - uses: actions/setup-java@v3 - with: - java-version: '11' - distribution: 'temurin' - architecture: x64 - cache: maven - - name: Build Project - env: - SCALA_PROFILE: ${{ matrix.scalaProfile }} - SPARK_PROFILE: ${{ matrix.sparkProfile }} - run: - mvn clean install -T 2 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -DskipTests=true $MVN_ARGS -am -pl "hudi-examples/hudi-examples-spark,hudi-common,$SPARK_COMMON_MODULES,$SPARK_MODULES" - - name: Set up JDK 17 - uses: actions/setup-java@v3 - with: - java-version: '17' - distribution: 'temurin' - architecture: x64 - cache: maven - - name: Scala UT - Common & Spark - env: - SCALA_PROFILE: ${{ matrix.scalaProfile }} - SPARK_PROFILE: ${{ matrix.sparkProfile }} - SPARK_MODULES: ${{ matrix.sparkModules }} - run: - mvn test -Punit-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -Dtest=skipJavaTests -DfailIfNoTests=false -pl "hudi-common,$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS - - name: Scala FT - Spark - env: - SCALA_PROFILE: ${{ matrix.scalaProfile }} - SPARK_PROFILE: ${{ matrix.sparkProfile }} - SPARK_MODULES: ${{ matrix.sparkModules }} - run: - mvn test -Pfunctional-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -Dtest=skipJavaTests -DfailIfNoTests=false -pl "$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS - test-flink: runs-on: ubuntu-latest strategy: From ac619b60537721e3f95c498481b40d0e6f7c83ba Mon Sep 17 00:00:00 2001 From: Pavithran Ravichandiran Date: Fri, 6 Feb 2026 16:33:52 -0800 Subject: [PATCH 21/27] Fix build issues - Fix IT test pom.xml update --- hudi-cli/pom.xml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/hudi-cli/pom.xml b/hudi-cli/pom.xml index fcc9a63a87795..de463b42d8315 100644 --- a/hudi-cli/pom.xml +++ b/hudi-cli/pom.xml @@ -143,6 +143,13 @@ + + + javax.validation + validation-api + 2.0.1.Final + + org.scala-lang From 0f39b5357ca7e1260cbc5faf2a0b209d480e67a3 Mon Sep 17 00:00:00 2001 From: Pavithran Ravichandiran Date: Sat, 7 Feb 2026 23:51:42 -0800 Subject: [PATCH 22/27] Fix build issues - Use projection schema fields from repaired schema --- .../io/storage/HoodieAvroParquetReader.java | 40 ++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java index 6619c7f771b89..ade684aa2d04c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java @@ -44,7 +44,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import static org.apache.hudi.avro.AvroSchemaUtils.getRepairedSchema; import static org.apache.hudi.common.util.TypeUtils.unsafeCast; @@ -165,7 +167,8 @@ private ClosableIterator getIndexedRecordIteratorInternal(Schema Option promotedSchema = Option.empty(); if (!renamedColumns.isPresent() || HoodieAvroUtils.recordNeedsRewriteForExtendedAvroTypePromotion(repairedFileSchema, schema)) { AvroReadSupport.setAvroReadSchema(conf, repairedFileSchema); - AvroReadSupport.setRequestedProjection(conf, schema); + Schema projectionSchema = computeSafeProjection(repairedFileSchema, schema); + AvroReadSupport.setRequestedProjection(conf, projectionSchema); promotedSchema = Option.of(schema); } else { AvroReadSupport.setAvroReadSchema(conf, schema); @@ -185,6 +188,41 @@ private ClosableIterator getIndexedRecordIteratorInternal(Schema return parquetReaderIterator; } + /** + * Computes a safe projection schema by intersecting the requested schema with the file schema. + * This ensures we only request fields that actually exist in the file, enabling column pruning + * while avoiding "field not found" errors. + * + * @param fileSchema The schema from the file (with repaired types) + * @param requestedSchema The schema we'd like to read + * @return A projection schema containing only fields that exist in both schemas + */ + private Schema computeSafeProjection(Schema fileSchema, Schema requestedSchema) { + Map fileFields = fileSchema.getFields().stream() + .collect(Collectors.toMap(Schema.Field::name, f -> f)); + + List projectedFields = requestedSchema.getFields().stream() + .filter(field -> fileFields.containsKey(field.name())) + .map(field -> { + Schema.Field fileField = fileFields.get(field.name()); + return new Schema.Field(fileField.name(), fileField.schema(), fileField.doc(), fileField.defaultVal()); + }) + .collect(Collectors.toList()); + + if (projectedFields.isEmpty()) { + return fileSchema; + } + + Schema projectedSchema = Schema.createRecord( + fileSchema.getName(), + fileSchema.getDoc(), + fileSchema.getNamespace(), + fileSchema.isError() + ); + projectedSchema.setFields(projectedFields); + return projectedSchema; + } + @Override public ClosableIterator getRecordKeyIterator() throws IOException { ClosableIterator recordKeyIterator = getIndexedRecordIterator(HoodieAvroUtils.getRecordKeySchema()); From e2af120956fb636f6d9ca2e1921c71046bdc6f7d Mon Sep 17 00:00:00 2001 From: Pavithran Ravichandiran Date: Sun, 8 Feb 2026 01:09:02 -0800 Subject: [PATCH 23/27] Fix build issues - throw exception only for spark 3.4 gt --- .../scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala index d0cff9650eedb..1e7ea964017d9 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala @@ -854,7 +854,7 @@ class TestAvroSchemaResolutionSupport extends HoodieClientTestBase with ScalaAss upsertData(df2, tempRecordPath, tableType.equals("COPY_ON_WRITE")) // after implicit type change, read the table with vectorized read enabled - if (HoodieSparkUtils.gteqSpark3_3) { + if (HoodieSparkUtils.gteqSpark3_4) { assertThrows(classOf[SparkException]) { withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" -> "true") { readTable(tempRecordPath) From 7508f661a52015bf30167d69bac0ba83a2364ff2 Mon Sep 17 00:00:00 2001 From: Pavithran Ravichandiran Date: Sun, 8 Feb 2026 02:56:41 -0800 Subject: [PATCH 24/27] Fix build issues - throw exception only for spark 3.3 --- .../org/apache/hudi/TestAvroSchemaResolutionSupport.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala index 1e7ea964017d9..4b90eef3f1bc3 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala @@ -854,7 +854,9 @@ class TestAvroSchemaResolutionSupport extends HoodieClientTestBase with ScalaAss upsertData(df2, tempRecordPath, tableType.equals("COPY_ON_WRITE")) // after implicit type change, read the table with vectorized read enabled - if (HoodieSparkUtils.gteqSpark3_4) { + // Spark 3.3 has a ClassCastException bug with vectorized nested column reader after type changes + // Spark 3.4+ with computeSafeProjection fix handles type promotions correctly + if (HoodieSparkUtils.gteqSpark3_3 && !HoodieSparkUtils.gteqSpark3_4) { assertThrows(classOf[SparkException]) { withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" -> "true") { readTable(tempRecordPath) From 6a7a5fbc159ad90a225b05df33501c50ddfd487d Mon Sep 17 00:00:00 2001 From: Pavithran Ravichandiran Date: Sun, 8 Feb 2026 16:16:05 -0800 Subject: [PATCH 25/27] Fix build issues - remove exceptions --- .../org/apache/hudi/TestAvroSchemaResolutionSupport.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala index 4b90eef3f1bc3..18205971bd8ae 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala @@ -854,9 +854,8 @@ class TestAvroSchemaResolutionSupport extends HoodieClientTestBase with ScalaAss upsertData(df2, tempRecordPath, tableType.equals("COPY_ON_WRITE")) // after implicit type change, read the table with vectorized read enabled - // Spark 3.3 has a ClassCastException bug with vectorized nested column reader after type changes - // Spark 3.4+ with computeSafeProjection fix handles type promotions correctly - if (HoodieSparkUtils.gteqSpark3_3 && !HoodieSparkUtils.gteqSpark3_4) { + val isCow = tableType.equals("COPY_ON_WRITE") + if (HoodieSparkUtils.gteqSpark3_3 && !HoodieSparkUtils.gteqSpark3_4 && isCow) { assertThrows(classOf[SparkException]) { withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" -> "true") { readTable(tempRecordPath) From 6bea797695bbd068d955f9030cd3569843147212 Mon Sep 17 00:00:00 2001 From: Pavithran Ravichandiran Date: Mon, 9 Feb 2026 11:34:40 -0800 Subject: [PATCH 26/27] Fix build issues - remove exceptions for spark33 --- .../hudi/TestAvroSchemaResolutionSupport.scala | 13 ++----------- .../Spark33LegacyHoodieParquetFileFormat.scala | 3 +-- 2 files changed, 3 insertions(+), 13 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala index 18205971bd8ae..9f9afd1c337bb 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala @@ -854,17 +854,8 @@ class TestAvroSchemaResolutionSupport extends HoodieClientTestBase with ScalaAss upsertData(df2, tempRecordPath, tableType.equals("COPY_ON_WRITE")) // after implicit type change, read the table with vectorized read enabled - val isCow = tableType.equals("COPY_ON_WRITE") - if (HoodieSparkUtils.gteqSpark3_3 && !HoodieSparkUtils.gteqSpark3_4 && isCow) { - assertThrows(classOf[SparkException]) { - withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" -> "true") { - readTable(tempRecordPath) - } - } - } else { - withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" -> "true") { - readTable(tempRecordPath) - } + withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" -> "true") { + readTable(tempRecordPath) } withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" -> "false") { diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala index add53e12e2493..5c3b8cd297561 100644 --- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala @@ -121,8 +121,7 @@ class Spark33LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu val sqlConf = sparkSession.sessionState.conf val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled val enableVectorizedReader: Boolean = - sqlConf.parquetVectorizedReaderEnabled && - resultSchema.forall(_.dataType.isInstanceOf[AtomicType]) + ParquetUtils.isBatchReadSupportedForSchema(sqlConf, resultSchema) val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion val capacity = sqlConf.parquetVectorizedReaderBatchSize From 281228a612219482e0770ecd63224ac876a5b129 Mon Sep 17 00:00:00 2001 From: Pavithran Ravichandiran Date: Mon, 9 Feb 2026 13:41:09 -0800 Subject: [PATCH 27/27] Fix build issues - remove exceptions for spark33 --- .../apache/hudi/TestAvroSchemaResolutionSupport.scala | 2 ++ .../Spark33LegacyHoodieParquetFileFormat.scala | 11 ++++++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala index 9f9afd1c337bb..e597d552337df 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala @@ -854,6 +854,8 @@ class TestAvroSchemaResolutionSupport extends HoodieClientTestBase with ScalaAss upsertData(df2, tempRecordPath, tableType.equals("COPY_ON_WRITE")) // after implicit type change, read the table with vectorized read enabled + // The supportBatch override in Spark33/34LegacyHoodieParquetFileFormat ensures that + // nested types use row-based reading instead of columnar batches, preventing ClassCastException withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" -> "true") { readTable(tempRecordPath) } diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala index 5c3b8cd297561..e42536735ce93 100644 --- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala @@ -64,6 +64,13 @@ import java.net.URI */ class Spark33LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat { + override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = { + val conf = sparkSession.sessionState.conf + conf.parquetVectorizedReaderEnabled && + schema.forall(_.dataType.isInstanceOf[AtomicType]) && + ParquetUtils.isBatchReadSupportedForSchema(conf, schema) + } + override def buildReaderWithPartitionValues(sparkSession: SparkSession, dataSchema: StructType, partitionSchema: StructType, @@ -121,7 +128,9 @@ class Spark33LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu val sqlConf = sparkSession.sessionState.conf val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled val enableVectorizedReader: Boolean = - ParquetUtils.isBatchReadSupportedForSchema(sqlConf, resultSchema) + sqlConf.parquetVectorizedReaderEnabled && + resultSchema.forall(_.dataType.isInstanceOf[AtomicType]) && + ParquetUtils.isBatchReadSupportedForSchema(sqlConf, resultSchema) val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion val capacity = sqlConf.parquetVectorizedReaderBatchSize