Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.apache.spark.dataflint

import org.apache.spark.SPARK_VERSION
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.rules.Rule
Expand Down Expand Up @@ -36,16 +37,31 @@ case class DataFlintInstrumentationColumnarRule(session: SparkSession) extends C
// Eagerly compute the set of node simple-class-names to wrap, respecting per-type flags.
// When the global flag is on everything is enabled; otherwise only nodes whose specific
// flag is enabled are included.
// TimedExec uses a transparent wrapper pattern (children = child.children) that is incompatible
// with Spark 3.0/3.1's withNewChildren (which maps product elements via containsChild).
// On 3.0/3.1, CollapseCodegenStages cannot update children through the wrapper, causing
// ClassCastExceptions. SQL nodes participate in codegen pipelines and are affected;
// Python exec nodes do not participate in codegen and are safe to instrument on all versions.
private val isLegacySpark: Boolean = {
val parts = SPARK_VERSION.split("\\.")
parts.length >= 2 && parts(0) == "3" && (parts(1) == "0" || parts(1) == "1")
}

private val enabledNodeNames: Set[String] = {
val conf = session.sparkContext.conf
val globalEnabled = conf.getBoolean(DataflintSparkUICommonLoader.INSTRUMENT_SPARK_ENABLED, defaultValue = false)
val sqlNodes = Set(
"FilterExec", "ProjectExec", "ExpandExec", "GenerateExec",
"SortMergeJoinExec", "BroadcastHashJoinExec", "BroadcastNestedLoopJoinExec",
"CartesianProductExec", "WindowGroupLimitExec", "SortAggregateExec", "SortExec", "HashAggregateExec",
"DataWritingCommandExec",
"FileSourceScanExec", "RowDataSourceScanExec", "BatchScanExec", "RDDScanExec",
)
val sqlNodes = if (isLegacySpark) {
logInfo("DataFlint: Spark 3.0/3.1 detected — skipping SQL node instrumentation (codegen incompatibility)")
Set.empty[String]
} else {
Set(
"FilterExec", "ProjectExec", "ExpandExec", "GenerateExec",
"SortMergeJoinExec", "BroadcastHashJoinExec", "BroadcastNestedLoopJoinExec",
"CartesianProductExec", "WindowGroupLimitExec", "SortAggregateExec", "SortExec", "HashAggregateExec",
"DataWritingCommandExec",
"FileSourceScanExec", "RowDataSourceScanExec", "BatchScanExec", "RDDScanExec",
)
}
val all = Set(
"BatchEvalPythonExec",
"ArrowEvalPythonExec",
Expand Down
Loading