diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/TimedExec.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/TimedExec.scala index 7ecf5b90..d6cc9bc7 100644 --- a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/TimedExec.scala +++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/TimedExec.scala @@ -33,8 +33,15 @@ import org.apache.spark.sql.vectorized.ColumnarBatch * Extends SparkPlan directly (not UnaryExecNode) to avoid a `UnaryLike` reference in * compiled bytecode, which would cause NoClassDefFoundError on Spark 3.0/3.1. * productElement/productArity support `makeCopy` on Spark 3.0/3.1. + * + * ## Codegen + * `TimedExec` does NOT implement `CodegenSupport`. For nodes that support codegen, + * `TimedWithCodegenExec` extends this class and adds codegen support. This avoids + * ClassCastExceptions on Spark 3.0/3.1 where some nodes (e.g. FileSourceScanExec) + * do not implement `CodegenSupport`. + * Use `TimedExec.apply()` to automatically pick the right variant. */ -class TimedExec(val child: SparkPlan) extends SparkPlan with CodegenSupport with Logging { +class TimedExec(val child: SparkPlan) extends SparkPlan with Logging { override def nodeName: String = "DataFlint" + child.nodeName override def output: Seq[Attribute] = child.output @@ -60,7 +67,7 @@ class TimedExec(val child: SparkPlan) extends SparkPlan with CodegenSupport with // recursive call on the grandchildren is safe. override protected def doPrepare(): Unit = child.prepare() - private def postRddId(rddId: Int): Unit = { + protected def postRddId(rddId: Int): Unit = { val rddIdMetric = longMetric("rddId") rddIdMetric += rddId MetricsUtils.postDriverMetrics(sparkContext, rddIdMetric) @@ -118,72 +125,39 @@ class TimedExec(val child: SparkPlan) extends SparkPlan with CodegenSupport with override def productElement(n: Int): Any = if (n == 0) child else throw new IndexOutOfBoundsException(s"$n") - // When Spark updates our children (= child's children), rebuild child with new children - // and wrap in a new TimedExec. Used by Spark 3.2+ plan transformations (AQE, CollapseCodegen, etc.) + // When Spark updates our children (= child's children), rebuild child with new children. + // Uses TimedExec.apply to pick the right variant (with or without codegen). override protected def withNewChildrenInternal(newChildren: IndexedSeq[SparkPlan]): SparkPlan = - new TimedExec(child.withNewChildren(newChildren)) + TimedExec(child.withNewChildren(newChildren)) +} - // ----------------------------------------------------------------------------------------- - // Codegen execution path - // ----------------------------------------------------------------------------------------- +/** + * TimedExec variant for nodes that implement CodegenSupport. + * Adds codegen timing by wrapping child.produce() with nanoTime deltas. + * Only instantiated via TimedExec.apply() when child is CodegenSupport. + */ +class TimedWithCodegenExec(override val child: SparkPlan) extends TimedExec(child) with CodegenSupport { override def inputRDDs(): Seq[RDD[InternalRow]] = { val rdds = child.asInstanceOf[CodegenSupport].inputRDDs() - // Codegen path: doExecute() is not called, so post rddId here. - // inputRDDs() is called during WSCE code generation on the driver. rdds.headOption.foreach(rdd => postRddId(rdd.id)) rdds } - // Delegate supportCodegen to child — must use child.supportCodegen, not just isInstanceOf. - // Example: SortAggregateExec implements CodegenSupport but returns supportCodegen=false - // when grouping keys are present (doProduce throws UnsupportedOperationException in that case). - override def supportCodegen: Boolean = child match { - case c: CodegenSupport => c.supportCodegen && child.children.length <= 1 - case _ => false + override def supportCodegen: Boolean = { + val c = child.asInstanceOf[CodegenSupport] + c.supportCodegen && child.children.length <= 1 } - // needCopyResult flows DOWN: the default asks children.head.needCopyResult. - // With the transparent wrapper, children = child.children. For multi-child nodes (joins), - // children.length > 1, which hits the default's UnsupportedOperationException branch. - // Fix: ask child directly. child (e.g. SortMergeJoinExec) extends BlockingOperatorWithCodegen - // which returns false, so no recursion. - override def needCopyResult: Boolean = child match { - case c: CodegenSupport => c.needCopyResult - case _ => false - } + override def needCopyResult: Boolean = child.asInstanceOf[CodegenSupport].needCopyResult - // needStopCheck flows UP: do NOT override — the default (parent.needStopCheck) is correct. - // Delegating to child.needStopCheck instead creates an infinite loop because - // child.parent == TimedExec (set by doProduce's produce(ctx, this)), so: - // TimedExec.needStopCheck → child.needStopCheck → child.parent.needStopCheck - // → TimedExec.needStopCheck → ... - /** - * Wrap child.produce() with nanoTime deltas. - * - * Measures cumulative time of the full pipeline below this node. - * Exclusive attribution is done post-hoc: exclusive(N) = D(N) - D(pipelined_child). - * - * The accumulated nanos are flushed to the SQLMetric after each produce invocation. - * For pipelined children this happens per-row; for blocking children once after the - * full build phase. - * - * Uses try/finally so the timing flushes even when blocking operators (SortExec, etc.) - * exit early via shouldStop()/return — without this, duration would be 0. - */ override protected def doProduce(ctx: CodegenContext): String = { - // CodegenSupport.variablePrefix is private and falls through to nodeName.toLowerCase - // for unknown types (like TimedExec). Nodes with spaces in nodeName (e.g. RDDScanExec's - // "Scan ExistingRDD") produce invalid Java identifiers. Sanitize the prefix here. ctx.freshNamePrefix = ctx.freshNamePrefix.replaceAll("[^a-zA-Z0-9_]", "") val durationTerm = metricTerm(ctx, "duration") val startTime = ctx.freshName("timedExecStart") val accumulated = ctx.addMutableState("long", ctx.freshName("timedExecAccNs"), v => s"$v = 0L;") val childCode = child.asInstanceOf[CodegenSupport].produce(ctx, this) - // Test-only: inject a per-partition sleep into the generated code so tests can verify - // that the codegen timing path actually captures wall-clock time. - // Uses a mutable flag so the sleep fires once per partition (not every processNext() call). val sleepMs = sparkContext.conf.getLong("spark.dataflint.test.codegenSleepMs", 0L) val sleepCode = if (sleepMs > 0) { val sleepDone = ctx.addMutableState("boolean", ctx.freshName("timedExecSleepDone"), @@ -206,13 +180,18 @@ class TimedExec(val child: SparkPlan) extends SparkPlan with CodegenSupport with """.stripMargin } - // Fully transparent — no per-row logic, just forward to parent override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = consume(ctx, input) + + override protected def withNewChildrenInternal(newChildren: IndexedSeq[SparkPlan]): SparkPlan = + TimedExec(child.withNewChildren(newChildren)) } object TimedExec { - def apply(child: SparkPlan): TimedExec = new TimedExec(child) + def apply(child: SparkPlan): TimedExec = child match { + case _: CodegenSupport => new TimedWithCodegenExec(child) + case _ => new TimedExec(child) + } /** * A minimal SparkPlan that wraps execute() with per-partition duration timing.