Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,42 +1,51 @@
package org.apache.spark.dataflint

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.vectorized.ColumnarBatch

/**
 * Helpers that wrap an RDD so the wall-clock time spent draining each partition is added
 * to a [[SQLMetric]], in milliseconds.
 *
 * The wrapper is a thin `RDD` subclass (rather than `mapPartitions`) that reuses the parent's
 * partitions and delegates `compute` to the parent's iterator.
 */
object DataFlintRDDUtils {

  /**
   * Wraps a row-based RDD with per-partition duration timing.
   *
   * @param rdd            the RDD whose partitions should be timed
   * @param durationMetric metric that receives the elapsed milliseconds of each partition
   * @return an RDD yielding the same rows as `rdd`
   */
  def withDurationMetric(rdd: RDD[InternalRow], durationMetric: SQLMetric): RDD[InternalRow] =
    timed(rdd, durationMetric)

  /**
   * Wraps a columnar RDD with per-partition duration timing.
   *
   * @param rdd            the RDD whose partitions should be timed
   * @param durationMetric metric that receives the elapsed milliseconds of each partition
   * @return an RDD yielding the same batches as `rdd`
   */
  def withDurationMetricColumnar(rdd: RDD[ColumnarBatch], durationMetric: SQLMetric): RDD[ColumnarBatch] =
    timed(rdd, durationMetric)

