diff --git a/.github/workflows/bot.yml b/.github/workflows/bot.yml
index 5c851b8041c34..13104d0a55fec 100644
--- a/.github/workflows/bot.yml
+++ b/.github/workflows/bot.yml
@@ -264,110 +264,6 @@ jobs:
export PATH="$JAVA_HOME/bin:$PATH"
mvn test -Pfunctional-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -Dtest=skipJavaTests -DfailIfNoTests=false -pl "$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS
- test-spark-java11-17-java-tests:
- runs-on: ubuntu-latest
- strategy:
- matrix:
- include:
- - scalaProfile: "scala-2.12"
- sparkProfile: "spark3.5"
- sparkModules: "hudi-spark-datasource/hudi-spark3.5.x"
- - scalaProfile: "scala-2.13"
- sparkProfile: "spark3.5"
- sparkModules: "hudi-spark-datasource/hudi-spark3.5.x"
-
- steps:
- - uses: actions/checkout@v3
- - name: Set up JDK 11
- uses: actions/setup-java@v3
- with:
- java-version: '11'
- distribution: 'temurin'
- architecture: x64
- cache: maven
- - name: Build Project
- env:
- SCALA_PROFILE: ${{ matrix.scalaProfile }}
- SPARK_PROFILE: ${{ matrix.sparkProfile }}
- run:
- mvn clean install -T 2 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -DskipTests=true $MVN_ARGS -am -pl "hudi-examples/hudi-examples-spark,hudi-common,$SPARK_COMMON_MODULES,$SPARK_MODULES"
- - name: Set up JDK 17
- uses: actions/setup-java@v3
- with:
- java-version: '17'
- distribution: 'temurin'
- architecture: x64
- cache: maven
- - name: Quickstart Test
- env:
- SCALA_PROFILE: ${{ matrix.scalaProfile }}
- SPARK_PROFILE: ${{ matrix.sparkProfile }}
- run:
- mvn test -Punit-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -DwildcardSuites=skipScalaTests -DfailIfNoTests=false -pl hudi-examples/hudi-examples-spark $MVN_ARGS
- - name: Java UT - Common & Spark
- env:
- SCALA_PROFILE: ${{ matrix.scalaProfile }}
- SPARK_PROFILE: ${{ matrix.sparkProfile }}
- SPARK_MODULES: ${{ matrix.sparkModules }}
- run:
- mvn test -Punit-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -DwildcardSuites=skipScalaTests -DfailIfNoTests=false -pl "hudi-common,$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS
- - name: Java FT - Spark
- env:
- SCALA_PROFILE: ${{ matrix.scalaProfile }}
- SPARK_PROFILE: ${{ matrix.sparkProfile }}
- SPARK_MODULES: ${{ matrix.sparkModules }}
- run:
- mvn test -Pfunctional-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -pl "$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS
-
- test-spark-java11-17-scala-tests:
- runs-on: ubuntu-latest
- strategy:
- matrix:
- include:
- - scalaProfile: "scala-2.12"
- sparkProfile: "spark3.5"
- sparkModules: "hudi-spark-datasource/hudi-spark3.5.x"
- - scalaProfile: "scala-2.13"
- sparkProfile: "spark3.5"
- sparkModules: "hudi-spark-datasource/hudi-spark3.5.x"
-
- steps:
- - uses: actions/checkout@v3
- - name: Set up JDK 11
- uses: actions/setup-java@v3
- with:
- java-version: '11'
- distribution: 'temurin'
- architecture: x64
- cache: maven
- - name: Build Project
- env:
- SCALA_PROFILE: ${{ matrix.scalaProfile }}
- SPARK_PROFILE: ${{ matrix.sparkProfile }}
- run:
- mvn clean install -T 2 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -DskipTests=true $MVN_ARGS -am -pl "hudi-examples/hudi-examples-spark,hudi-common,$SPARK_COMMON_MODULES,$SPARK_MODULES"
- - name: Set up JDK 17
- uses: actions/setup-java@v3
- with:
- java-version: '17'
- distribution: 'temurin'
- architecture: x64
- cache: maven
- - name: Scala UT - Common & Spark
- env:
- SCALA_PROFILE: ${{ matrix.scalaProfile }}
- SPARK_PROFILE: ${{ matrix.sparkProfile }}
- SPARK_MODULES: ${{ matrix.sparkModules }}
- run:
- mvn test -Punit-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -Dtest=skipJavaTests -DfailIfNoTests=false -pl "hudi-common,$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS
- - name: Scala FT - Spark
- env:
- SCALA_PROFILE: ${{ matrix.scalaProfile }}
- SPARK_PROFILE: ${{ matrix.sparkProfile }}
- SPARK_MODULES: ${{ matrix.sparkModules }}
- run:
- mvn test -Pfunctional-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -Dtest=skipJavaTests -DfailIfNoTests=false -pl "$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS
-
test-flink:
runs-on: ubuntu-latest
strategy:
diff --git a/hudi-cli/pom.xml b/hudi-cli/pom.xml
index fcc9a63a87795..de463b42d8315 100644
--- a/hudi-cli/pom.xml
+++ b/hudi-cli/pom.xml
@@ -143,6 +143,13 @@
+
+
+ javax.validation
+ validation-api
+ 2.0.1.Final
+
+
org.scala-lang
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
index 45b0093b8ffd7..0747ad321fdc4 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
@@ -64,7 +64,6 @@ public class HoodieSparkParquetReader implements HoodieSparkFileReader {
private final Configuration conf;
private final BaseFileUtils parquetUtils;
private List readerIterators = new ArrayList<>();
- public static final String ENABLE_LOGICAL_TIMESTAMP_REPAIR = "spark.hudi.logicalTimestampField.repair.enable";
private Option fileSchemaOption = Option.empty();
private Option structTypeOption = Option.empty();
private Option schemaOption = Option.empty();
@@ -73,7 +72,7 @@ public HoodieSparkParquetReader(Configuration conf, Path path) {
this.path = path;
this.conf = new Configuration(conf);
// Avoid adding record in list element when convert parquet schema to avro schema
- conf.set(ADD_LIST_ELEMENT_RECORDS, "false");
+ this.conf.set(ADD_LIST_ELEMENT_RECORDS, "false");
this.parquetUtils = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET);
}
@@ -126,6 +125,10 @@ private ClosableIterator getInternalRowIterator(Schema readerSchema
if (requestedSchema == null) {
requestedSchema = readerSchema;
}
+ // Set configuration for timestamp_millis type repair (only when not already set).
+ if (conf.get(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR) == null) {
+ conf.set(ENABLE_LOGICAL_TIMESTAMP_REPAIR, Boolean.toString(AvroSchemaUtils.hasTimestampMillisField(readerSchema)));
+ }
MessageType fileSchema = getFileSchema();
Schema nonNullSchema = AvroSchemaUtils.getNonNullTypeFromUnion(requestedSchema);
Option messageSchema = Option.of(getAvroSchemaConverter(conf).convert(nonNullSchema));
@@ -136,8 +139,9 @@ private ClosableIterator getInternalRowIterator(Schema readerSchema
conf.set(SQLConf.PARQUET_BINARY_AS_STRING().key(), SQLConf.get().getConf(SQLConf.PARQUET_BINARY_AS_STRING()).toString());
conf.set(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), SQLConf.get().getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP()).toString());
ParquetReader reader = ParquetReader.builder(
- (ReadSupport) SparkAdapterSupport$.MODULE$.sparkAdapter().getParquetReadSupport(messageSchema),
- new Path(path.toUri())).withConf(conf)
+ (ReadSupport) SparkAdapterSupport$.MODULE$.sparkAdapter().getParquetReadSupport(conf, messageSchema),
+ path)
+ .withConf(conf)
.build();
ParquetReaderIterator parquetReaderIterator = new ParquetReaderIterator<>(reader);
readerIterators.add(parquetReaderIterator);
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index e8058aa1f2248..eb0e3b4901c67 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -59,7 +59,9 @@ trait SparkAdapter extends Serializable {
def isTimestampNTZType(dataType: DataType): Boolean
- def getParquetReadSupport(messageSchema: org.apache.hudi.common.util.Option[MessageType]): org.apache.parquet.hadoop.api.ReadSupport[_]
+ def getParquetReadSupport(conf: Configuration,
+ messageSchema: org.apache.hudi.common.util.Option[MessageType]):
+ org.apache.parquet.hadoop.api.ReadSupport[_]
def repairSchemaIfSpecified(shouldRepair: Boolean,
fileSchema: MessageType,
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java
index 0785f9eea76d9..c92b81b8680eb 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java
@@ -77,6 +77,8 @@ public class TestExternalPathHandling extends HoodieClientTestBase {
private static final String FIELD_1 = "field1";
private static final String FIELD_2 = "field2";
+ private static final String TEST_SCHEMA = "{\"type\":\"record\",\"name\":\"TestRecord\",\"namespace\":\"org.apache.hudi.test\","
+ + "\"fields\":[{\"name\":\"field1\",\"type\":\"int\"},{\"name\":\"field2\",\"type\":[\"null\",\"string\"],\"default\":null}]}";
private HoodieWriteConfig writeConfig;
@ParameterizedTest
@@ -89,6 +91,7 @@ public void testFlow(FileIdAndNameGenerator fileIdAndNameGenerator, List
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(INMEMORY).build())
.withPath(metaClient.getBasePathV2().toString())
.withEmbeddedTimelineServerEnabled(false)
+ .withSchema(TEST_SCHEMA)
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
.withMaxNumDeltaCommitsBeforeCompaction(2)
.enable(true)
diff --git a/hudi-common/src/avro/java/org/apache/parquet/schema/AvroSchemaRepair.java b/hudi-common/src/avro/java/org/apache/parquet/schema/AvroSchemaRepair.java
index 75f880ae3f955..78908a40ae52e 100644
--- a/hudi-common/src/avro/java/org/apache/parquet/schema/AvroSchemaRepair.java
+++ b/hudi-common/src/avro/java/org/apache/parquet/schema/AvroSchemaRepair.java
@@ -32,7 +32,6 @@
import java.util.List;
public class AvroSchemaRepair {
- public static boolean isLocalTimestampSupported = isLocalTimestampMillisSupported();
public static Schema repairLogicalTypes(Schema fileSchema, Schema tableSchema) {
Schema repairedSchema = repairAvroSchema(fileSchema, tableSchema);
@@ -242,18 +241,4 @@ public static boolean hasTimestampMillisField(Schema tableSchema) {
&& (tableSchema.getLogicalType() instanceof LogicalTypes.TimestampMillis || tableSchema.getLogicalType() instanceof LogicalTypes.LocalTimestampMillis);
}
}
-
- /**
- * Check if LogicalTypes.LocalTimestampMillis is supported in the current Avro version
- *
- * @return true if LocalTimestampMillis is available, false otherwise
- */
- public static boolean isLocalTimestampMillisSupported() {
- try {
- return Arrays.stream(LogicalTypes.class.getDeclaredClasses())
- .anyMatch(c -> c.getSimpleName().equals("LocalTimestampMillis"));
- } catch (Exception e) {
- return false;
- }
- }
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
index 6808ae1528279..824a94abab4bd 100644
--- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -479,11 +479,6 @@ protected boolean shouldReadAsPartitionedTable() {
return (partitionColumns.length > 0 && canParsePartitionValues()) || HoodieTableMetadata.isMetadataTable(basePath.toString());
}
- protected PartitionPath convertToPartitionPath(String partitionPath) {
- Object[] partitionColumnValues = parsePartitionColumnValues(partitionColumns, partitionPath);
- return new PartitionPath(partitionPath, partitionColumnValues);
- }
-
private static long fileSliceSize(FileSlice fileSlice) {
long logFileSize = fileSlice.getLogFiles().map(HoodieLogFile::getFileSize)
.filter(s -> s > 0)
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaCache.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaCache.java
index b679ea3c8d508..cecc420813c3b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaCache.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaCache.java
@@ -31,7 +31,6 @@
*/
public class AvroSchemaCache {
-
// Ensure that there is only one variable instance of the same schema within an entire JVM lifetime
private static final LoadingCache SCHEMA_CACHE = Caffeine.newBuilder().weakValues().maximumSize(1024).build(k -> k);
@@ -43,5 +42,4 @@ public class AvroSchemaCache {
public static Schema intern(Schema schema) {
return SCHEMA_CACHE.get(schema);
}
-
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
index c01af1b0644a2..7f4cf8c42502b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
@@ -43,6 +43,17 @@
*/
public class AvroSchemaUtils {
+ private static final Class> AVRO_SCHEMA_REPAIR_CLASS;
+ static {
+ Class> clazz = null;
+ try {
+ clazz = Class.forName("org.apache.parquet.schema.AvroSchemaRepair");
+ } catch (ClassNotFoundException e) {
+ // AvroSchemaRepair not on classpath (e.g. when parquet schema not available)
+ }
+ AVRO_SCHEMA_REPAIR_CLASS = clazz;
+ }
+
private AvroSchemaUtils() {}
/**
@@ -238,10 +249,6 @@ public static Option findNestedFieldSchema(Schema schema, String fieldNa
return Option.of(getNonNullTypeFromUnion(schema));
}
- public static Option findNestedFieldType(Schema schema, String fieldName) {
- return findNestedFieldSchema(schema, fieldName).map(Schema::getType);
- }
-
/**
* Appends provided new fields at the end of the given schema
*
@@ -399,13 +406,32 @@ public static void checkSchemaCompatible(
}
public static Schema getRepairedSchema(Schema writerSchema, Schema readerSchema) {
+ if (AVRO_SCHEMA_REPAIR_CLASS == null) {
+ return writerSchema;
+ }
try {
- Class> avroSchemaRepairClass = Class.forName("org.apache.parquet.schema.AvroSchemaRepair");
- Method repairMethod = avroSchemaRepairClass.getMethod("repairLogicalTypes", Schema.class, Schema.class);
+ Method repairMethod =
+ AVRO_SCHEMA_REPAIR_CLASS.getMethod("repairLogicalTypes", Schema.class, Schema.class);
return (Schema) repairMethod.invoke(null, writerSchema, readerSchema);
- } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
- // Fallback if class/method not available
+ } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
return writerSchema;
}
}
+
+ /**
+ * Returns true if the given Avro schema contains any timestamp-millis logical type.
+ * Used to decide whether to enable logical timestamp field repair when reading log blocks.
+ */
+ public static boolean hasTimestampMillisField(Schema schema) {
+ if (schema == null || AVRO_SCHEMA_REPAIR_CLASS == null) {
+ return false;
+ }
+ try {
+ Method hasTimestampMillisFieldMethod =
+ AVRO_SCHEMA_REPAIR_CLASS.getMethod("hasTimestampMillisField", Schema.class);
+ return (Boolean) hasTimestampMillisFieldMethod.invoke(null, schema);
+ } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+ return false;
+ }
+ }
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 3e51d826a9ca6..7221ed9f00b2c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -161,6 +161,19 @@ public class HoodieAvroUtils {
private static final Pattern INVALID_AVRO_FIRST_CHAR_IN_NAMES_PATTERN = Pattern.compile("[^A-Za-z_]");
private static final String MASK_FOR_INVALID_CHARS_IN_NAMES = "__";
+ // Cached Avro logical type classes (null if not available, e.g., Avro 1.8.2).
+ // Loaded once at class init to avoid repeated Class.forName() and reflection cost per record.
+ private static final Class> LOCAL_TIMESTAMP_MILLIS_CLASS = loadClassSafe("org.apache.avro.LogicalTypes$LocalTimestampMillis");
+ private static final Class> LOCAL_TIMESTAMP_MICROS_CLASS = loadClassSafe("org.apache.avro.LogicalTypes$LocalTimestampMicros");
+
+ private static Class> loadClassSafe(String name) {
+ try {
+ return Class.forName(name);
+ } catch (ClassNotFoundException e) {
+ return null;
+ }
+ }
+
// All metadata fields are optional strings.
public static final Schema METADATA_FIELD_SCHEMA = createNullableSchema(Schema.Type.STRING);
@@ -1406,34 +1419,22 @@ public static Comparable> unwrapAvroValueWrapper(Object avroValueWrapper) {
* Checks if a logical type is an instance of LocalTimestampMillis using reflection.
* Returns false if the class doesn't exist (e.g., in Avro 1.8.2).
*/
- private static boolean isLocalTimestampMillis(LogicalType logicalType) {
- if (logicalType == null) {
- return false;
- }
- try {
- Class> localTimestampMillisClass = Class.forName("org.apache.avro.LogicalTypes$LocalTimestampMillis");
- return localTimestampMillisClass.isInstance(logicalType);
- } catch (ClassNotFoundException e) {
- // Class doesn't exist (e.g., Avro 1.8.2)
+ public static boolean isLocalTimestampMillis(LogicalType logicalType) {
+ if (logicalType == null || LOCAL_TIMESTAMP_MILLIS_CLASS == null) {
return false;
}
+ return LOCAL_TIMESTAMP_MILLIS_CLASS.isInstance(logicalType);
}
/**
* Checks if a logical type is an instance of LocalTimestampMicros using reflection.
* Returns false if the class doesn't exist (e.g., in Avro 1.8.2).
*/
- private static boolean isLocalTimestampMicros(LogicalType logicalType) {
- if (logicalType == null) {
- return false;
- }
- try {
- Class> localTimestampMicrosClass = Class.forName("org.apache.avro.LogicalTypes$LocalTimestampMicros");
- return localTimestampMicrosClass.isInstance(logicalType);
- } catch (ClassNotFoundException e) {
- // Class doesn't exist (e.g., Avro 1.8.2)
+ public static boolean isLocalTimestampMicros(LogicalType logicalType) {
+ if (logicalType == null || LOCAL_TIMESTAMP_MICROS_CLASS == null) {
return false;
}
+ return LOCAL_TIMESTAMP_MICROS_CLASS.isInstance(logicalType);
}
private static Object convertDefaultValueForAvroCompatibility(Object defaultValue) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index 3678efe786252..10fe69984abad 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -18,6 +18,7 @@
package org.apache.hudi.common.table.log;
+import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.HoodieLogFile;
@@ -40,10 +41,10 @@
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.hadoop.CachingPath;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
+import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
@@ -111,6 +112,8 @@ public abstract class AbstractHoodieLogRecordReader {
private final TypedProperties payloadProps;
// Log File Paths
protected final List logFilePaths;
+ // Read block lazily - when true, reads block content on demand
+ private final boolean readBlocksLazily;
// Reverse reader - Not implemented yet (NA -> Why do we need ?)
// but present here for plumbing for future implementation
private final boolean reverseReader;
@@ -148,6 +151,8 @@ public abstract class AbstractHoodieLogRecordReader {
private final List validBlockInstants = new ArrayList<>();
// Use scanV2 method.
private final boolean enableOptimizedLogBlocksScan;
+ // Enable logical timestamp field repair for Avro log blocks (computed once from reader schema).
+ private final boolean enableLogicalTimestampFieldRepair;
protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List logFilePaths,
Schema readerSchema, String latestInstantTime, boolean readBlocksLazily,
@@ -174,6 +179,7 @@ protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List keySpecOpt) {
try {
// Iterate over the paths
logFormatReaderWrapper = new HoodieLogFormatReader(fs,
- logFilePaths.stream().map(logFile -> new HoodieLogFile(new CachingPath(logFile))).collect(Collectors.toList()),
- readerSchema, true, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema);
+ logFilePaths.stream()
+ .map(filePath -> new HoodieLogFile(filePath))
+ .collect(Collectors.toList()),
+ readerSchema, readBlocksLazily, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema,
+ enableLogicalTimestampFieldRepair);
Set scannedLogFiles = new HashSet<>();
while (logFormatReaderWrapper.hasNext()) {
@@ -554,8 +565,11 @@ private void scanInternalV2(Option keySpecOption, boolean skipProcessin
try {
// Iterate over the paths
logFormatReaderWrapper = new HoodieLogFormatReader(fs,
- logFilePaths.stream().map(logFile -> new HoodieLogFile(new CachingPath(logFile))).collect(Collectors.toList()),
- readerSchema, true, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema);
+ logFilePaths.stream()
+ .map(logFile -> new HoodieLogFile(logFile))
+ .collect(Collectors.toList()),
+ readerSchema, readBlocksLazily, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema,
+ enableLogicalTimestampFieldRepair);
/**
* Scanning log blocks and placing the compacted blocks at the right place require two traversals.
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index cf21ef5f42c81..15cd90f844031 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -87,6 +87,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
private long lastReverseLogFilePosition;
private final boolean reverseReader;
private final boolean enableRecordLookups;
+ private final boolean enableLogicalTimestampFieldRepair;
private boolean closed = false;
private FSDataInputStream inputStream;
@@ -98,18 +99,16 @@ public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSc
public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
boolean readBlockLazily, boolean reverseReader) throws IOException {
this(fs, logFile, readerSchema, bufferSize, readBlockLazily, reverseReader, false,
- HoodieRecord.RECORD_KEY_METADATA_FIELD);
- }
-
- public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
- boolean readBlockLazily, boolean reverseReader, boolean enableRecordLookups,
- String keyField) throws IOException {
- this(fs, logFile, readerSchema, bufferSize, readBlockLazily, reverseReader, enableRecordLookups, keyField, InternalSchema.getEmptyInternalSchema());
+ HoodieRecord.RECORD_KEY_METADATA_FIELD, InternalSchema.getEmptyInternalSchema(), false);
}
+ /**
+ * Constructor with full options for use by HoodieLogFormatReader (FileSystem-based, no storage abstraction).
+ */
public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
boolean readBlockLazily, boolean reverseReader, boolean enableRecordLookups,
- String keyField, InternalSchema internalSchema) throws IOException {
+ String keyField, InternalSchema internalSchema,
+ boolean enableLogicalTimestampFieldRepair) throws IOException {
this.fs = fs;
this.hadoopConf = fs.getConf();
// NOTE: We repackage {@code HoodieLogFile} here to make sure that the provided path
@@ -125,6 +124,7 @@ public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSc
this.enableRecordLookups = enableRecordLookups;
this.keyField = keyField;
this.internalSchema = internalSchema == null ? InternalSchema.getEmptyInternalSchema() : internalSchema;
+ this.enableLogicalTimestampFieldRepair = enableLogicalTimestampFieldRepair;
if (this.reverseReader) {
this.reverseLogFilePosition = this.lastReverseLogFilePosition = this.logFile.getFileSize();
}
@@ -200,8 +200,8 @@ private HoodieLogBlock readBlock() throws IOException {
if (nextBlockVersion.getVersion() == HoodieLogFormatVersion.DEFAULT_VERSION) {
return HoodieAvroDataBlock.getBlock(content.get(), readerSchema, internalSchema);
} else {
- return new HoodieAvroDataBlock(() -> getFSDataInputStream(fs, this.logFile, bufferSize), content, readBlockLazily, logBlockContentLoc,
- getTargetReaderSchemaForBlock(), header, footer, keyField);
+ return new HoodieAvroDataBlock(() -> getFSDataInputStream(fs, this.logFile, bufferSize), content, true, logBlockContentLoc,
+ getTargetReaderSchemaForBlock(), header, footer, keyField, enableLogicalTimestampFieldRepair);
}
case HFILE_DATA_BLOCK:
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
index 955f5485ed459..ed27e503fb891 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
@@ -45,12 +45,14 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
private final String recordKeyField;
private final boolean enableInlineReading;
private final int bufferSize;
+ private final boolean enableLogicalTimestampFieldRepair;
private static final Logger LOG = LoggerFactory.getLogger(HoodieLogFormatReader.class);
HoodieLogFormatReader(FileSystem fs, List logFiles, Schema readerSchema, boolean readBlocksLazily,
boolean reverseLogReader, int bufferSize, boolean enableRecordLookups,
- String recordKeyField, InternalSchema internalSchema) throws IOException {
+ String recordKeyField, InternalSchema internalSchema,
+ boolean enableLogicalTimestampFieldRepair) throws IOException {
this.logFiles = logFiles;
this.fs = fs;
this.readerSchema = readerSchema;
@@ -59,10 +61,11 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
this.recordKeyField = recordKeyField;
this.enableInlineReading = enableRecordLookups;
this.internalSchema = internalSchema == null ? InternalSchema.getEmptyInternalSchema() : internalSchema;
- if (logFiles.size() > 0) {
+ this.enableLogicalTimestampFieldRepair = enableLogicalTimestampFieldRepair;
+ if (!logFiles.isEmpty()) {
HoodieLogFile nextLogFile = logFiles.remove(0);
this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false,
- enableRecordLookups, recordKeyField, internalSchema);
+ enableRecordLookups, recordKeyField, internalSchema, enableLogicalTimestampFieldRepair);
}
}
@@ -87,7 +90,7 @@ public boolean hasNext() {
HoodieLogFile nextLogFile = logFiles.remove(0);
this.currentReader.close();
this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false,
- enableInlineReading, recordKeyField, internalSchema);
+ enableInlineReading, recordKeyField, internalSchema, enableLogicalTimestampFieldRepair);
} catch (IOException io) {
throw new HoodieIOException("unable to initialize read with log file ", io);
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
index 6f04583c0f191..d7ec98758fbbc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
@@ -73,6 +73,7 @@
public class HoodieAvroDataBlock extends HoodieDataBlock {
private final ThreadLocal encoderCache = new ThreadLocal<>();
+ private final boolean enableLogicalTimestampFieldRepair;
public HoodieAvroDataBlock(Supplier inputStreamSupplier,
Option content,
@@ -81,8 +82,10 @@ public HoodieAvroDataBlock(Supplier inputStreamSupplier,
Option readerSchema,
Map header,
Map footer,
- String keyField) {
+ String keyField,
+ boolean enableLogicalTimestampFieldRepair) {
super(content, inputStreamSupplier, readBlockLazily, Option.of(logBlockContentLocation), readerSchema, header, footer, keyField, false);
+ this.enableLogicalTimestampFieldRepair = enableLogicalTimestampFieldRepair;
}
public HoodieAvroDataBlock(@Nonnull List records,
@@ -90,6 +93,7 @@ public HoodieAvroDataBlock(@Nonnull List records,
@Nonnull String keyField
) {
super(records, header, new HashMap<>(), keyField);
+ this.enableLogicalTimestampFieldRepair = false;
}
@Override
@@ -143,7 +147,7 @@ protected ClosableIterator> deserializeRecords(byte[] conten
checkState(this.readerSchema != null, "Reader's schema has to be non-null");
checkArgument(type != HoodieRecordType.SPARK, "Not support read avro to spark record");
// TODO AvroSparkReader need
- RecordIterator iterator = RecordIterator.getInstance(this, content, true);
+ RecordIterator iterator = RecordIterator.getInstance(this, content, enableLogicalTimestampFieldRepair);
return new CloseableMappingIterator<>(iterator, data -> (HoodieRecord) new HoodieAvroIndexedRecord(data));
}
@@ -234,6 +238,7 @@ public IndexedRecord next() {
@Deprecated
public HoodieAvroDataBlock(List records, Schema schema) {
super(records, Collections.singletonMap(HeaderMetadataType.SCHEMA, schema.toString()), new HashMap<>(), HoodieRecord.RECORD_KEY_METADATA_FIELD);
+ this.enableLogicalTimestampFieldRepair = false;
}
public static HoodieAvroDataBlock getBlock(byte[] content, Schema readerSchema) throws IOException {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java
index 8f2cd8c644786..76527fc2e874b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java
@@ -43,7 +43,7 @@ public HoodieCDCDataBlock(
Map header,
String keyField) {
super(inputStreamSupplier, content, readBlockLazily, logBlockContentLocation,
- Option.of(readerSchema), header, new HashMap<>(), keyField);
+ Option.of(readerSchema), header, new HashMap<>(), keyField, false);
}
public HoodieCDCDataBlock(List records,
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java
index 99efa89fa0542..aa26db8272eca 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java
@@ -52,12 +52,6 @@ public static Instant microsToInstant(long microsFromEpoch) {
return Instant.ofEpochSecond(epochSeconds, nanoAdjustment);
}
- public static Instant nanosToInstant(long nanosFromEpoch) {
- long epochSeconds = nanosFromEpoch / (1_000_000_000L);
- long nanoAdjustment = nanosFromEpoch % (1_000_000_000L);
- return Instant.ofEpochSecond(epochSeconds, nanoAdjustment);
- }
-
/**
* Converts provided {@link Instant} to microseconds (from epoch)
*/
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ExternalFilePathUtil.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ExternalFilePathUtil.java
index 223ae8abc42b9..50032af9feffb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ExternalFilePathUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ExternalFilePathUtil.java
@@ -43,4 +43,25 @@ public static String appendCommitTimeAndExternalFileMarker(String filePath, Stri
public static boolean isExternallyCreatedFile(String fileName) {
return fileName.endsWith(EXTERNAL_FILE_SUFFIX);
}
+
+ /**
+ * Removes the appended commit time and external file marker from the file path, returning the original file path.
+ *
+ * @param filePath The file path with commit time and external file marker appended
+ */
+ public static String removeCommitTimeAndExternalFileMarker(String filePath) {
+ if (!isExternallyCreatedFile(filePath)) {
+ return filePath;
+ }
+ // Remove the suffix
+ String pathWithoutSuffix = filePath.substring(0, filePath.length() - EXTERNAL_FILE_SUFFIX.length());
+ // Find the last underscore which separates the commit time
+ int lastUnderscoreIndex = pathWithoutSuffix.lastIndexOf('_');
+ if (lastUnderscoreIndex == -1) {
+ // No underscore found, return as is
+ return filePath;
+ }
+ // Return the path without the commit time and suffix
+ return pathWithoutSuffix.substring(0, lastUnderscoreIndex);
+ }
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
index e848b166d0ecf..c15ea5fbc34ec 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
@@ -312,10 +312,12 @@ public List> readRangeFromParquetMetadata(
Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName);
// Collect stats from all individual Parquet blocks
- Map>> columnToStatsListMap =
- (Map>>) metadata.getBlocks().stream().sequential()
- .flatMap(blockMetaData ->
- blockMetaData.getColumns().stream()
+ // NOTE: Explicit cast on inner stream helps Java with type inference
+ @SuppressWarnings("unchecked")
+ Stream> blockStream = metadata.getBlocks().stream().sequential()
+ .flatMap(blockMetaData -> {
+ Stream> columnStream =
+ (Stream>) (Stream>) blockMetaData.getColumns().stream()
.filter(f -> cols.contains(f.getPath().toDotString()))
.map(columnChunkMetaData -> {
Statistics stats = columnChunkMetaData.getStatistics();
@@ -335,9 +337,11 @@ public List> readRangeFromParquetMetadata(
columnChunkMetaData.getValueCount(),
columnChunkMetaData.getTotalSize(),
columnChunkMetaData.getTotalUncompressedSize());
- })
- )
- .collect(groupingByCollector);
+ });
+ return columnStream;
+ });
+
+ Map>> columnToStatsListMap = blockStream.collect(groupingByCollector);
// Combine those into file-level statistics
// NOTE: Inlining this var makes javac (1.8) upset (due to its inability to infer
diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java
index ac14ea9e5c6e8..aa2a64e11d770 100644
--- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java
+++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java
@@ -18,6 +18,7 @@
package org.apache.hudi.internal.schema.convert;
+import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.internal.schema.HoodieSchemaException;
import org.apache.hudi.internal.schema.InternalSchema;
@@ -239,9 +240,9 @@ private static Type visitAvroPrimitiveToBuildInternalType(Schema primitive) {
return Types.TimestampMillisType.get();
} else if (logical instanceof LogicalTypes.TimestampMicros) {
return Types.TimestampType.get();
- } else if (isLocalTimestampMillis(logical)) {
+ } else if (HoodieAvroUtils.isLocalTimestampMillis(logical)) {
return Types.LocalTimestampMillisType.get();
- } else if (isLocalTimestampMicros(logical)) {
+ } else if (HoodieAvroUtils.isLocalTimestampMicros(logical)) {
return Types.LocalTimestampMicrosType.get();
} else if (LogicalTypes.uuid().getName().equals(name)) {
return Types.UUIDType.get();
@@ -497,40 +498,6 @@ private static int computeMinBytesForPrecision(int precision) {
return numBytes;
}
- /**
- * Checks if a logical type is an instance of LocalTimestampMillis using reflection.
- * Returns false if the class doesn't exist (e.g., in Avro 1.8.2).
- */
- private static boolean isLocalTimestampMillis(LogicalType logicalType) {
- if (logicalType == null) {
- return false;
- }
- try {
- Class> localTimestampMillisClass = Class.forName("org.apache.avro.LogicalTypes$LocalTimestampMillis");
- return localTimestampMillisClass.isInstance(logicalType);
- } catch (ClassNotFoundException e) {
- // Class doesn't exist (e.g., Avro 1.8.2)
- return false;
- }
- }
-
- /**
- * Checks if a logical type is an instance of LocalTimestampMicros using reflection.
- * Returns false if the class doesn't exist (e.g., in Avro 1.8.2).
- */
- private static boolean isLocalTimestampMicros(LogicalType logicalType) {
- if (logicalType == null) {
- return false;
- }
- try {
- Class> localTimestampMicrosClass = Class.forName("org.apache.avro.LogicalTypes$LocalTimestampMicros");
- return localTimestampMicrosClass.isInstance(logicalType);
- } catch (ClassNotFoundException e) {
- // Class doesn't exist (e.g., Avro 1.8.2)
- return false;
- }
- }
-
/**
* Creates a LocalTimestampMicros schema using reflection.
* Returns null if the class doesn't exist (e.g., in Avro 1.8.2).
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java
index 275ca3ea738de..ade684aa2d04c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java
@@ -44,7 +44,9 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
import static org.apache.hudi.avro.AvroSchemaUtils.getRepairedSchema;
import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
@@ -165,7 +167,8 @@ private ClosableIterator getIndexedRecordIteratorInternal(Schema
Option promotedSchema = Option.empty();
if (!renamedColumns.isPresent() || HoodieAvroUtils.recordNeedsRewriteForExtendedAvroTypePromotion(repairedFileSchema, schema)) {
AvroReadSupport.setAvroReadSchema(conf, repairedFileSchema);
- AvroReadSupport.setRequestedProjection(conf, repairedFileSchema);
+ Schema projectionSchema = computeSafeProjection(repairedFileSchema, schema);
+ AvroReadSupport.setRequestedProjection(conf, projectionSchema);
promotedSchema = Option.of(schema);
} else {
AvroReadSupport.setAvroReadSchema(conf, schema);
@@ -185,6 +188,41 @@ private ClosableIterator getIndexedRecordIteratorInternal(Schema
return parquetReaderIterator;
}
+ /**
+ * Computes a safe projection schema by intersecting the requested schema with the file schema.
+ * This ensures we only request fields that actually exist in the file, enabling column pruning
+ * while avoiding "field not found" errors.
+ *
+ * @param fileSchema The schema from the file (with repaired types)
+ * @param requestedSchema The schema we'd like to read
+ * @return A projection schema containing only fields that exist in both schemas
+ */
+ private Schema computeSafeProjection(Schema fileSchema, Schema requestedSchema) {
+ Map fileFields = fileSchema.getFields().stream()
+ .collect(Collectors.toMap(Schema.Field::name, f -> f));
+
+ List projectedFields = requestedSchema.getFields().stream()
+ .filter(field -> fileFields.containsKey(field.name()))
+ .map(field -> {
+ Schema.Field fileField = fileFields.get(field.name());
+ return new Schema.Field(fileField.name(), fileField.schema(), fileField.doc(), fileField.defaultVal());
+ })
+ .collect(Collectors.toList());
+
+ if (projectedFields.isEmpty()) {
+ return fileSchema;
+ }
+
+ Schema projectedSchema = Schema.createRecord(
+ fileSchema.getName(),
+ fileSchema.getDoc(),
+ fileSchema.getNamespace(),
+ fileSchema.isError()
+ );
+ projectedSchema.setFields(projectedFields);
+ return projectedSchema;
+ }
+
@Override
public ClosableIterator getRecordKeyIterator() throws IOException {
ClosableIterator recordKeyIterator = getIndexedRecordIterator(HoodieAvroUtils.getRecordKeySchema());
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java
index 00fff9a220c64..b6a2b8d60a074 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java
@@ -41,6 +41,8 @@
*/
public interface HoodieFileReader extends AutoCloseable {
+ String ENABLE_LOGICAL_TIMESTAMP_REPAIR = "spark.hudi.logicalTimestampField.repair.enable";
+
String[] readMinMaxRecordKeys();
BloomFilter readBloomFilter();
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index c34f0309db5d7..4ad0d3126f9aa 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -241,7 +241,7 @@ class ColumnStats {
Collector, ?, Map>> collector =
Collectors.toMap(colRangeMetadata -> colRangeMetadata.getColumnName(), Function.identity());
- return (Map>) targetFields.stream()
+ Stream> stream = targetFields.stream()
.map(field -> {
ColumnStats colStats = allColumnStats.get(field.name());
return HoodieColumnRangeMetadata.create(
@@ -257,8 +257,9 @@ class ColumnStats {
0,
0
);
- })
- .collect(collector);
+ });
+
+ return stream.collect(collector);
}
/**
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 601f83101c9b7..129bb5287736e 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -425,7 +425,7 @@ public void testHugeLogFileWrite() throws IOException, URISyntaxException, Inter
byte[] dataBlockContentBytes = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header).getContentBytes();
HoodieLogBlock.HoodieLogBlockContentLocation logBlockContentLoc = new HoodieLogBlock.HoodieLogBlockContentLocation(new Configuration(), null, 0, dataBlockContentBytes.length, 0);
HoodieDataBlock reusableDataBlock = new HoodieAvroDataBlock(null, Option.ofNullable(dataBlockContentBytes), false,
- logBlockContentLoc, Option.ofNullable(getSimpleSchema()), header, new HashMap<>(), HoodieRecord.RECORD_KEY_METADATA_FIELD);
+ logBlockContentLoc, Option.ofNullable(getSimpleSchema()), header, new HashMap<>(), HoodieRecord.RECORD_KEY_METADATA_FIELD, false);
long writtenSize = 0;
int logBlockWrittenNum = 0;
while (writtenSize < Integer.MAX_VALUE) {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
index 5fc0d0d6a6945..970cec0b79f85 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
@@ -146,62 +146,6 @@ class ColumnStatsIndexSupport(spark: SparkSession,
}
}
- /**
- * Loads view of the Column Stats Index in a transposed format where single row coalesces every columns'
- * statistics for a single file, returning it as [[DataFrame]]
- *
- * Please check out scala-doc of the [[transpose]] method explaining this view in more details
- */
- def loadTransposed[T](targetColumns: Seq[String],
- shouldReadInMemory: Boolean,
- prunedPartitions: Option[Set[String]] = None,
- prunedFileNamesOpt: Option[Set[String]] = None)(block: DataFrame => T): T = {
- cachedColumnStatsIndexViews.get(targetColumns) match {
- case Some(cachedDF) =>
- block(cachedDF)
- case None =>
- val colStatsRecords: HoodieData[HoodieMetadataColumnStats] = prunedFileNamesOpt match {
- case Some(prunedFileNames) =>
- val filterFunction = new SerializableFunction[HoodieMetadataColumnStats, java.lang.Boolean] {
- override def apply(r: HoodieMetadataColumnStats): java.lang.Boolean = {
- prunedFileNames.contains(r.getFileName)
- }
- }
- loadColumnStatsIndexRecords(targetColumns, shouldReadInMemory).filter(filterFunction)
- case None =>
- loadColumnStatsIndexRecords(targetColumns, shouldReadInMemory)
- }
-
- withPersistedData(colStatsRecords, StorageLevel.MEMORY_ONLY) {
- val (transposedRows, indexSchema) = transpose(colStatsRecords, targetColumns)
- val df = if (shouldReadInMemory) {
- // NOTE: This will instantiate a [[Dataset]] backed by [[LocalRelation]] holding all of the rows
- // of the transposed table in memory, facilitating execution of the subsequently chained operations
- // on it locally (on the driver; all such operations are actually going to be performed by Spark's
- // Optimizer)
- HoodieUnsafeUtils.createDataFrameFromRows(spark, transposedRows.collectAsList().asScala.toSeq, indexSchema)
- } else {
- val rdd = HoodieJavaRDD.getJavaRDD(transposedRows)
- spark.createDataFrame(rdd, indexSchema)
- }
-
- if (allowCaching) {
- cachedColumnStatsIndexViews.put(targetColumns, df)
- // NOTE: Instead of collecting the rows from the index and hold them in memory, we instead rely
- // on Spark as (potentially distributed) cache managing data lifecycle, while we simply keep
- // the referenced to persisted [[DataFrame]] instance
- df.persist(StorageLevel.MEMORY_ONLY)
-
- block(df)
- } else {
- withPersistedDataset(df) {
- block(df)
- }
- }
- }
- }
- }
-
/**
* Loads a view of the Column Stats Index in a raw format, returning it as [[DataFrame]]
*
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index 0d0b595730a2b..f09d3ae3c9602 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.mapred.JobConf
import org.apache.hudi.AvroConversionUtils.getAvroSchemaWithDefaults
import org.apache.hudi.HoodieBaseRelation._
import org.apache.hudi.HoodieConversionUtils.toScalaOption
-import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils}
import org.apache.hudi.client.utils.SparkInternalSchemaConverter
import org.apache.hudi.common.config.{ConfigProperty, HoodieMetadataConfig, SerializableConfiguration}
import org.apache.hudi.common.fs.FSUtils
@@ -47,6 +47,7 @@ import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema}
import org.apache.hudi.io.storage.HoodieAvroHFileReader
+import org.apache.hudi.io.storage.HoodieFileReader
import org.apache.hudi.metadata.HoodieTableMetadata
import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
import org.apache.spark.internal.Logging
@@ -108,7 +109,12 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
protected def tableName: String = metaClient.getTableConfig.getTableName
- protected lazy val conf: Configuration = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+ protected lazy val conf: Configuration = {
+ val c = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+ c.set(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR,
+ AvroSchemaUtils.hasTimestampMillisField(tableAvroSchema).toString)
+ c
+ }
protected lazy val jobConf = new JobConf(conf)
protected lazy val tableConfig: HoodieTableConfig = metaClient.getTableConfig
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 124291f1bef0d..293c2552d4457 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -354,32 +354,12 @@ case class HoodieFileIndex(spark: SparkSession,
// Identify timestamp-millis columns from the Avro schema to skip from filter translation
// (even if they're in the index, they may have been indexed before the fix and should not be used for filtering)
- val timestampMillisColumns = scala.collection.mutable.Set[String]()
- try {
- val avroSchema = rawAvroSchema
- if (avroSchema.getType == org.apache.avro.Schema.Type.RECORD) {
- avroSchema.getFields.asScala.foreach { field =>
- val fieldSchema = AvroSchemaUtils.getNonNullTypeFromUnion(field.schema())
- if (fieldSchema.getType == org.apache.avro.Schema.Type.LONG) {
- val logicalType = fieldSchema.getLogicalType
- if (logicalType != null) {
- val logicalTypeName = logicalType.getName
- if (logicalTypeName == "timestamp-millis" || logicalTypeName == "local-timestamp-millis") {
- timestampMillisColumns.add(field.name())
- }
- }
- }
- }
- }
- } catch {
- case e: Exception =>
- logDebug(s"Failed to identify timestamp-millis columns from Avro schema: ${e.getMessage}")
- }
+ val timestampMillisColumns = HoodieFileIndex.getTimestampMillisColumns(rawAvroSchema)
columnStatsIndex.loadTransposed(queryReferencedColumns, shouldReadInMemory) { transposedColStatsDF =>
val indexSchema = transposedColStatsDF.schema
val indexFilter =
- queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexSchema, timestampMillisColumns.toSet))
+ queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexSchema, timestampMillisColumns))
.reduce(And)
val allIndexedFileNames =
@@ -472,6 +452,38 @@ object HoodieFileIndex extends Logging {
schema.fieldNames.filter { colName => refs.exists(r => resolver.apply(colName, r.name)) }
}
+ /**
+ * Identifies timestamp-millis columns from the Avro schema. These columns are excluded from
+ * column-stats filter translation (e.g. they may have been indexed before a fix and should
+ * not be used for filtering).
+ *
+ * @param avroSchema the table's Avro schema
+ * @return set of field names whose type is timestamp-millis or local-timestamp-millis
+ */
+ def getTimestampMillisColumns(avroSchema: org.apache.avro.Schema): Set[String] = {
+ val timestampMillisColumns = scala.collection.mutable.Set[String]()
+ try {
+ if (avroSchema.getType == org.apache.avro.Schema.Type.RECORD) {
+ avroSchema.getFields.asScala.foreach { field =>
+ val fieldSchema = AvroSchemaUtils.getNonNullTypeFromUnion(field.schema())
+ if (fieldSchema.getType == org.apache.avro.Schema.Type.LONG) {
+ val logicalType = fieldSchema.getLogicalType
+ if (logicalType != null) {
+ val logicalTypeName = logicalType.getName
+ if (logicalTypeName == "timestamp-millis" || logicalTypeName == "local-timestamp-millis") {
+ timestampMillisColumns.add(field.name())
+ }
+ }
+ }
+ }
+ }
+ } catch {
+ case e: Exception =>
+ logDebug(s"Failed to identify timestamp-millis columns from Avro schema: ${e.getMessage}")
+ }
+ timestampMillisColumns.toSet
+ }
+
def getConfigProperties(spark: SparkSession, options: Map[String, String]) = {
val sqlConf: SQLConf = spark.sessionState.conf
val properties = TypedProperties.fromMap(options.filter(p => p._2 != null).asJava)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index ee23fe85c7632..3790839e88755 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -19,6 +19,7 @@ package org.apache.hudi
import org.apache.avro.Schema
import org.apache.hadoop.fs.{GlobPattern, Path}
+import org.apache.hudi.avro.AvroSchemaUtils
import org.apache.hudi.DataSourceReadOptions.INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME
import org.apache.hudi.HoodieBaseRelation.isSchemaEvolutionEnabledOnRead
import org.apache.hudi.HoodieSparkConfUtils.getHollowCommitHandling
@@ -35,6 +36,7 @@ import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.{HoodieException, HoodieIncrementalPathNotFoundException}
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.utils.SerDeHelper
+import org.apache.hudi.io.storage.HoodieFileReader
import org.apache.hudi.table.HoodieSparkTable
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.rdd.RDD
@@ -203,10 +205,13 @@ class IncrementalRelation(val sqlContext: SQLContext,
sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, SerDeHelper.toJson(internalSchema))
sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, metaClient.getBasePath)
sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST, validCommits)
- // Pass table Avro schema to LegacyHoodieParquetFileFormat to preserve correct logical types
+ // Pass table Avro schema to hadoopConf so supportBatch() can find it (supportBatch does not receive options).
if (tableAvroSchema.getType != Schema.Type.NULL) {
LegacyHoodieParquetFileFormat.setTableAvroSchemaInConf(
sqlContext.sparkContext.hadoopConfiguration, tableAvroSchema)
+ sqlContext.sparkContext.hadoopConfiguration.set(
+ HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR,
+ AvroSchemaUtils.hasTimestampMillisField(tableAvroSchema).toString)
}
val formatClassName = metaClient.getTableConfig.getBaseFileFormat match {
case HoodieFileFormat.PARQUET => LegacyHoodieParquetFileFormat.FILE_FORMAT_ID
@@ -273,7 +278,12 @@ class IncrementalRelation(val sqlContext: SQLContext,
if (regularFileIdToFullPath.nonEmpty) {
try {
- df = df.union(sqlContext.read.options(sOpts)
+ val optionsWithSchema = if (tableAvroSchema.getType != Schema.Type.NULL) {
+ sOpts + (LegacyHoodieParquetFileFormat.HOODIE_TABLE_AVRO_SCHEMA -> tableAvroSchema.toString)
+ } else {
+ sOpts
+ }
+ df = df.union(sqlContext.read.options(optionsWithSchema)
.schema(usedSchema).format(formatClassName)
// Setting time to the END_INSTANT_TIME, to avoid pathFilter filter out files incorrectly.
.option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key(), endInstantTime)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/LegacyHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/LegacyHoodieParquetFileFormat.scala
index 5ecdc8ae476e5..4b9e0fc5c369d 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/LegacyHoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/LegacyHoodieParquetFileFormat.scala
@@ -25,7 +25,10 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
-import org.apache.spark.sql.execution.datasources.parquet.LegacyHoodieParquetFileFormat.FILE_FORMAT_ID
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.JavaConverters._
+import org.apache.spark.sql.execution.datasources.parquet.LegacyHoodieParquetFileFormat.{FILE_FORMAT_ID, HOODIE_TABLE_AVRO_SCHEMA}
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
@@ -41,10 +44,12 @@ class LegacyHoodieParquetFileFormat extends ParquetFileFormat with SparkAdapterS
/**
* Try to get table Avro schema from hadoopConf.
- * Callers (e.g., IncrementalRelation, HoodieBaseRelation) should set the schema
- * using LegacyHoodieParquetFileFormat.setTableAvroSchemaInConf() before reading.
+ * Callers (e.g., IncrementalRelation) should set the schema using
+ * LegacyHoodieParquetFileFormat.setTableAvroSchemaInConf() before reading so that
+ * supportBatch() can find it (supportBatch does not receive the relation's options).
+ * This is also used as a fallback in buildReaderWithPartitionValues when schema is not in options.
*
- * @return Some(schema) if found in hadoopConf, None otherwise (falls back to StructType conversion)
+ * @return Some(schema) if found in hadoopConf, None otherwise
*/
private def getTableAvroSchemaFromConf(hadoopConf: Configuration): Option[Schema] = {
val schemaStr = hadoopConf.get(LegacyHoodieParquetFileFormat.HOODIE_TABLE_AVRO_SCHEMA)
@@ -63,15 +68,29 @@ class LegacyHoodieParquetFileFormat extends ParquetFileFormat with SparkAdapterS
}
}
+ /**
+ * Returns whether batch/columnar read is supported.
+ *
+ * Where the second parameter `schema` comes from: Spark's FileSourceScanExec (or
+ * DataSourceScanExec) calls FileFormat.supportBatch(session, schema). The schema is typically
+ * the relation's read schema (HadoopFsRelation.dataSchema or the scan's required schema).
+ * During plan canonicalization (e.g. makeCopy), Spark can pass a schema derived from the
+ * plan's output attributes, which may have anonymized names (e.g. all "none").
+ *
+ * We must NOT use that schema for Avro conversion: it can be wrong and cause
+ * AvroRuntimeException (duplicate field "none"). We only use the table Avro schema from
+ * hadoopConf; if not set, we return false (disable batch).
+ */
override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
- // Try to get schema from hadoopConf (set by callers like IncrementalRelation)
- val avroSchema = getTableAvroSchemaFromConf(sparkSession.sessionState.newHadoopConf()).getOrElse {
- // Fallback to converting StructType to Avro schema
- AvroConversionUtils.convertStructTypeToAvroSchema(schema, schema.typeName)
+ getTableAvroSchemaFromConf(sparkSession.sessionState.newHadoopConf()) match {
+ case Some(avroSchema) =>
+ sparkAdapter
+ .createLegacyHoodieParquetFileFormat(true, avroSchema).get.supportBatch(sparkSession, schema)
+ case None =>
+ // Table Avro schema not in hadoopConf (supportBatch does not receive options).
+ // Do not use the passed-in schema for conversion - it can be wrong (canonicalized "none" names).
+ false
}
-
- sparkAdapter
- .createLegacyHoodieParquetFileFormat(true, avroSchema).get.supportBatch(sparkSession, schema)
}
override def buildReaderWithPartitionValues(sparkSession: SparkSession,
@@ -85,16 +104,43 @@ class LegacyHoodieParquetFileFormat extends ParquetFileFormat with SparkAdapterS
options.getOrElse(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key,
DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.defaultValue.toString).toBoolean
- // Try to get schema from hadoopConf (set by callers like IncrementalRelation)
- val avroSchema = getTableAvroSchemaFromConf(hadoopConf).getOrElse {
- // Fallback to converting StructType to Avro schema
- val fullTableSchema = StructType(dataSchema.fields ++ partitionSchema.fields)
- AvroConversionUtils.convertStructTypeToAvroSchema(fullTableSchema, dataSchema.typeName)
- }
+ val avroSchema = options.get(LegacyHoodieParquetFileFormat.HOODIE_TABLE_AVRO_SCHEMA)
+ .map(s => {
+ try {
+ Some(new Schema.Parser().parse(s))
+ } catch {
+ case e: Exception =>
+ logWarning(s"Failed to parse table Avro schema from options: ${e.getMessage}")
+ None
+ }
+ })
+ .flatten
+ .orElse(getTableAvroSchemaFromConf(hadoopConf))
+ .getOrElse {
+ val fullTableSchema = StructType(dataSchema.fields ++ partitionSchema.fields)
+ AvroConversionUtils.convertStructTypeToAvroSchema(fullTableSchema, dataSchema.typeName)
+ }
- sparkAdapter
+ val delegateReader = sparkAdapter
.createLegacyHoodieParquetFileFormat(shouldExtractPartitionValuesFromPartitionPath, avroSchema).get
.buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf)
+
+ val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
+ val useBatchPlan = supportBatch(sparkSession, resultSchema)
+ if (useBatchPlan) {
+ // Spark built a columnar plan and expects Iterator[ColumnarBatch]. Pass through as-is.
+ (file: PartitionedFile) => delegateReader(file)
+ } else {
+ // Spark built a row-based plan. The delegate may still return ColumnarBatch when it
+ // enables vectorized read. Convert any batch to rows so we always yield InternalRow.
+ (file: PartitionedFile) => {
+ val iter = delegateReader(file).asInstanceOf[Iterator[Any]]
+ iter.flatMap {
+ case r: InternalRow => Seq(r)
+ case b: ColumnarBatch => b.rowIterator().asScala
+ }
+ }
+ }
}
}
@@ -102,17 +148,15 @@ object LegacyHoodieParquetFileFormat {
val FILE_FORMAT_ID = "hoodie-parquet"
/**
- * Configuration key for passing table Avro schema through hadoopConf.
- * If set, this schema will be used instead of fetching from storage.
+ * Configuration key for passing table Avro schema.
+ * Schema can be passed through options map (buildReader) or hadoopConf (supportBatch + fallback).
+ * This preserves the correct logical types (e.g., timestampMillis vs timestampMicros).
*/
val HOODIE_TABLE_AVRO_SCHEMA = "hoodie.table.avro.schema"
/**
* Helper method to set table Avro schema in hadoopConf.
- * This allows callers to pass the schema through hadoopConf to avoid fetching from storage.
- *
- * @param hadoopConf The Hadoop configuration to set the schema in
- * @param avroSchema The Avro schema to set
+ * Required for supportBatch() which only sees hadoopConf, not the relation's options.
*/
def setTableAvroSchemaInConf(hadoopConf: Configuration, avroSchema: Schema): Unit = {
hadoopConf.set(HOODIE_TABLE_AVRO_SCHEMA, avroSchema.toString)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
index d0cff9650eedb..e597d552337df 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
@@ -854,16 +854,10 @@ class TestAvroSchemaResolutionSupport extends HoodieClientTestBase with ScalaAss
upsertData(df2, tempRecordPath, tableType.equals("COPY_ON_WRITE"))
// after implicit type change, read the table with vectorized read enabled
- if (HoodieSparkUtils.gteqSpark3_3) {
- assertThrows(classOf[SparkException]) {
- withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" -> "true") {
- readTable(tempRecordPath)
- }
- }
- } else {
- withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" -> "true") {
- readTable(tempRecordPath)
- }
+ // The supportBatch override in Spark33/34LegacyHoodieParquetFileFormat ensures that
+ // nested types use row-based reading instead of columnar batches, preventing ClassCastException
+ withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" -> "true") {
+ readTable(tempRecordPath)
}
withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" -> "false") {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
index 803702addb489..a71067ec344a2 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
@@ -43,13 +43,15 @@ import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.TimestampType
import org.apache.hudi.metadata.HoodieTableMetadata
import org.apache.hudi.testutils.HoodieSparkClientTestBase
import org.apache.hudi.util.JFunction
+
+import org.apache.avro.Schema
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, GreaterThanOrEqual, LessThan, Literal}
import org.apache.spark.sql.execution.datasources.{NoopCache, PartitionDirectory}
import org.apache.spark.sql.functions.{lit, struct}
import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
import org.apache.spark.sql.types.{IntegerType, StringType}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.{BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, CsvSource, MethodSource, ValueSource}
@@ -788,6 +790,52 @@ class TestHoodieFileIndex extends HoodieSparkClientTestBase with ScalaAssertionS
}
}
+ @Test
+ def testGetTimestampMillisColumns(): Unit = {
+ if (HoodieSparkUtils.gteqSpark3_4) {
+ // RECORD with timestamp-millis and local-timestamp-millis -> both returned
+ val recordWithTimestampMillisSchema = new Schema.Parser().parse(
+ """
+ |{
+ | "type": "record",
+ | "name": "TestRecord",
+ | "fields": [
+ | {"name": "ts_millis", "type": {"type": "long", "logicalType": "timestamp-millis"}},
+ | {"name": "ts_local_millis", "type": {"type": "long", "logicalType": "local-timestamp-millis"}},
+ | {"name": "plain_long", "type": "long"},
+ | {"name": "name", "type": "string"}
+ | ]
+ |}
+ |""".stripMargin)
+ val resultWithTimestampMillis = HoodieFileIndex.getTimestampMillisColumns(recordWithTimestampMillisSchema)
+ assertEquals(2, resultWithTimestampMillis.size)
+ assertTrue(resultWithTimestampMillis.contains("ts_millis"))
+ assertTrue(resultWithTimestampMillis.contains("ts_local_millis"))
+ assertFalse(resultWithTimestampMillis.contains("plain_long"))
+ assertFalse(resultWithTimestampMillis.contains("name"))
+
+ // RECORD with only plain long and string -> empty set
+ val recordWithoutTimestampMillisSchema = new Schema.Parser().parse(
+ """
+ |{
+ | "type": "record",
+ | "name": "PlainRecord",
+ | "fields": [
+ | {"name": "id", "type": "long"},
+ | {"name": "name", "type": "string"}
+ | ]
+ |}
+ |""".stripMargin)
+ val resultPlain = HoodieFileIndex.getTimestampMillisColumns(recordWithoutTimestampMillisSchema)
+ assertTrue(resultPlain.isEmpty)
+
+ // Non-RECORD schema -> empty set
+ val stringSchema = Schema.create(Schema.Type.STRING)
+ val resultNonRecord = HoodieFileIndex.getTimestampMillisColumns(stringSchema)
+ assertTrue(resultNonRecord.isEmpty)
+ }
+ }
+
private def attribute(partition: String): AttributeReference = {
AttributeReference(partition, StringType, true)()
}
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
index 47f2ce46c7bf9..77d77322d16d1 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
@@ -87,7 +87,8 @@ class Spark2Adapter extends SparkAdapter {
dataType.getClass.getSimpleName.startsWith("TimestampNTZType")
}
- override def getParquetReadSupport(messageScheme: org.apache.hudi.common.util.Option[MessageType]):
+ override def getParquetReadSupport(conf: Configuration,
+ messageScheme: org.apache.hudi.common.util.Option[MessageType]):
org.apache.parquet.hadoop.api.ReadSupport[_] = {
// ParquetReadSupport is package-private in Spark 2.4, so we use reflection to instantiate it
val clazz = Class.forName("org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport")
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
index 17de9e00fea45..502ba9d477cbe 100644
--- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
@@ -114,7 +114,8 @@ abstract class BaseSpark3Adapter extends SparkAdapter with Logging {
fileSchema
}
- override def getParquetReadSupport(messageSchema: org.apache.hudi.common.util.Option[MessageType]): ParquetReadSupport = {
+ override def getParquetReadSupport(conf: Configuration,
+ messageSchema: org.apache.hudi.common.util.Option[MessageType]): ParquetReadSupport = {
new ParquetReadSupport()
}
diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala
index add53e12e2493..e42536735ce93 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala
@@ -64,6 +64,13 @@ import java.net.URI
*/
class Spark33LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
+ override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
+ val conf = sparkSession.sessionState.conf
+ conf.parquetVectorizedReaderEnabled &&
+ schema.forall(_.dataType.isInstanceOf[AtomicType]) &&
+ ParquetUtils.isBatchReadSupportedForSchema(conf, schema)
+ }
+
override def buildReaderWithPartitionValues(sparkSession: SparkSession,
dataSchema: StructType,
partitionSchema: StructType,
@@ -122,7 +129,8 @@ class Spark33LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
val enableVectorizedReader: Boolean =
sqlConf.parquetVectorizedReaderEnabled &&
- resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
+ resultSchema.forall(_.dataType.isInstanceOf[AtomicType]) &&
+ ParquetUtils.isBatchReadSupportedForSchema(sqlConf, resultSchema)
val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
val capacity = sqlConf.parquetVectorizedReaderBatchSize
diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
index b7f1e69d60888..76ed34e25d236 100644
--- a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.adapter
import org.apache.hudi.avro.AvroSchemaUtils
import org.apache.hudi.{Spark34HoodieFileScanRDD, SparkAdapterSupport$}
-import org.apache.hudi.io.storage.HoodieSparkParquetReader
+import org.apache.hudi.io.storage.HoodieFileReader
import org.apache.avro.Schema
import org.apache.hadoop.conf.Configuration
@@ -81,11 +81,14 @@ class Spark3_4Adapter extends BaseSpark3Adapter {
dataType == DataTypes.TimestampNTZType
}
- override def getParquetReadSupport(messageSchema: org.apache.hudi.common.util.Option[MessageType]): ParquetReadSupport = {
+ override def getParquetReadSupport(conf: Configuration,
+ messageSchema: org.apache.hudi.common.util.Option[MessageType]): ParquetReadSupport = {
+ val enableTimestampFieldRepair = conf.getBoolean(
+ HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, true)
new HoodieParquetReadSupport(
Option.empty[ZoneId],
enableVectorizedReader = true,
- enableTimestampFieldRepair = true,
+ enableTimestampFieldRepair,
RebaseDateTime.RebaseSpec(LegacyBehaviorPolicy.withName("CORRECTED")),
RebaseDateTime.RebaseSpec(LegacyBehaviorPolicy.withName("LEGACY")),
messageSchema
@@ -166,7 +169,7 @@ class Spark3_4Adapter extends BaseSpark3Adapter {
val nonNullRequestedSchema = AvroSchemaUtils.getNonNullTypeFromUnion(requestedSchema)
val cachedRequestedSchema = HoodieInternalRowUtils.getCachedSchema(nonNullRequestedSchema)
val requestedSchemaInMessageType = org.apache.hudi.common.util.Option.of(getAvroSchemaConverter(conf).convert(nonNullRequestedSchema))
- val enableTimestampFieldRepair = conf.getBoolean(HoodieSparkParquetReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, true)
+ val enableTimestampFieldRepair = conf.getBoolean(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR, true)
val repairedRequestedSchema = repairSchemaIfSpecified(enableTimestampFieldRepair, fileSchema, requestedSchemaInMessageType)
val repairedRequestedStructType = new ParquetToSparkSchemaConverter(conf).convert(repairedRequestedSchema)
val evolution = new SparkBasicSchemaEvolution(repairedRequestedStructType, cachedRequestedSchema, SQLConf.get.sessionLocalTimeZone)
diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
index fd8e52ea1e92c..6beb33e57fc65 100644
--- a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
@@ -31,7 +31,6 @@ import org.apache.hudi.common.util.collection.Pair
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.action.InternalSchemaMerger
import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
-import org.apache.hudi.io.storage.HoodieSparkParquetReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR
import org.apache.hudi.SparkAdapterSupport.sparkAdapter
import org.apache.hudi.common.table.ParquetTableSchemaResolver
import org.apache.parquet.filter2.compat.FilterCompat
@@ -151,8 +150,6 @@ class Spark34LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
pruneInternalSchema(internalSchemaStr, requiredSchema)
hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, prunedInternalSchemaStr)
}
- hadoopConf.set(ENABLE_LOGICAL_TIMESTAMP_REPAIR, hasTimestampMillisFieldInTableSchema.toString)
-
val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestService.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestService.java
index e22fefa436803..4f7ce62baa922 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestService.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestService.java
@@ -359,25 +359,25 @@ private HiveMetaStore.HMSHandler createHMSHandler(HiveConf conf) throws IOExcept
throw new IOException("Failed to create HMSHandler using (String, Configuration, boolean) constructor", e);
}
- // Try Hive 3.x constructor with HiveConf (2 parameters: String, HiveConf)
+ // Try Hive 2.x constructor (3 parameters: String, HiveConf, boolean)
try {
- Constructor> constructor = hmsHandlerClass.getConstructor(String.class, HiveConf.class);
- return (HiveMetaStore.HMSHandler) constructor.newInstance(handlerName, conf);
+ Constructor> constructor = hmsHandlerClass.getConstructor(String.class, HiveConf.class, boolean.class);
+ return (HiveMetaStore.HMSHandler) constructor.newInstance(handlerName, conf, false);
} catch (NoSuchMethodException e) {
// Continue to next option
} catch (Exception e) {
- throw new IOException("Failed to create HMSHandler using (String, HiveConf) constructor", e);
+ throw new IOException("Failed to create HMSHandler using (String, HiveConf, boolean) constructor", e);
}
- // Try Hive 2.x constructor (3 parameters: String, HiveConf, boolean)
+ // Try Hive 2.x constructor with HiveConf (2 parameters: String, HiveConf)
try {
- Constructor> constructor = hmsHandlerClass.getConstructor(String.class, HiveConf.class, boolean.class);
- return (HiveMetaStore.HMSHandler) constructor.newInstance(handlerName, conf, false);
+ Constructor> constructor = hmsHandlerClass.getConstructor(String.class, HiveConf.class);
+ return (HiveMetaStore.HMSHandler) constructor.newInstance(handlerName, conf);
} catch (NoSuchMethodException e) {
throw new IOException("Failed to create HMSHandler. No compatible constructor found. "
+ "Available constructors: " + java.util.Arrays.toString(hmsHandlerClass.getConstructors()), e);
} catch (Exception e) {
- throw new IOException("Failed to create HMSHandler using (String, HiveConf, boolean) constructor", e);
+ throw new IOException("Failed to create HMSHandler using (String, HiveConf) constructor", e);
}
}
diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml
index cb28e3a3ec264..6375b0c985fcd 100644
--- a/hudi-utilities/pom.xml
+++ b/hudi-utilities/pom.xml
@@ -542,11 +542,5 @@
${thrift.version}
test
-
- org.apache.hudi
- hudi-spark-common_2.12
- 0.14.2-SNAPSHOT
- test
-
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/ChainedSchemaPostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/ChainedSchemaPostProcessor.java
index b5cb01d72a69a..0295e80bed8be 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/ChainedSchemaPostProcessor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/ChainedSchemaPostProcessor.java
@@ -48,6 +48,6 @@ public Schema processSchema(Schema schema) {
for (SchemaPostProcessor processor : processors) {
targetSchema = processor.processSchema(targetSchema);
}
- return schema;
+ return targetSchema;
}
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 7b21ef0880239..56cf5fa128e3a 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -838,7 +838,7 @@ void testCOWLogicalRepair(String tableVersion, String recordType, String operati
sparkSession.conf().set("spark.sql.session.timeZone", "UTC");
sparkSession.conf().set("spark.sql.parquet.enableVectorizedReader", "false");
Dataset df = sparkSession.read().format("hudi").load(tableBasePath);
- assertDataframe(df, 16, 16);
+ assertDataframe(df, 15, 15, true);
if ("CLUSTER".equals(operation)) {
// after we cluster, the raw parquet should be correct
@@ -859,7 +859,7 @@ void testCOWLogicalRepair(String tableVersion, String recordType, String operati
// Read raw parquet files
Dataset rawParquetDf = sparkSession.read().parquet(baseFilePaths.toArray(new String[0]));
- assertDataframe(rawParquetDf, 15, 15);
+ assertDataframe(rawParquetDf, 15, 15, false);
}
} finally {
sparkSession.conf().set("spark.sql.session.timeZone", prevTimezone);
@@ -958,7 +958,7 @@ void testMORLogicalRepair(String tableVersion, String recordType, String operati
sparkSession.conf().set("spark.sql.session.timeZone", "UTC");
Dataset df = sparkSession.read().format("hudi").load(tableBasePath);
- assertDataframe(df, 12, 14);
+ assertDataframe(df, 12, 14, false);
metaClient = HoodieTableMetaClient.builder()
.setConf(hadoopConf)
@@ -978,7 +978,7 @@ void testMORLogicalRepair(String tableVersion, String recordType, String operati
// Read raw parquet files
Dataset rawParquetDf = sparkSession.read().parquet(baseFilePaths.toArray(new String[0]));
- assertDataframe(rawParquetDf, 12, 14);
+ assertDataframe(rawParquetDf, 12, 14, false);
} else if ("COMPACT".equals(operation)) {
// after compaction some files should be ok
// Validate raw parquet files
@@ -994,7 +994,7 @@ void testMORLogicalRepair(String tableVersion, String recordType, String operati
// only read the compacted ones, the others are still incorrect
.filter(path -> path.contains(latestInstant.get().getTimestamp()))
.toArray(String[]::new));
- assertDataframe(rawParquetDf, 8, 8);
+ assertDataframe(rawParquetDf, 8, 8, false);
}
} finally {
sparkSession.conf().set("spark.sql.session.timeZone", prevTimezone);
@@ -1006,23 +1006,24 @@ void testMORLogicalRepair(String tableVersion, String recordType, String operati
});
}
- public static void assertDataframe(Dataset df, int above, int below) {
+ public static void assertDataframe(Dataset df, int above, int below, boolean isCOWSnapshotQuery) {
List rows = df.collectAsList();
assertEquals(above + below, rows.size());
+ int indexAdjustment = isCOWSnapshotQuery ? 0 : 1;
for (Row row : rows) {
String val = row.getString(6);
int hash = val.hashCode();
if ((hash & 1) == 0) {
- assertEquals("2020-01-01T00:00:00.001Z", row.getTimestamp(15).toInstant().toString());
- assertEquals("2020-06-01T12:00:00.000001Z", row.getTimestamp(16).toInstant().toString());
- assertEquals("2015-05-20T12:34:56.001", row.get(17).toString());
- assertEquals("2017-07-07T07:07:07.000001", row.get(18).toString());
+ assertEquals("2020-01-01T00:00:00.001Z", row.getTimestamp(14 + indexAdjustment).toInstant().toString());
+ assertEquals("2020-06-01T12:00:00.000001Z", row.getTimestamp(15 + indexAdjustment).toInstant().toString());
+ assertEquals("2015-05-20T12:34:56.001", row.get(16 + indexAdjustment).toString());
+ assertEquals("2017-07-07T07:07:07.000001", row.get(17 + indexAdjustment).toString());
} else {
- assertEquals("2019-12-31T23:59:59.999Z", row.getTimestamp(15).toInstant().toString());
- assertEquals("2020-06-01T11:59:59.999999Z", row.getTimestamp(16).toInstant().toString());
- assertEquals("2015-05-20T12:34:55.999", row.get(17).toString());
- assertEquals("2017-07-07T07:07:06.999999", row.get(18).toString());
+ assertEquals("2019-12-31T23:59:59.999Z", row.getTimestamp(14 + indexAdjustment).toInstant().toString());
+ assertEquals("2020-06-01T11:59:59.999999Z", row.getTimestamp(15 + indexAdjustment).toInstant().toString());
+ assertEquals("2015-05-20T12:34:55.999", row.get(16 + indexAdjustment).toString());
+ assertEquals("2017-07-07T07:07:06.999999", row.get(17 + indexAdjustment).toString());
}
}
diff --git a/pom.xml b/pom.xml
index 9f6e1eef7bb8a..c83d55f1b8dd9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2348,6 +2348,8 @@
hudi-spark2-common
2.0.0
+
+ 2.6.0
1.10.1
1.6.0
1.8.2