Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,15 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
* Extends SparkPlan directly (not UnaryExecNode) to avoid a `UnaryLike` reference in
* compiled bytecode, which would cause NoClassDefFoundError on Spark 3.0/3.1.
* productElement/productArity support `makeCopy` on Spark 3.0/3.1.
*
* ## Codegen
* `TimedExec` does NOT implement `CodegenSupport`. For nodes that support codegen,
* `TimedWithCodegenExec` extends this class and adds codegen support. This avoids
* ClassCastExceptions on Spark 3.0/3.1 where some nodes (e.g. FileSourceScanExec)
* do not implement `CodegenSupport`.
* Use `TimedExec.apply()` to automatically pick the right variant.
*/
class TimedExec(val child: SparkPlan) extends SparkPlan with CodegenSupport with Logging {
class TimedExec(val child: SparkPlan) extends SparkPlan with Logging {
override def nodeName: String = "DataFlint" + child.nodeName
override def output: Seq[Attribute] = child.output

Expand All @@ -60,7 +67,7 @@ class TimedExec(val child: SparkPlan) extends SparkPlan with CodegenSupport with
// recursive call on the grandchildren is safe.
override protected def doPrepare(): Unit = {
  // Forward preparation to the wrapped child plan.
  child.prepare()
}

private def postRddId(rddId: Int): Unit = {
protected def postRddId(rddId: Int): Unit = {
val rddIdMetric = longMetric("rddId")
rddIdMetric += rddId
MetricsUtils.postDriverMetrics(sparkContext, rddIdMetric)
Expand Down Expand Up @@ -118,72 +125,39 @@ class TimedExec(val child: SparkPlan) extends SparkPlan with CodegenSupport with
/**
 * Product accessor for the single constructor argument.
 *
 * @param n element index; only 0 is valid
 * @return the wrapped `child` plan when `n` is 0
 * @throws IndexOutOfBoundsException for any other index (the message is the index itself)
 */
override def productElement(n: Int): Any = n match {
  case 0 => child
  case _ => throw new IndexOutOfBoundsException(s"$n")
}

// When Spark updates our children (= child's children), rebuild child with new children
// and wrap in a new TimedExec. Used by Spark 3.2+ plan transformations (AQE, CollapseCodegen, etc.)
// When Spark updates our children (= child's children), rebuild child with new children.
// Uses TimedExec.apply to pick the right variant (with or without codegen).
override protected def withNewChildrenInternal(newChildren: IndexedSeq[SparkPlan]): SparkPlan =
new TimedExec(child.withNewChildren(newChildren))
TimedExec(child.withNewChildren(newChildren))
}

// -----------------------------------------------------------------------------------------
// Codegen execution path
// -----------------------------------------------------------------------------------------
/**
* TimedExec variant for nodes that implement CodegenSupport.
* Adds codegen timing by wrapping child.produce() with nanoTime deltas.
* Only instantiated via TimedExec.apply() when child is CodegenSupport.
*/
class TimedWithCodegenExec(override val child: SparkPlan) extends TimedExec(child) with CodegenSupport {

/**
 * Returns the wrapped child's input RDDs unchanged and, as a side effect, posts the id
 * of the first one (if any) through `postRddId`.
 *
 * On the codegen path doExecute() is not called, so the rddId is published from here;
 * inputRDDs() is called during WSCE code generation on the driver.
 *
 * NOTE(review): `postRddId` adds the id to the "rddId" metric, so repeated invocations
 * would accumulate — confirm this method is invoked once per execution.
 */
override def inputRDDs(): Seq[RDD[InternalRow]] = {
  val upstream = child.asInstanceOf[CodegenSupport].inputRDDs()
  upstream.headOption match {
    case Some(first) => postRddId(first.id)
    case None => // no input RDD: nothing to report
  }
  upstream
}

// Delegate supportCodegen to child — must use child.supportCodegen, not just isInstanceOf.
// Example: SortAggregateExec implements CodegenSupport but returns supportCodegen=false
// when grouping keys are present (doProduce throws UnsupportedOperationException in that case).
override def supportCodegen: Boolean = child match {
case c: CodegenSupport => c.supportCodegen && child.children.length <= 1
case _ => false
override def supportCodegen: Boolean = {
val c = child.asInstanceOf[CodegenSupport]
c.supportCodegen && child.children.length <= 1
}

// needCopyResult flows DOWN: the default asks children.head.needCopyResult.
// With the transparent wrapper, children = child.children. For multi-child nodes (joins),
// children.length > 1, which hits the default's UnsupportedOperationException branch.
// Fix: ask child directly. child (e.g. SortMergeJoinExec) extends BlockingOperatorWithCodegen
// which returns false, so no recursion.
override def needCopyResult: Boolean = child match {
case c: CodegenSupport => c.needCopyResult
case _ => false
}
override def needCopyResult: Boolean = child.asInstanceOf[CodegenSupport].needCopyResult

// needStopCheck flows UP: do NOT override — the default (parent.needStopCheck) is correct.
// Delegating to child.needStopCheck instead creates an infinite loop because
// child.parent == TimedExec (set by doProduce's produce(ctx, this)), so:
// TimedExec.needStopCheck → child.needStopCheck → child.parent.needStopCheck
// → TimedExec.needStopCheck → ...
/**
* Wrap child.produce() with nanoTime deltas.
*
* Measures cumulative time of the full pipeline below this node.
* Exclusive attribution is done post-hoc: exclusive(N) = D(N) - D(pipelined_child).
*
* The accumulated nanos are flushed to the SQLMetric after each produce invocation.
* For pipelined children this happens per-row; for blocking children once after the
* full build phase.
*
* Uses try/finally so the timing flushes even when blocking operators (SortExec, etc.)
* exit early via shouldStop()/return — without this, duration would be 0.
*/
override protected def doProduce(ctx: CodegenContext): String = {
// CodegenSupport.variablePrefix is private and falls through to nodeName.toLowerCase
// for unknown types (like TimedExec). Nodes with spaces in nodeName (e.g. RDDScanExec's
// "Scan ExistingRDD") produce invalid Java identifiers. Sanitize the prefix here.
ctx.freshNamePrefix = ctx.freshNamePrefix.replaceAll("[^a-zA-Z0-9_]", "")
val durationTerm = metricTerm(ctx, "duration")
val startTime = ctx.freshName("timedExecStart")
val accumulated = ctx.addMutableState("long", ctx.freshName("timedExecAccNs"),
v => s"$v = 0L;")
val childCode = child.asInstanceOf[CodegenSupport].produce(ctx, this)
// Test-only: inject a per-partition sleep into the generated code so tests can verify
// that the codegen timing path actually captures wall-clock time.
// Uses a mutable flag so the sleep fires once per partition (not every processNext() call).
val sleepMs = sparkContext.conf.getLong("spark.dataflint.test.codegenSleepMs", 0L)
val sleepCode = if (sleepMs > 0) {
val sleepDone = ctx.addMutableState("boolean", ctx.freshName("timedExecSleepDone"),
Expand All @@ -206,13 +180,18 @@ class TimedExec(val child: SparkPlan) extends SparkPlan with CodegenSupport with
""".stripMargin
}

/**
 * Pass-through consume step: adds no per-row logic and simply forwards the incoming
 * columns to the parent via `consume`. The `row` argument is not used.
 */
override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
  consume(ctx, input)
}

/**
 * Rebuilds the wrapped child with `newChildren` and re-wraps the result through
 * `TimedExec.apply`, which chooses between the codegen and non-codegen variant.
 */
override protected def withNewChildrenInternal(newChildren: IndexedSeq[SparkPlan]): SparkPlan = {
  val rebuiltChild = child.withNewChildren(newChildren)
  TimedExec(rebuiltChild)
}
}

object TimedExec {
def apply(child: SparkPlan): TimedExec = new TimedExec(child)
/**
 * Wraps `child` in the appropriate timing node.
 *
 * @param child the plan to wrap
 * @return a `TimedWithCodegenExec` when `child` is a `CodegenSupport`,
 *         otherwise a plain `TimedExec`
 */
def apply(child: SparkPlan): TimedExec = child match {
  case codegenChild: CodegenSupport => new TimedWithCodegenExec(codegenChild)
  case plainChild => new TimedExec(plainChild)
}

/**
* A minimal SparkPlan that wraps execute() with per-partition duration timing.
Expand Down
Loading