diff --git a/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala b/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala index 1d6e2ea5..b1b4b582 100644 --- a/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala +++ b/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala @@ -1,5 +1,6 @@ package org.apache.spark.dataflint +import org.apache.spark.SPARK_VERSION import org.apache.spark.internal.Logging import org.apache.spark.sql.{SparkSession, SparkSessionExtensions} import org.apache.spark.sql.catalyst.rules.Rule @@ -36,16 +37,31 @@ case class DataFlintInstrumentationColumnarRule(session: SparkSession) extends C // Eagerly compute the set of node simple-class-names to wrap, respecting per-type flags. // When the global flag is on everything is enabled; otherwise only nodes whose specific // flag is enabled are included. + // TimedExec uses a transparent wrapper pattern (children = child.children) that is incompatible + // with Spark 3.0/3.1's withNewChildren (which maps product elements via containsChild). + // On 3.0/3.1, CollapseCodegenStages cannot update children through the wrapper, causing + // ClassCastExceptions. SQL nodes participate in codegen pipelines and are affected; + // Python exec nodes do not participate in codegen and are safe to instrument on all versions. 
+  private val isLegacySpark: Boolean = SPARK_VERSION.split("\\.") match {
+    case Array("3", "0" | "1", _*) => true // major "3" with minor "0" or "1"; later components are ignored
+    case _ => false // fewer than two components, or any other major/minor pair
+  }
+
   private val enabledNodeNames: Set[String] = {
     val conf = session.sparkContext.conf
     val globalEnabled = conf.getBoolean(DataflintSparkUICommonLoader.INSTRUMENT_SPARK_ENABLED, defaultValue = false)
-    val sqlNodes = Set(
-      "FilterExec", "ProjectExec", "ExpandExec", "GenerateExec",
-      "SortMergeJoinExec", "BroadcastHashJoinExec", "BroadcastNestedLoopJoinExec",
-      "CartesianProductExec", "WindowGroupLimitExec", "SortAggregateExec", "SortExec", "HashAggregateExec",
-      "DataWritingCommandExec",
-      "FileSourceScanExec", "RowDataSourceScanExec", "BatchScanExec", "RDDScanExec",
-    )
+    val sqlNodes = if (isLegacySpark) {
+      logInfo("DataFlint: Spark 3.0/3.1 detected — skipping SQL node instrumentation (codegen incompatibility)")
+      Set.empty[String]
+    } else {
+      Set(
+        "FilterExec", "ProjectExec", "ExpandExec", "GenerateExec",
+        "SortMergeJoinExec", "BroadcastHashJoinExec", "BroadcastNestedLoopJoinExec",
+        "CartesianProductExec", "WindowGroupLimitExec", "SortAggregateExec", "SortExec", "HashAggregateExec",
+        "DataWritingCommandExec",
+        "FileSourceScanExec", "RowDataSourceScanExec", "BatchScanExec", "RDDScanExec",
+      )
+    }
     val all = Set(
       "BatchEvalPythonExec", "ArrowEvalPythonExec",