Fix reading ordering field with logical type on Spark 4.1 #18606

@yihua

Bug Description

What happened:
The following two tests fail on Spark 4.1:

- Test Different Type of PreCombineField *** FAILED ***
  org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3314.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3314.0 (TID 7481) (runnervmeorf1.ak4m3v4snh5uzga43viiloyuah.ex.internal.cloudapp.net executor driver): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:365)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$e664f7e$1(BaseSparkCommitActionExecutor.java:298)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:103)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:103)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:918)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:918)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:388)
	at org.apache.spark.storage.BlockManager.$anonfun$getOrElseUpdate$1(BlockManager.scala:1412)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1676)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1585)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1650)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1429)
	at org.apache.spark.storage.BlockManager.getOrElseUpdateRDDBlock(BlockManager.scala:1383)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:386)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:336)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:107)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:180)
	at org.apache.spark.scheduler.Task.run(Task.scala:147)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:716)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:86)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:83)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:97)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:719)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.ClassCastException: class java.lang.Long cannot be cast to class java.time.Instant (java.lang.Long and java.time.Instant are in module java.base of loader 'bootstrap')
	at java.base/java.time.Instant.compareTo(Instant.java:207)
	at org.apache.hudi.common.model.DefaultHoodieRecordPayload.compareOrderingVal(DefaultHoodieRecordPayload.java:203)
	at org.apache.hudi.common.model.DefaultHoodieRecordPayload.needUpdatingPersistedRecord(DefaultHoodieRecordPayload.java:187)
	at org.apache.spark.sql.hudi.command.payload.ExpressionPayload.doRecordMerge(ExpressionPayload.scala:176)
	at org.apache.spark.sql.hudi.command.payload.ExpressionPayload.$anonfun$processMatchedRecord$3(ExpressionPayload.scala:139)
	at org.apache.spark.sql.hudi.command.payload.ExpressionPayload.$anonfun$processMatchedRecord$3$adapted(ExpressionPayload.scala:121)
	at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:630)
	at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:628)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:936)
	at scala.collection.IterableOps$WithFilter.foreach(Iterable.scala:906)
	at org.apache.spark.sql.hudi.command.payload.ExpressionPayload.processMatchedRecord(ExpressionPayload.scala:121)
	at org.apache.spark.sql.hudi.command.payload.ExpressionPayload.combineAndGetUpdateValue(ExpressionPayload.scala:89)
	at org.apache.hudi.common.model.HoodieAvroRecordMerger.merge(HoodieAvroRecordMerger.java:67)
	at org.apache.hudi.common.table.read.BufferedRecordMergerFactory$BaseCustomMerger.finalMerge(BufferedRecordMergerFactory.java:457)
	at org.apache.hudi.common.table.read.buffer.FileGroupRecordBuffer.hasNextBaseRecord(FileGroupRecordBuffer.java:241)
	at org.apache.hudi.common.table.read.buffer.KeyBasedFileGroupRecordBuffer.hasNextBaseRecord(KeyBasedFileGroupRecordBuffer.java:138)
	at org.apache.hudi.common.table.read.buffer.KeyBasedFileGroupRecordBuffer.doHasNext(KeyBasedFileGroupRecordBuffer.java:147)
	at org.apache.hudi.common.table.read.buffer.FileGroupRecordBuffer.hasNext(FileGroupRecordBuffer.java:152)
	at org.apache.hudi.common.table.read.HoodieFileGroupReader.hasNext(HoodieFileGroupReader.java:247)
	at org.apache.hudi.common.table.read.HoodieFileGroupReader$HoodieFileGroupReaderIterator.hasNext(HoodieFileGroupReader.java:334)
	at org.apache.hudi.common.util.collection.MappingIterator.hasNext(MappingIterator.java:39)
	at org.apache.hudi.io.FileGroupReaderBasedMergeHandle.doMerge(FileGroupReaderBasedMergeHandle.java:271)
	at org.apache.hudi.io.IOUtils.runMerge(IOUtils.java:120)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:392)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:358)
	... 34 more
- Test alter column types *** FAILED ***
  org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 50.0 failed 1 times, most recent failure: Lost task 0.0 in stage 50.0 (TID 89) (runnervmeorf1.dv0dfyhyrulupj4bql4di5djnb.ex.internal.cloudapp.net executor driver): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:365)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$e664f7e$1(BaseSparkCommitActionExecutor.java:298)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:103)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:103)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:918)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:918)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:388)
	at org.apache.spark.storage.BlockManager.$anonfun$getOrElseUpdate$1(BlockManager.scala:1412)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1676)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1585)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1650)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1429)
	at org.apache.spark.storage.BlockManager.getOrElseUpdateRDDBlock(BlockManager.scala:1383)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:386)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:336)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:107)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:180)
	at org.apache.spark.scheduler.Task.run(Task.scala:147)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:716)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:86)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:83)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:97)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:719)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.ClassCastException: class java.time.LocalDate cannot be cast to class java.lang.Integer (java.time.LocalDate and java.lang.Integer are in module java.base of loader 'bootstrap')
	at org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryTypeWithDiffSchemaType(HoodieAvroUtils.java:1048)
	at org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryType(HoodieAvroUtils.java:997)
	at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:896)
	at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:814)
	at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:894)
	at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:814)
	at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:850)
	at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:814)
	at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:775)
	at org.apache.hudi.common.util.HoodieAvroParquetReaderIterator.next(HoodieAvroParquetReaderIterator.java:42)
	at org.apache.hudi.common.util.HoodieAvroParquetReaderIterator.next(HoodieAvroParquetReaderIterator.java:30)
	at org.apache.hudi.common.table.read.buffer.KeyBasedFileGroupRecordBuffer.doHasNext(KeyBasedFileGroupRecordBuffer.java:147)
	at org.apache.hudi.common.table.read.buffer.FileGroupRecordBuffer.hasNext(FileGroupRecordBuffer.java:152)
	at org.apache.hudi.common.table.read.HoodieFileGroupReader.hasNext(HoodieFileGroupReader.java:247)
	at org.apache.hudi.common.table.read.HoodieFileGroupReader$HoodieFileGroupReaderIterator.hasNext(HoodieFileGroupReader.java:334)
	at org.apache.hudi.common.util.collection.MappingIterator.hasNext(MappingIterator.java:39)
	at org.apache.hudi.io.FileGroupReaderBasedMergeHandle.doMerge(FileGroupReaderBasedMergeHandle.java:271)
	at org.apache.hudi.io.IOUtils.runMerge(IOUtils.java:120)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:392)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:358)
	... 34 more
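
Both `Caused by` lines point at the same kind of mismatch: one side of the merge holds a logical-type object (`java.time.Instant`, `java.time.LocalDate`) while the other side holds, or the code expects, the primitive representation (`Long`, `Integer`). The sketch below reproduces that failure mode in plain Java and shows one way to bring both values to a single representation before comparing. It is only an illustration, not Hudi's code or the actual fix: the class `OrderingValueMismatch` and the helpers `compareRaw` and `toPrimitive` are hypothetical, and the microsecond precision for timestamps is an assumption.

```java
import java.time.Instant;
import java.time.LocalDate;

final class OrderingValueMismatch {

    // Compares two values through the raw Comparable interface, the way a
    // merger would when it only knows the ordering values are Comparable.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static int compareRaw(Object left, Object right) {
        return ((Comparable) left).compareTo(right);
    }

    // Hypothetical helper: converts logical-type objects to a primitive form.
    // Assumption: timestamps are stored as microseconds since the epoch and
    // dates as days since the epoch.
    static Object toPrimitive(Object value) {
        if (value instanceof Instant) {
            Instant instant = (Instant) value;
            long micros = Math.multiplyExact(instant.getEpochSecond(), 1_000_000L);
            return Math.addExact(micros, instant.getNano() / 1_000L);
        }
        if (value instanceof LocalDate) {
            return Math.toIntExact(((LocalDate) value).toEpochDay());
        }
        return value;
    }

    public static void main(String[] args) {
        Object persisted = Instant.ofEpochSecond(1_700_000_000L);
        Object incoming = 1_700_000_001_000_000L; // same field, primitive form

        // First failure mode: Instant.compareTo receives a Long.
        try {
            compareRaw(persisted, incoming);
        } catch (ClassCastException e) {
            System.out.println("compare failed: " + e.getMessage());
        }

        // Second failure mode: code that expects an Integer receives a LocalDate.
        Object dateValue = LocalDate.of(2024, 1, 1);
        try {
            Integer days = (Integer) dateValue;
            System.out.println("unexpectedly cast to " + days);
        } catch (ClassCastException e) {
            System.out.println("cast failed: " + e.getMessage());
        }

        // After normalizing both sides, the comparison no longer throws.
        int cmp = compareRaw(toPrimitive(persisted), toPrimitive(incoming));
        System.out.println("normalized comparison sign: " + Integer.signum(cmp));
    }
}
```

Whether the right fix is to normalize at comparison time, as sketched here, or to make the reader return a consistent representation in the first place is not settled by the traces alone.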

What you expected:
Both tests should pass, as they do on older Spark versions.

Steps to reproduce:
Rerun the two failing tests listed above against Spark 4.1.

Environment

Hudi version: master
Query engine: Spark
Relevant configs:

Logs and Stack Trace

No response

Labels: type:bug (Bug reports and fixes)
