Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
19796f1
Pass schema from option instead of global configuration for thread sa…
linliu-code Jan 28, 2026
35a2995
Address partial comments
linliu-code Jan 28, 2026
6b97dce
Address more comments
linliu-code Jan 29, 2026
5d472f2
changing code parts around HoodieStorage (introduced in 0.15.0.) to u…
Feb 5, 2026
eee51be
Addressed comments
linliu-code Jan 30, 2026
ba8c90e
address wiring comments
linliu-code Jan 30, 2026
435bac2
Fix hive related tests
Feb 2, 2026
7be4046
Fix tests
Feb 3, 2026
94526f4
Fix tests
Feb 4, 2026
d3b7897
Fix tests
Feb 4, 2026
ac14313
Fix test and address review comments
Feb 4, 2026
bd20879
Cherry pick bug fixes
Feb 5, 2026
94dff70
Fix build
Feb 4, 2026
d17b0e8
Fix build
Feb 5, 2026
808d4a4
Revert "Fix tests"
Feb 6, 2026
4f3563a
Revert "Fix build"
Feb 6, 2026
4e5ed98
Fix build issues - 2.11 depending on 2.12 version
Feb 6, 2026
cf421c8
Fix build issues - Column stream explicty collection.2
Feb 6, 2026
2f0ae3b
Fix build issues - Use projection schema instead of repairFileSchema
Feb 6, 2026
a10b077
Fix build issues - remove spark3.5 profiles from github workflow bot
Feb 7, 2026
ac619b6
Fix build issues - Fix IT test pom.xml update
Feb 7, 2026
0f39b53
Fix build issues - Use projection schema fields from repaired schema
Feb 8, 2026
e2af120
Fix build issues - throw exception only for spark 3.4 gt
Feb 8, 2026
7508f66
Fix build issues - throw exception only for spark 3.3
Feb 8, 2026
6a7a5fb
Fix build issues - remove exceptions
Feb 9, 2026
6bea797
Fix build issues - remove exceptions for spark33
Feb 9, 2026
281228a
Fix build issues - remove exceptions for spark33
Feb 9, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 0 additions & 104 deletions .github/workflows/bot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -264,110 +264,6 @@ jobs:
export PATH="$JAVA_HOME/bin:$PATH"
mvn test -Pfunctional-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -Dtest=skipJavaTests -DfailIfNoTests=false -pl "$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS

test-spark-java11-17-java-tests:
runs-on: ubuntu-latest
strategy:
matrix:
include:
- scalaProfile: "scala-2.12"
sparkProfile: "spark3.5"
sparkModules: "hudi-spark-datasource/hudi-spark3.5.x"
- scalaProfile: "scala-2.13"
sparkProfile: "spark3.5"
sparkModules: "hudi-spark-datasource/hudi-spark3.5.x"

steps:
- uses: actions/checkout@v3
- name: Set up JDK 11
uses: actions/setup-java@v3
with:
java-version: '11'
distribution: 'temurin'
architecture: x64
cache: maven
- name: Build Project
env:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
run:
mvn clean install -T 2 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -DskipTests=true $MVN_ARGS -am -pl "hudi-examples/hudi-examples-spark,hudi-common,$SPARK_COMMON_MODULES,$SPARK_MODULES"
- name: Set up JDK 17
uses: actions/setup-java@v3
with:
java-version: '17'
distribution: 'temurin'
architecture: x64
cache: maven
- name: Quickstart Test
env:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
run:
mvn test -Punit-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -DwildcardSuites=skipScalaTests -DfailIfNoTests=false -pl hudi-examples/hudi-examples-spark $MVN_ARGS
- name: Java UT - Common & Spark
env:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
SPARK_MODULES: ${{ matrix.sparkModules }}
run:
mvn test -Punit-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -DwildcardSuites=skipScalaTests -DfailIfNoTests=false -pl "hudi-common,$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS
- name: Java FT - Spark
env:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
SPARK_MODULES: ${{ matrix.sparkModules }}
run:
mvn test -Pfunctional-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -pl "$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS

test-spark-java11-17-scala-tests:
runs-on: ubuntu-latest
strategy:
matrix:
include:
- scalaProfile: "scala-2.12"
sparkProfile: "spark3.5"
sparkModules: "hudi-spark-datasource/hudi-spark3.5.x"
- scalaProfile: "scala-2.13"
sparkProfile: "spark3.5"
sparkModules: "hudi-spark-datasource/hudi-spark3.5.x"

