Task serialization failed: java.lang.NoSuchMethodError: 'void org.apache.hudi.common.util.HoodieCommonKryoRegistrar.registerClasses(com.esotericsoftware.kryo.Kryo)' #40

@vbogretsov

Hello!

I'm trying to run HoodieMultiTableStreamer (0.14.1) on Spark Operator (v1beta2-1.3.7-3.1.1) in Kubernetes (1.29 AWS EKS) and I'm getting the following error:

24/04/15 19:22:20 ERROR HoodieMultiTableStreamer: error while running MultiTableDeltaStreamer for table: my_table1
org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.NoSuchMethodError: 'void org.apache.hudi.common.util.HoodieCommonKryoRegistrar.registerClasses(com.esotericsoftware.kryo.Kryo)'
java.lang.NoSuchMethodError: 'void org.apache.hudi.common.util.HoodieCommonKryoRegistrar.registerClasses(com.esotericsoftware.kryo.Kryo)'
        at org.apache.spark.HoodieSparkKryoRegistrar.registerClasses(HoodieSparkKryoRegistrar.scala:53)
        at org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$8(KryoSerializer.scala:182)
        at org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$8$adapted(KryoSerializer.scala:182)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$5(KryoSerializer.scala:182)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:240)
        at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:173)
        at org.apache.spark.serializer.KryoSerializer$$anon$1.create(KryoSerializer.scala:104)
        at com.esotericsoftware.kryo.pool.KryoPoolQueueImpl.borrow(KryoPoolQueueImpl.java:48)
        at org.apache.spark.serializer.KryoSerializer$PoolWrapper.borrow(KryoSerializer.scala:111)
        at org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:351)
        at org.apache.spark.serializer.KryoSerializationStream.<init>(KryoSerializer.scala:271)
        at org.apache.spark.serializer.KryoSerializerInstance.serializeStream(KryoSerializer.scala:437)
        at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:356)
        at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:160)
        at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:99)
        at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:38)
        at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:78)
        at org.apache.spark.SparkContext.broadcastInternal(SparkContext.scala:1548)
        at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1530)
        at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1535)
        at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1353)
        at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1295)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2931)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)

        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
        at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1545)
        at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1353)
        at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1295)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2931)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2284)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2328)
        at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1019)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:1018)
        at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.mergeSchemasInParallel(SchemaMergeUtils.scala:73)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.mergeSchemasInParallel(ParquetFileFormat.scala:476)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.inferSchema(ParquetUtils.scala:132)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.inferSchema(ParquetFileFormat.scala:78)
        at org.apache.spark.sql.execution.datasources.DataSource.$anonfun$getOrInferFileFormatSchema$11(DataSource.scala:208)
        at scala.Option.orElse(Option.scala:447)
        at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:205)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:407)
        at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
        at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
        at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:563)
        at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:562)
        at org.apache.hudi.utilities.sources.ParquetDFSSource.fromFiles(ParquetDFSSource.java:55)
        at org.apache.hudi.utilities.sources.ParquetDFSSource.lambda$fetchNextBatch$0(ParquetDFSSource.java:50)
        at org.apache.hudi.common.util.Option.map(Option.java:108)
        at org.apache.hudi.utilities.sources.ParquetDFSSource.fetchNextBatch(ParquetDFSSource.java:50)
        at org.apache.hudi.utilities.sources.RowSource.fetchNewData(RowSource.java:44)
        at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:76)
        at org.apache.hudi.utilities.streamer.SourceFormatAdapter.fetchNewDataInAvroFormat(SourceFormatAdapter.java:161)
        at org.apache.hudi.utilities.streamer.StreamSync.fetchNextBatchFromSource(StreamSync.java:629)
        at org.apache.hudi.utilities.streamer.StreamSync.fetchFromSourceAndPrepareRecords(StreamSync.java:525)
        at org.apache.hudi.utilities.streamer.StreamSync.readFromSource(StreamSync.java:498)
        at org.apache.hudi.utilities.streamer.StreamSync.syncOnce(StreamSync.java:404)
        at org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.ingestOnce(HoodieStreamer.java:850)
        at org.apache.hudi.utilities.ingestion.HoodieIngestionService.startIngestion(HoodieIngestionService.java:72)
        at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
        at org.apache.hudi.utilities.streamer.HoodieStreamer.sync(HoodieStreamer.java:207)
        at org.apache.hudi.utilities.streamer.HoodieMultiTableStreamer.sync(HoodieMultiTableStreamer.java:456)
        at org.apache.hudi.utilities.streamer.HoodieMultiTableStreamer.main(HoodieMultiTableStreamer.java:281)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.base/java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1020)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:192)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:215)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1111)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1120)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NoSuchMethodError: 'void org.apache.hudi.common.util.HoodieCommonKryoRegistrar.registerClasses(com.esotericsoftware.kryo.Kryo)'
        at org.apache.spark.HoodieSparkKryoRegistrar.registerClasses(HoodieSparkKryoRegistrar.scala:53)
        at org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$8(KryoSerializer.scala:182)
        at org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$8$adapted(KryoSerializer.scala:182)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$5(KryoSerializer.scala:182)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:240)
        at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:173)
        at org.apache.spark.serializer.KryoSerializer$$anon$1.create(KryoSerializer.scala:104)
        at com.esotericsoftware.kryo.pool.KryoPoolQueueImpl.borrow(KryoPoolQueueImpl.java:48)
        at org.apache.spark.serializer.KryoSerializer$PoolWrapper.borrow(KryoSerializer.scala:111)
        at org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:351)
        at org.apache.spark.serializer.KryoSerializationStream.<init>(KryoSerializer.scala:271)
        at org.apache.spark.serializer.KryoSerializerInstance.serializeStream(KryoSerializer.scala:437)
        at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:356)
        at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:160)
        at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:99)
        at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:38)
        at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:78)
        at org.apache.spark.SparkContext.broadcastInternal(SparkContext.scala:1548)
        at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1530)
        at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1535)
        at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1353)
        at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1295)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2931)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)

My Spark Operator configuration is the following:

apiVersion: v1
kind: ConfigMap
metadata:
  name: etc-hudi
  namespace: dp2
data:
  hudi-defaults.conf: |
    hoodie.upsert.shuffle.parallelism=8
    hoodie.insert.shuffle.parallelism=8
    hoodie.delete.shuffle.parallelism=8
    hoodie.bulkinsert.shuffle.parallelism=8
  base.properties: |
    hoodie.parquet.small.file.limit=16777216
    hoodie.index.type=GLOBAL_BLOOM
    hoodie.bloom.index.update.partition.path=true
    hoodie.datasource.write.hive_style_partitioning=false
    hoodie.datasource.hive_sync.enable=true
    hoodie.datasource.hive_sync.database=hudidemo
    hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
    hoodie.datasource.hive_sync.use_jdbc=false
    hoodie.datasource.hive_sync.mode=hms
    hoodie.streamer.ingestion.tablesToBeIngested=myapp.my_table1,myapp.my_table2
    hoodie.streamer.ingestion.myapp.my_table1.configFile=/etc/hudi/myapp.my_table1.properties
    hoodie.streamer.ingestion.myapp.my_table2.configFile=/etc/hudi/myapp.my_table2.properties
  myapp.my_table1.properties: |
    include=base.properties
    hoodie.datasource.write.keygenerator.type=SIMPLE
    hoodie.datasource.write.recordkey.field=id
    hoodie.datasource.write.precombine.field=__version
    hoodie.datasource.write.partitionpath.field=__gen
    hoodie.datasource.hive_sync.table=myapp_my_table1
    hoodie.streamer.source.dfs.root=s3a://mybucket/hudidemo/raw/myapp/myapp.public.my_table1/
  myapp.my_table2.properties: |
    include=base.properties
    hoodie.datasource.write.keygenerator.type=SIMPLE
    hoodie.datasource.write.recordkey.field=id
    hoodie.datasource.write.precombine.field=__version
    hoodie.datasource.write.partitionpath.field=__gen
    hoodie.datasource.hive_sync.table=myapp_my_table2
    hoodie.streamer.source.dfs.root=s3a://mybucket/hudidemo/raw/myapp/myapp.public.my_table2/

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: dfs-props
  namespace: dp2
data:
  dfs-source.properties: |
    # empty

---
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: hudidemo
  namespace: dp2
spec:
  timeToLiveSeconds: 600
  type: Java
  mode: cluster
  image: myimage:1.0.0
  imagePullPolicy: Always
  imagePullSecrets:
    - gitlab-datalake
  sparkVersion: 3.4.0
  mainApplicationFile: local:///opt/spark/jars/hudi-utilities-bundle_2.12-0.14.1.jar
  mainClass: org.apache.hudi.utilities.streamer.HoodieMultiTableStreamer
  arguments:
    - --props
    - /etc/hudi/base.properties
    - --config-folder
    - /etc/hudi
    - --base-path-prefix
    - s3a://mybucket/hudidemo/hudi/
    - --table-type
    - COPY_ON_WRITE
    - --source-class
    - org.apache.hudi.utilities.sources.ParquetDFSSource
    - --source-ordering-field
    - __version
    - --op
    - UPSERT
    - --enable-sync
    - --sync-tool-classes
    - org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool
  memoryOverheadFactor: "0.4"
  sparkConf:
    spark.kubernetes.file.upload.path: s3a://mybucket/_spark
    spark.serializer: org.apache.spark.serializer.KryoSerializer
    spark.kryo.registrator: org.apache.spark.HoodieSparkKryoRegistrar
  hadoopConf:
    fs.s3.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
    fs.s3a.aws.credentials.provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider
    metastore.client.factory.class: com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory
  restartPolicy:
    type: Always
  dynamicAllocation:
    enabled: true
    minExecutors: 1
    maxExecutors: 16
  volumes:
  - name: etc-hudi
    configMap:
      name: etc-hudi
  - name: dfs-props
    configMap:
      name: dfs-props
  driver:
    image: myimage:1.0.0
    annotations:
      "karpenter.sh/do-not-disrupt": "true"
    serviceAccount: hudidemo
    nodeSelector:
      nodetype: worker
    tolerations:
    - key: nodetype
      value: worker
      effect: NoSchedule
    labels:
      app: hudidemo
      version: 3.4.0
    memory: 4g
    env:
    - name: "AWS_DEFAULT_REGION"
      value: my-region-1
    - name: "AWS_REGION"
      value: my-region-1
    - name: "AWS_WEB_IDENTITY_TOKEN_FILE"
      value: "/var/run/secrets/eks.amazonaws.com/serviceaccount/token"
    volumeMounts:
    - mountPath: /etc/hudi
      name: etc-hudi
    - mountPath: /opt/spark/work-dir/src/test/resources/streamer-config/dfs-source.properties
      name: dfs-props
  executor:
    image: myimage:1.0.0
    annotations:
      "karpenter.sh/do-not-evict": "true"
    nodeSelector:
      nodetype: worker
    tolerations:
    - key: nodetype
      value: worker
      effect: NoSchedule
    labels:
      app: hudidemo
      version: 3.4.0
    memory: 4g

I used the following Dockerfile to build the image myimage:1.0.0:

ARG BASE=apache/spark:3.4.0
ARG MVNROOT=https://maven-central-eu.storage-download.googleapis.com/maven2

FROM alpine:3.19 as aws-jars
ARG MVNROOT
ARG AWSSDK1=1.12.682
ARG AWSSDK2=2.25.13
WORKDIR /jars
RUN wget ${MVNROOT}/com/amazonaws/aws-java-sdk-bundle/${AWSSDK1}/aws-java-sdk-bundle-${AWSSDK1}.jar
RUN wget ${MVNROOT}/software/amazon/awssdk/bundle/${AWSSDK2}/bundle-${AWSSDK2}.jar

FROM alpine:3.19 as hadoop-jars
ARG MVNROOT
ARG HADOOP=3.3.4
WORKDIR /jars
RUN wget ${MVNROOT}/org/apache/hadoop/hadoop-common/${HADOOP}/hadoop-common-${HADOOP}.jar
RUN wget ${MVNROOT}/org/apache/hadoop/hadoop-aws/${HADOOP}/hadoop-aws-${HADOOP}.jar
RUN wget ${MVNROOT}/org/apache/hadoop/hadoop-mapreduce-client-core/${HADOOP}/hadoop-mapreduce-client-core-${HADOOP}.jar

FROM alpine:3.19 as hudi-jars
ARG MVNROOT
ARG SCALA=2.12
ARG HUDI=0.14.1
WORKDIR /jars
RUN wget ${MVNROOT}/org/apache/hudi/hudi-spark3.4-bundle_${SCALA}/${HUDI}/hudi-spark3.4-bundle_${SCALA}-${HUDI}.jar
RUN wget ${MVNROOT}/org/apache/hudi/hudi-aws/${HUDI}/hudi-aws-${HUDI}.jar
RUN wget ${MVNROOT}/org/apache/hudi/hudi-sync-common/${HUDI}/hudi-sync-common-${HUDI}.jar
RUN wget ${MVNROOT}/org/apache/hudi/hudi-hive-sync-bundle/${HUDI}/hudi-hive-sync-bundle-${HUDI}.jar
RUN wget ${MVNROOT}/org/apache/hudi/hudi-utilities-bundle_${SCALA}/${HUDI}/hudi-utilities-bundle_${SCALA}-${HUDI}.jar
RUN wget ${MVNROOT}/org/apache/hudi/hudi-hadoop-mr-bundle/${HUDI}/hudi-hadoop-mr-bundle-${HUDI}.jar

FROM ${BASE} as final
COPY --from=aws-jars /jars /opt/spark/jars
COPY --from=hadoop-jars /jars /opt/spark/jars
COPY --from=hudi-jars /jars /opt/spark/jars
ENV HOME=/opt/spark
ENV PATH=/opt/spark/bin:$PATH
ENV HUDI_CONF_DIR=/etc/hudi
RUN mkdir -p /opt/spark/tmp

The streamer works fine locally when run in Docker Compose with exactly the same command-line arguments, but under Spark Operator I get the error described above.
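
One way to narrow this down, on the unconfirmed assumption that the image ships more than one copy of HoodieCommonKryoRegistrar, is to scan /opt/spark/jars for every jar that contains the class and check whether its bytes include the method descriptor named in the error. The sketch below is only a heuristic: the function name `scan_jars` and the constants are hypothetical helpers, and a matching descriptor string does not prove that `registerClasses` itself has that signature.

```python
import sys
import zipfile
from pathlib import Path

# Class named in the NoSuchMethodError, as an entry path inside a jar.
CLASS_ENTRY = "org/apache/hudi/common/util/HoodieCommonKryoRegistrar.class"
# JVM descriptor for a method taking com.esotericsoftware.kryo.Kryo and returning void.
EXPECTED_DESCRIPTOR = b"(Lcom/esotericsoftware/kryo/Kryo;)V"


def scan_jars(jar_dir, class_entry=CLASS_ENTRY, descriptor=EXPECTED_DESCRIPTOR):
    """Return {jar file name: bool} for every jar in jar_dir that contains class_entry.

    The bool is True when the raw class bytes contain the expected descriptor.
    """
    results = {}
    root = Path(jar_dir)
    if not root.is_dir():
        return results
    for jar in sorted(root.glob("*.jar")):
        try:
            with zipfile.ZipFile(jar) as zf:
                if class_entry in zf.namelist():
                    results[jar.name] = descriptor in zf.read(class_entry)
        except zipfile.BadZipFile:
            # Skip files that are not valid archives.
            continue
    return results


if __name__ == "__main__":
    # Default directory matches the COPY targets in the Dockerfile above.
    jar_dir = sys.argv[1] if len(sys.argv) > 1 else "/opt/spark/jars"
    for name, has_descriptor in scan_jars(jar_dir).items():
        print(f"{name}: expected descriptor present = {has_descriptor}")
```

If more than one jar is listed and at least one reports `False`, the class that gets loaded may depend on classpath order, which could differ between the Docker Compose run and the Spark Operator run.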

Could someone please suggest why I'm getting this error?

P.S. Table names, the AWS region, and bucket names are redacted.