  /** Shared implementation for the row and columnar variants. */
  private def timed[T: scala.reflect.ClassTag](rdd: RDD[T], durationMetric: SQLMetric): RDD[T] =
    new RDD[T](rdd) {
      override def compute(split: Partition, context: TaskContext): Iterator[T] = {
        // The clock starts before the parent iterator is created, so any eager work done
        // while building it is included in the measurement.
        val startTime = System.nanoTime()
        val iter = firstParent[T].iterator(split, context)
        var done = false

        // Records the elapsed time at most once per partition.
        def record(): Unit =
          if (!done) {
            done = true
            durationMetric += (System.nanoTime() - startTime) / (1000 * 1000)
          }

        // A consumer that stops early never observes hasNext == false; without this listener
        // the partition's duration would be silently dropped.
        context.addTaskCompletionListener[Unit](_ => record())

        new Iterator[T] {
          override def hasNext: Boolean = {
            val r = iter.hasNext
            if (!r) record()
            r
          }
          override def next(): T = iter.next()
        }
      }

      override protected def getPartitions: Array[Partition] = firstParent[T].partitions
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class SlowSumAggregator extends Aggregator[Double, Double, Double] {
private lazy val sleepMs: Long = {
try {
val sc = org.apache.spark.SparkContext.getActive
sc.map(_.conf.getLong("spark.dataflint.test.slowSumSleepMs", 1L)).getOrElse(1L)
sc.map(_.conf.getLong("spark.dataflint.test.slowSumSleepMs", 1L)).getOrElse(0L)
} catch {
case _: Throwable => 1L
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,17 +80,36 @@ class TimedExec(val child: SparkPlan) extends SparkPlan with CodegenSupport with

// Write path: DataWritingCommandExec does work eagerly in sideEffectResult (a lazy val),
// triggered by executeCollect(). doExecute()/withDurationMetric only wraps the trivial
// result RDD and misses the actual I/O.
//
// To get per-partition timing (consistent with codegen/RDD paths), we reconstruct
// DataWritingCommandExec with the data-producing plan wrapped in an RDDTimingWrapper.
// The write command consumes this timed RDD via sparkContext.runJob, so the wall-clock-
// per-partition timing captures both data production AND write I/O (writes happen between
// next() calls on the timed iterator).
//
// Spark 3.4+ inserts WriteFilesExec between DataWritingCommandExec and the data plan.
// cmd.run() type-checks for WriteFilesExec, so we wrap the data plan INSIDE WriteFilesExec
// (one level deeper), preserving the type check. On older Spark we wrap the child directly.
//
// The child is executed exactly once here: only the rewritten (timed) command is collected,
// never the original child as well. Any child that is not a DataWritingCommandExec (or has
// no children to wrap) falls through to the default SparkPlan.executeCollect().
override def executeCollect(): Array[InternalRow] = {
  // Class names are compared as strings, not via isInstanceOf.
  // NOTE(review): presumably to avoid linking against version-specific Spark classes — confirm.
  val isWriteCommand = child.getClass.getSimpleName == "DataWritingCommandExec"
  child.children.headOption match {
    case Some(innerChild) if isWriteCommand =>
      val durationMetric = longMetric("duration")
      val timedCommand =
        if (innerChild.getClass.getSimpleName == "WriteFilesExec") {
          // Spark 3.4+: wrap the data plan inside WriteFilesExec
          val dataPlan = innerChild.children.head
          val timedDataPlan = new TimedExec.RDDTimingWrapper(dataPlan, durationMetric)
          val wrappedWriteFiles = innerChild.withNewChildren(IndexedSeq(timedDataPlan))
          child.withNewChildren(IndexedSeq(wrappedWriteFiles))
        } else {
          // Older Spark: wrap the data plan directly
          val timedDataPlan = new TimedExec.RDDTimingWrapper(innerChild, durationMetric)
          child.withNewChildren(IndexedSeq(timedDataPlan))
        }
      timedCommand.executeCollect()
    case _ =>
      super.executeCollect()
  }
}

override def canEqual(that: Any): Boolean = that.isInstanceOf[TimedExec]
Expand Down Expand Up @@ -147,25 +166,43 @@ class TimedExec(val child: SparkPlan) extends SparkPlan with CodegenSupport with
*
* The accumulated nanos are flushed to the SQLMetric after each produce invocation.
* For pipelined children this happens per-row; for blocking children once after the
* full build phase — both are correct.
* full build phase.
*
* Uses try/finally so the timing flushes even when blocking operators (SortExec, etc.)
* exit early via shouldStop()/return — without this, duration would be 0.
*/
/**
 * Emits the child's produce code wrapped in wall-clock timing.
 *
 * The generated Java takes a start timestamp, runs the child's code inside try/finally, and
 * in the finally block adds the elapsed nanoseconds to a per-instance accumulator, flushes
 * the whole milliseconds to the "duration" metric and keeps the sub-millisecond remainder
 * for the next invocation.
 *
 * @param ctx the codegen context used to allocate names and mutable state
 * @return the Java source fragment for this node's produce phase
 */
override protected def doProduce(ctx: CodegenContext): String = {
  // CodegenSupport.variablePrefix is private and falls through to nodeName.toLowerCase
  // for unknown types (like TimedExec). Nodes with spaces in nodeName (e.g. RDDScanExec's
  // "Scan ExistingRDD") produce invalid Java identifiers. Sanitize the prefix here.
  ctx.freshNamePrefix = ctx.freshNamePrefix.replaceAll("[^a-zA-Z0-9_]", "")
  val durationTerm = metricTerm(ctx, "duration")
  val startTime = ctx.freshName("timedExecStart")
  val accumulated = ctx.addMutableState("long", ctx.freshName("timedExecAccNs"),
    v => s"$v = 0L;")
  val childCode = child.asInstanceOf[CodegenSupport].produce(ctx, this)
  // Test-only: inject a per-partition sleep into the generated code so tests can verify
  // that the codegen timing path actually captures wall-clock time.
  // Uses a mutable flag so the sleep fires once per partition (not every processNext() call).
  val sleepMs = sparkContext.conf.getLong("spark.dataflint.test.codegenSleepMs", 0L)
  val sleepCode = if (sleepMs > 0) {
    val sleepDone = ctx.addMutableState("boolean", ctx.freshName("timedExecSleepDone"),
      v => s"$v = false;")
    s"""if (!$sleepDone) {
       |  try { Thread.sleep(${sleepMs}L); } catch (InterruptedException e) { }
       |  $sleepDone = true;
       |}""".stripMargin
  } else ""
  // try/finally: the timing must flush even when the child's code leaves early via
  // shouldStop()/return. The child's code is emitted exactly once, inside the try.
  // The accumulator keeps the remainder (%=) rather than being zeroed, so invocations
  // shorter than one millisecond still add up instead of each rounding down to 0.
  s"""
     |long $startTime = System.nanoTime();
     |$sleepCode
     |try {
     |  $childCode
     |} finally {
     |  $accumulated += System.nanoTime() - $startTime;
     |  $durationTerm.add($accumulated / $NANOS_PER_MILLIS);
     |  $accumulated %= $NANOS_PER_MILLIS;
     |}
   """.stripMargin
}

Expand All @@ -176,4 +213,30 @@ class TimedExec(val child: SparkPlan) extends SparkPlan with CodegenSupport with

/** Companion of [[TimedExec]]: factory method plus the RDD-level timing node for writes. */
object TimedExec {

  /** Builds a [[TimedExec]] around `child`. */
  def apply(child: SparkPlan): TimedExec = new TimedExec(child)

  /**
   * Bare-bones plan node whose only job is to time `execute()` / `executeColumnar()` of its
   * child per partition, reporting into the supplied metric.
   *
   * The write path places it either inside WriteFilesExec or, on older Spark, directly under
   * the write command, so the command drains a timed RDD. Output attributes, partitioning,
   * ordering and columnar support are all forwarded from the child unchanged.
   */
  private[dataflint] class RDDTimingWrapper(val child: SparkPlan, durationMetric: SQLMetric)
    extends SparkPlan {

    // --- tree structure -------------------------------------------------------------------
    override def children: Seq[SparkPlan] = child :: Nil

    override protected def withNewChildrenInternal(newChildren: IndexedSeq[SparkPlan]): SparkPlan = {
      val replacement = newChildren.head
      new RDDTimingWrapper(replacement, durationMetric)
    }

    // --- properties forwarded from the child ----------------------------------------------
    override def output: Seq[Attribute] = child.output
    override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    override def outputPartitioning: Partitioning = child.outputPartitioning
    override def supportsColumnar: Boolean = child.supportsColumnar

    // --- execution: same data as the child, with timing attached --------------------------
    override protected def doExecute(): RDD[InternalRow] = {
      val rows = child.execute()
      DataFlintRDDUtils.withDurationMetric(rows, durationMetric)
    }

    override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
      val batches = child.executeColumnar()
      DataFlintRDDUtils.withDurationMetricColumnar(batches, durationMetric)
    }

    // --- Product contract: the child is the single product element ------------------------
    override def productArity: Int = 1

    override def productElement(n: Int): Any = n match {
      case 0 => child
      case _ => throw new IndexOutOfBoundsException(s"$n")
    }

    override def canEqual(that: Any): Boolean = that.isInstanceOf[RDDTimingWrapper]
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ case class DataFlintInstrumentationColumnarRule(session: SparkSession) extends C
"SortMergeJoinExec", "BroadcastHashJoinExec", "BroadcastNestedLoopJoinExec",
"CartesianProductExec", "WindowGroupLimitExec", "SortAggregateExec", "SortExec", "HashAggregateExec",
"DataWritingCommandExec",
"FileSourceScanExec", "RowDataSourceScanExec", "BatchScanExec",
"FileSourceScanExec", "RowDataSourceScanExec", "BatchScanExec", "RDDScanExec",
)
val all = Set(
"BatchEvalPythonExec",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ case class DataFlintInstrumentationColumnarRule(session: SparkSession) extends C
"SortMergeJoinExec", "BroadcastHashJoinExec", "BroadcastNestedLoopJoinExec",
"CartesianProductExec", "WindowGroupLimitExec", "SortAggregateExec",
"DataWritingCommandExec",
"FileSourceScanExec", "RowDataSourceScanExec", "BatchScanExec",
"FileSourceScanExec", "RowDataSourceScanExec", "BatchScanExec", "RDDScanExec",
)
val all = Set(
"BatchEvalPythonExec",
Expand Down
2 changes: 1 addition & 1 deletion spark-plugin/pyspark-testing/dataflint_pyspark_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def sleep(seconds):
.config("spark.plugins", "io.dataflint.spark.SparkDataflintPlugin") \
.config("spark.ui.port", "10000") \
.config("spark.sql.maxMetadataStringLength", "10000") \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.sql.adaptive.enabled", "false") \
.config("spark.dataflint.telemetry.enabled", "false") \
.config("spark.dataflint.instrument.spark.mapInPandas.enabled", instrument) \
.config("spark.dataflint.instrument.spark.mapInArrow.enabled", instrument) \
Expand Down
Loading