steps:
- uses: actions/checkout@v3
- name: Set up JDK 11
uses: actions/setup-java@v3
with:
java-version: '11'
distribution: 'temurin'
architecture: x64
cache: maven
- name: Build Project
env:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
run:
mvn clean install -T 2 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -DskipTests=true $MVN_ARGS -am -pl "hudi-examples/hudi-examples-spark,hudi-common,$SPARK_COMMON_MODULES,$SPARK_MODULES"
- name: Set up JDK 17
uses: actions/setup-java@v3
with:
java-version: '17'
distribution: 'temurin'
architecture: x64
cache: maven
- name: Scala UT - Common & Spark
env:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
SPARK_MODULES: ${{ matrix.sparkModules }}
run:
mvn test -Punit-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -Dtest=skipJavaTests -DfailIfNoTests=false -pl "hudi-common,$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS
- name: Scala FT - Spark
env:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
SPARK_MODULES: ${{ matrix.sparkModules }}
run:
mvn test -Pfunctional-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -Dtest=skipJavaTests -DfailIfNoTests=false -pl "$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS

test-flink:
runs-on: ubuntu-latest
strategy:
Expand Down
7 changes: 7 additions & 0 deletions hudi-cli/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,13 @@
</exclusions>
</dependency>

<!-- Validation API - override old 1.1.0 version from transitive dependencies -->
<dependency>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
<version>2.0.1.Final</version>
</dependency>

<!-- Scala -->
<dependency>
<groupId>org.scala-lang</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ public class HoodieSparkParquetReader implements HoodieSparkFileReader {
private final Configuration conf;
private final BaseFileUtils parquetUtils;
private List<ParquetReaderIterator> readerIterators = new ArrayList<>();
public static final String ENABLE_LOGICAL_TIMESTAMP_REPAIR = "spark.hudi.logicalTimestampField.repair.enable";
private Option<MessageType> fileSchemaOption = Option.empty();
private Option<StructType> structTypeOption = Option.empty();
private Option<Schema> schemaOption = Option.empty();
Expand All @@ -73,7 +72,7 @@ public HoodieSparkParquetReader(Configuration conf, Path path) {
this.path = path;
this.conf = new Configuration(conf);
// Avoid adding record in list element when convert parquet schema to avro schema
conf.set(ADD_LIST_ELEMENT_RECORDS, "false");
this.conf.set(ADD_LIST_ELEMENT_RECORDS, "false");
this.parquetUtils = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET);
}

Expand Down Expand Up @@ -126,6 +125,10 @@ private ClosableIterator<InternalRow> getInternalRowIterator(Schema readerSchema
if (requestedSchema == null) {
requestedSchema = readerSchema;
}
// Set configuration for timestamp_millis type repair (only when not already set).
if (conf.get(HoodieFileReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR) == null) {
conf.set(ENABLE_LOGICAL_TIMESTAMP_REPAIR, Boolean.toString(AvroSchemaUtils.hasTimestampMillisField(readerSchema)));
}
MessageType fileSchema = getFileSchema();
Schema nonNullSchema = AvroSchemaUtils.getNonNullTypeFromUnion(requestedSchema);
Option<MessageType> messageSchema = Option.of(getAvroSchemaConverter(conf).convert(nonNullSchema));
Expand All @@ -136,8 +139,9 @@ private ClosableIterator<InternalRow> getInternalRowIterator(Schema readerSchema
conf.set(SQLConf.PARQUET_BINARY_AS_STRING().key(), SQLConf.get().getConf(SQLConf.PARQUET_BINARY_AS_STRING()).toString());
conf.set(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), SQLConf.get().getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP()).toString());
ParquetReader<InternalRow> reader = ParquetReader.<InternalRow>builder(
(ReadSupport) SparkAdapterSupport$.MODULE$.sparkAdapter().getParquetReadSupport(messageSchema),
new Path(path.toUri())).withConf(conf)
(ReadSupport) SparkAdapterSupport$.MODULE$.sparkAdapter().getParquetReadSupport(conf, messageSchema),
path)
.withConf(conf)
.build();
ParquetReaderIterator<InternalRow> parquetReaderIterator = new ParquetReaderIterator<>(reader);
readerIterators.add(parquetReaderIterator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ trait SparkAdapter extends Serializable {

def isTimestampNTZType(dataType: DataType): Boolean

def getParquetReadSupport(messageSchema: org.apache.hudi.common.util.Option[MessageType]): org.apache.parquet.hadoop.api.ReadSupport[_]
def getParquetReadSupport(conf: Configuration,
messageSchema: org.apache.hudi.common.util.Option[MessageType]):
org.apache.parquet.hadoop.api.ReadSupport[_]

def repairSchemaIfSpecified(shouldRepair: Boolean,
fileSchema: MessageType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ public class TestExternalPathHandling extends HoodieClientTestBase {

private static final String FIELD_1 = "field1";
private static final String FIELD_2 = "field2";
private static final String TEST_SCHEMA = "{\"type\":\"record\",\"name\":\"TestRecord\",\"namespace\":\"org.apache.hudi.test\","
+ "\"fields\":[{\"name\":\"field1\",\"type\":\"int\"},{\"name\":\"field2\",\"type\":[\"null\",\"string\"],\"default\":null}]}";
private HoodieWriteConfig writeConfig;

@ParameterizedTest
Expand All @@ -89,6 +91,7 @@ public void testFlow(FileIdAndNameGenerator fileIdAndNameGenerator, List<String>
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(INMEMORY).build())
.withPath(metaClient.getBasePathV2().toString())
.withEmbeddedTimelineServerEnabled(false)
.withSchema(TEST_SCHEMA)
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
.withMaxNumDeltaCommitsBeforeCompaction(2)
.enable(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.List;

public class AvroSchemaRepair {
public static boolean isLocalTimestampSupported = isLocalTimestampMillisSupported();

public static Schema repairLogicalTypes(Schema fileSchema, Schema tableSchema) {
Schema repairedSchema = repairAvroSchema(fileSchema, tableSchema);
Expand Down Expand Up @@ -242,18 +241,4 @@ public static boolean hasTimestampMillisField(Schema tableSchema) {
&& (tableSchema.getLogicalType() instanceof LogicalTypes.TimestampMillis || tableSchema.getLogicalType() instanceof LogicalTypes.LocalTimestampMillis);
}
}

/**
* Check if LogicalTypes.LocalTimestampMillis is supported in the current Avro version
*
* @return true if LocalTimestampMillis is available, false otherwise
*/
public static boolean isLocalTimestampMillisSupported() {
try {
return Arrays.stream(LogicalTypes.class.getDeclaredClasses())
.anyMatch(c -> c.getSimpleName().equals("LocalTimestampMillis"));
} catch (Exception e) {
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -479,11 +479,6 @@ protected boolean shouldReadAsPartitionedTable() {
return (partitionColumns.length > 0 && canParsePartitionValues()) || HoodieTableMetadata.isMetadataTable(basePath.toString());
}

protected PartitionPath convertToPartitionPath(String partitionPath) {
Object[] partitionColumnValues = parsePartitionColumnValues(partitionColumns, partitionPath);
return new PartitionPath(partitionPath, partitionColumnValues);
}

private static long fileSliceSize(FileSlice fileSlice) {
long logFileSize = fileSlice.getLogFiles().map(HoodieLogFile::getFileSize)
.filter(s -> s > 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
*/
public class AvroSchemaCache {


// Ensure that there is only one variable instance of the same schema within an entire JVM lifetime
private static final LoadingCache<Schema, Schema> SCHEMA_CACHE = Caffeine.newBuilder().weakValues().maximumSize(1024).build(k -> k);

Expand All @@ -43,5 +42,4 @@ public class AvroSchemaCache {
public static Schema intern(Schema schema) {
return SCHEMA_CACHE.get(schema);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@
*/
public class AvroSchemaUtils {

// Handle to org.apache.parquet.schema.AvroSchemaRepair, resolved once at class load.
// Stays null when that class is not on the classpath.
private static final Class<?> AVRO_SCHEMA_REPAIR_CLASS;

static {
  Class<?> repairClass;
  try {
    repairClass = Class.forName("org.apache.parquet.schema.AvroSchemaRepair");
  } catch (ClassNotFoundException ignored) {
    // The repair class is looked up by name only; when it is missing we record
    // null instead of failing class initialization.
    repairClass = null;
  }
  AVRO_SCHEMA_REPAIR_CLASS = repairClass;
}

private AvroSchemaUtils() {}

/**
Expand Down Expand Up @@ -238,10 +249,6 @@ public static Option<Schema> findNestedFieldSchema(Schema schema, String fieldNa
return Option.of(getNonNullTypeFromUnion(schema));
}

public static Option<Schema.Type> findNestedFieldType(Schema schema, String fieldName) {
return findNestedFieldSchema(schema, fieldName).map(Schema::getType);
}

/**
* Appends provided new fields at the end of the given schema
*
Expand Down Expand Up @@ -399,13 +406,32 @@ public static void checkSchemaCompatible(
}

public static Schema getRepairedSchema(Schema writerSchema, Schema readerSchema) {
if (AVRO_SCHEMA_REPAIR_CLASS == null) {
return writerSchema;
}
try {
Class<?> avroSchemaRepairClass = Class.forName("org.apache.parquet.schema.AvroSchemaRepair");
Method repairMethod = avroSchemaRepairClass.getMethod("repairLogicalTypes", Schema.class, Schema.class);
Method repairMethod =
AVRO_SCHEMA_REPAIR_CLASS.getMethod("repairLogicalTypes", Schema.class, Schema.class);
return (Schema) repairMethod.invoke(null, writerSchema, readerSchema);
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
// Fallback if class/method not available
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
return writerSchema;
}
}

/**
 * Reports whether the given Avro schema contains a timestamp-millis logical type, by
 * reflectively delegating to the static {@code hasTimestampMillisField(Schema)} method
 * of {@code org.apache.parquet.schema.AvroSchemaRepair}.
 * Used to decide whether to enable logical timestamp field repair when reading log blocks.
 *
 * @param schema the Avro schema to inspect; may be {@code null}
 * @return the delegate's answer, or {@code false} when {@code schema} is {@code null},
 *         the repair class was not found on the classpath, or the reflective call fails
 */
public static boolean hasTimestampMillisField(Schema schema) {
  if (schema == null) {
    return false;
  }
  if (AVRO_SCHEMA_REPAIR_CLASS == null) {
    return false;
  }
  try {
    Method detector = AVRO_SCHEMA_REPAIR_CLASS.getMethod("hasTimestampMillisField", Schema.class);
    Object outcome = detector.invoke(null, schema);
    return (Boolean) outcome;
  } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
    // Any reflection failure is treated as "no such field".
    return false;
  }
}
}
37 changes: 19 additions & 18 deletions hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,19 @@ public class HoodieAvroUtils {
private static final Pattern INVALID_AVRO_FIRST_CHAR_IN_NAMES_PATTERN = Pattern.compile("[^A-Za-z_]");
private static final String MASK_FOR_INVALID_CHARS_IN_NAMES = "__";

// Cached Avro logical type classes (null if not available, e.g., Avro 1.8.2).
// Loaded once at class init to avoid repeated Class.forName() and reflection cost per record.
private static final Class<?> LOCAL_TIMESTAMP_MILLIS_CLASS = loadClassSafe("org.apache.avro.LogicalTypes$LocalTimestampMillis");
private static final Class<?> LOCAL_TIMESTAMP_MICROS_CLASS = loadClassSafe("org.apache.avro.LogicalTypes$LocalTimestampMicros");

/**
 * Looks up a class by its fully qualified name without propagating a lookup failure.
 *
 * @param name fully qualified (binary) class name to resolve
 * @return the resolved class, or {@code null} when no class with that name is found
 */
private static Class<?> loadClassSafe(String name) {
  Class<?> resolved = null;
  try {
    resolved = Class.forName(name);
  } catch (ClassNotFoundException ignored) {
    // Absence is an expected outcome for callers; signal it with null.
  }
  return resolved;
}

// All metadata fields are optional strings.
public static final Schema METADATA_FIELD_SCHEMA = createNullableSchema(Schema.Type.STRING);

Expand Down Expand Up @@ -1406,34 +1419,22 @@ public static Comparable<?> unwrapAvroValueWrapper(Object avroValueWrapper) {
* Checks if a logical type is an instance of LocalTimestampMillis using reflection.
* Returns false if the class doesn't exist (e.g., in Avro 1.8.2).
*/
private static boolean isLocalTimestampMillis(LogicalType logicalType) {
if (logicalType == null) {
return false;
}
try {
Class<?> localTimestampMillisClass = Class.forName("org.apache.avro.LogicalTypes$LocalTimestampMillis");
return localTimestampMillisClass.isInstance(logicalType);
} catch (ClassNotFoundException e) {
// Class doesn't exist (e.g., Avro 1.8.2)
public static boolean isLocalTimestampMillis(LogicalType logicalType) {
if (logicalType == null || LOCAL_TIMESTAMP_MILLIS_CLASS == null) {
return false;
}
return LOCAL_TIMESTAMP_MILLIS_CLASS.isInstance(logicalType);
}

/**
* Checks if a logical type is an instance of LocalTimestampMicros using reflection.
* Returns false if the class doesn't exist (e.g., in Avro 1.8.2).
*/
private static boolean isLocalTimestampMicros(LogicalType logicalType) {
if (logicalType == null) {
return false;
}
try {
Class<?> localTimestampMicrosClass = Class.forName("org.apache.avro.LogicalTypes$LocalTimestampMicros");
return localTimestampMicrosClass.isInstance(logicalType);
} catch (ClassNotFoundException e) {
// Class doesn't exist (e.g., Avro 1.8.2)
public static boolean isLocalTimestampMicros(LogicalType logicalType) {
if (logicalType == null || LOCAL_TIMESTAMP_MICROS_CLASS == null) {
return false;
}
return LOCAL_TIMESTAMP_MICROS_CLASS.isInstance(logicalType);
}

private static Object convertDefaultValueForAvroCompatibility(Object defaultValue) {
Expand Down
Loading