diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/DataFlintRDDUtils.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/DataFlintRDDUtils.scala index 2106a9e..3a2d5a4 100644 --- a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/DataFlintRDDUtils.scala +++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/DataFlintRDDUtils.scala @@ -1,5 +1,6 @@ package org.apache.spark.dataflint +import org.apache.spark.{Partition, TaskContext} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.metric.SQLMetric @@ -7,36 +8,44 @@ import org.apache.spark.sql.vectorized.ColumnarBatch object DataFlintRDDUtils { def withDurationMetric(rdd: RDD[InternalRow], durationMetric: SQLMetric): RDD[InternalRow] = - rdd.mapPartitions { iter => - val startTime = System.nanoTime() - var done = false - new Iterator[InternalRow] { - override def hasNext: Boolean = { - val r = iter.hasNext - if (!r && !done) { - durationMetric += ((System.nanoTime() - startTime)/(1000 * 1000)) - done = true + new RDD[InternalRow](rdd) { + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { + val startTime = System.nanoTime() + val iter = firstParent[InternalRow].iterator(split, context) + var done = false + new Iterator[InternalRow] { + override def hasNext: Boolean = { + val r = iter.hasNext + if (!r && !done) { + durationMetric += (System.nanoTime() - startTime) / (1000 * 1000) + done = true + } + r } - r + override def next(): InternalRow = iter.next() } - override def next(): InternalRow = iter.next() } + override protected def getPartitions: Array[Partition] = firstParent.partitions } def withDurationMetricColumnar(rdd: RDD[ColumnarBatch], durationMetric: SQLMetric): RDD[ColumnarBatch] = - rdd.mapPartitions { iter => - val startTime = System.nanoTime() - var done = false - new Iterator[ColumnarBatch] { - override def hasNext: Boolean = { - val r = iter.hasNext - if (!r && !done) { - durationMetric += ((System.nanoTime() - startTime) / (1000 * 1000)) - done = true + new RDD[ColumnarBatch](rdd) { + override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] = { + val startTime = System.nanoTime() + val iter = firstParent[ColumnarBatch].iterator(split, context) + var done = false + new Iterator[ColumnarBatch] { + override def hasNext: Boolean = { + val r = iter.hasNext + if (!r && !done) { + durationMetric += (System.nanoTime() - startTime) / (1000 * 1000) + done = true + } + r } - r + override def next(): ColumnarBatch = iter.next() } - override def next(): ColumnarBatch = iter.next() } + override protected def getPartitions: Array[Partition] = firstParent.partitions } } \ No newline at end of file diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/SlowSumAggregator.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/SlowSumAggregator.scala index 9defd45..773a3d6 100644 --- a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/SlowSumAggregator.scala +++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/SlowSumAggregator.scala @@ -16,7 +16,7 @@ class SlowSumAggregator extends Aggregator[Double, Double, Double] { private lazy val sleepMs: Long = { try { val sc = org.apache.spark.SparkContext.getActive - sc.map(_.conf.getLong("spark.dataflint.test.slowSumSleepMs", 1L)).getOrElse(1L) + sc.map(_.conf.getLong("spark.dataflint.test.slowSumSleepMs", 1L)).getOrElse(0L) } catch { case _: Throwable => 1L } diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/TimedExec.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/TimedExec.scala index 0f04f32..7ecf5b9 100644 --- a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/TimedExec.scala +++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/TimedExec.scala @@ -80,17 +80,36 @@ class TimedExec(val child: SparkPlan) extends SparkPlan with CodegenSupport with // Write path: DataWritingCommandExec does work eagerly in sideEffectResult (a lazy val), // triggered by executeCollect(). doExecute()/withDurationMetric only wraps the trivial - // result RDD and misses the actual I/O. This override captures the full write duration. - // For non-write nodes this is harmless — executeCollect() on TimedExec is only called - // when TimedExec is the plan root (writes), not when it's inside WSCE (codegen path). + // result RDD and misses the actual I/O. + // + // To get per-partition timing (consistent with codegen/RDD paths), we reconstruct + // DataWritingCommandExec with the data-producing plan wrapped in an RDDTimingWrapper. + // The write command consumes this timed RDD via sparkContext.runJob, so the wall-clock- + // per-partition timing captures both data production AND write I/O (writes happen between + // next() calls on the timed iterator). + // + // Spark 3.4+ inserts WriteFilesExec between DataWritingCommandExec and the data plan. + // cmd.run() type-checks for WriteFilesExec, so we wrap the data plan INSIDE WriteFilesExec + // (one level deeper), preserving the type check. On older Spark we wrap the child directly. override def executeCollect(): Array[InternalRow] = { - val start = System.nanoTime() - val result = child.executeCollect() - val durationMs = (System.nanoTime() - start) / (1000 * 1000) - val durationMetric = longMetric("duration") - durationMetric += durationMs - MetricsUtils.postDriverMetrics(sparkContext, durationMetric) - result + if (child.getClass.getSimpleName == "DataWritingCommandExec") { + val durationMetric = longMetric("duration") + val innerChild = child.children.head + val wrappedChild = if (innerChild.getClass.getSimpleName == "WriteFilesExec") { + // Spark 3.4+: wrap the data plan inside WriteFilesExec + val dataPlan = innerChild.children.head + val timedDataPlan = new TimedExec.RDDTimingWrapper(dataPlan, durationMetric) + val wrappedWriteFiles = innerChild.withNewChildren(IndexedSeq(timedDataPlan)) + child.withNewChildren(IndexedSeq(wrappedWriteFiles)) + } else { + // Older Spark: wrap the data plan directly + val timedDataPlan = new TimedExec.RDDTimingWrapper(innerChild, durationMetric) + child.withNewChildren(IndexedSeq(timedDataPlan)) + } + wrappedChild.executeCollect() + } else { + super.executeCollect() + } } override def canEqual(that: Any): Boolean = that.isInstanceOf[TimedExec] @@ -147,9 +166,16 @@ class TimedExec(val child: SparkPlan) extends SparkPlan with CodegenSupport with * * The accumulated nanos are flushed to the SQLMetric after each produce invocation. * For pipelined children this happens per-row; for blocking children once after the - * full build phase — both are correct. + * full build phase. + * + * Uses try/finally so the timing flushes even when blocking operators (SortExec, etc.) + * exit early via shouldStop()/return — without this, duration would be 0. */ override protected def doProduce(ctx: CodegenContext): String = { + // CodegenSupport.variablePrefix is private and falls through to nodeName.toLowerCase + // for unknown types (like TimedExec). Nodes with spaces in nodeName (e.g. RDDScanExec's + // "Scan ExistingRDD") produce invalid Java identifiers. Sanitize the prefix here. + ctx.freshNamePrefix = ctx.freshNamePrefix.replaceAll("[^a-zA-Z0-9_]", "") val durationTerm = metricTerm(ctx, "duration") val startTime = ctx.freshName("timedExecStart") val accumulated = ctx.addMutableState("long", ctx.freshName("timedExecAccNs"), @@ -157,15 +183,26 @@ class TimedExec(val child: SparkPlan) extends SparkPlan with CodegenSupport with val childCode = child.asInstanceOf[CodegenSupport].produce(ctx, this) // Test-only: inject a per-partition sleep into the generated code so tests can verify // that the codegen timing path actually captures wall-clock time. + // Uses a mutable flag so the sleep fires once per partition (not every processNext() call). val sleepMs = sparkContext.conf.getLong("spark.dataflint.test.codegenSleepMs", 0L) - val sleepCode = if (sleepMs > 0) s"try { Thread.sleep(${sleepMs}L); } catch (InterruptedException e) { }" else "" + val sleepCode = if (sleepMs > 0) { + val sleepDone = ctx.addMutableState("boolean", ctx.freshName("timedExecSleepDone"), + v => s"$v = false;") + s"""if (!$sleepDone) { + | try { Thread.sleep(${sleepMs}L); } catch (InterruptedException e) { } + | $sleepDone = true; + |}""".stripMargin + } else "" s""" |long $startTime = System.nanoTime(); |$sleepCode - |$childCode - |$accumulated += System.nanoTime() - $startTime; - |$durationTerm.add($accumulated / $NANOS_PER_MILLIS); - |$accumulated = 0L; + |try { + | $childCode + |} finally { + | $accumulated += System.nanoTime() - $startTime; + | $durationTerm.add($accumulated / $NANOS_PER_MILLIS); + | $accumulated = 0L; + |} """.stripMargin } @@ -176,4 +213,30 @@ class TimedExec(val child: SparkPlan) extends SparkPlan with CodegenSupport with object TimedExec { def apply(child: SparkPlan): TimedExec = new TimedExec(child) + + /** + * A minimal SparkPlan that wraps execute() with per-partition duration timing. + * Used by the write path: inserted inside WriteFilesExec (or as the direct child on older + * Spark) so that the write command consumes a timed RDD per partition. + */ + private[dataflint] class RDDTimingWrapper(val child: SparkPlan, durationMetric: SQLMetric) extends SparkPlan { + override def output: Seq[Attribute] = child.output + override def children: Seq[SparkPlan] = Seq(child) + override def outputPartitioning: Partitioning = child.outputPartitioning + override def outputOrdering: Seq[SortOrder] = child.outputOrdering + override def supportsColumnar: Boolean = child.supportsColumnar + + override protected def doExecute(): RDD[InternalRow] = + DataFlintRDDUtils.withDurationMetric(child.execute(), durationMetric) + + override protected def doExecuteColumnar(): RDD[ColumnarBatch] = + DataFlintRDDUtils.withDurationMetricColumnar(child.executeColumnar(), durationMetric) + + override def productArity: Int = 1 + override def productElement(n: Int): Any = + if (n == 0) child else throw new IndexOutOfBoundsException(s"$n") + override def canEqual(that: Any): Boolean = that.isInstanceOf[RDDTimingWrapper] + override protected def withNewChildrenInternal(newChildren: IndexedSeq[SparkPlan]): SparkPlan = + new RDDTimingWrapper(newChildren.head, durationMetric) + } } \ No newline at end of file diff --git a/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala b/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala index f8eb04a..1d6e2ea 100644 --- a/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala +++ b/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala @@ -44,7 +44,7 @@ case class DataFlintInstrumentationColumnarRule(session: SparkSession) extends C "SortMergeJoinExec", "BroadcastHashJoinExec", "BroadcastNestedLoopJoinExec", "CartesianProductExec", "WindowGroupLimitExec", "SortAggregateExec", "SortExec", "HashAggregateExec", "DataWritingCommandExec", - "FileSourceScanExec", "RowDataSourceScanExec", "BatchScanExec", + "FileSourceScanExec", "RowDataSourceScanExec", "BatchScanExec", "RDDScanExec", ) val all = Set( "BatchEvalPythonExec", diff --git a/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala b/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala index 1473091..a97ebde 100644 --- a/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala +++ b/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala @@ -36,7 +36,7 @@ case class DataFlintInstrumentationColumnarRule(session: SparkSession) extends C "SortMergeJoinExec", "BroadcastHashJoinExec", "BroadcastNestedLoopJoinExec", "CartesianProductExec", "WindowGroupLimitExec", "SortAggregateExec", "DataWritingCommandExec", - "FileSourceScanExec", "RowDataSourceScanExec", "BatchScanExec", + "FileSourceScanExec", "RowDataSourceScanExec", "BatchScanExec", "RDDScanExec", ) val all = Set( "BatchEvalPythonExec", diff --git a/spark-plugin/pyspark-testing/dataflint_pyspark_example.py b/spark-plugin/pyspark-testing/dataflint_pyspark_example.py index 2ea8424..63fe59e 100755 --- a/spark-plugin/pyspark-testing/dataflint_pyspark_example.py +++ b/spark-plugin/pyspark-testing/dataflint_pyspark_example.py @@ -64,7 +64,7 @@ def sleep(seconds): .config("spark.plugins", "io.dataflint.spark.SparkDataflintPlugin") \ .config("spark.ui.port", "10000") \ .config("spark.sql.maxMetadataStringLength", "10000") \ - .config("spark.sql.adaptive.enabled", "true") \ + .config("spark.sql.adaptive.enabled", "false") \ .config("spark.dataflint.telemetry.enabled", "false") \ .config("spark.dataflint.instrument.spark.mapInPandas.enabled", instrument) \ .config("spark.dataflint.instrument.spark.mapInArrow.enabled", instrument) \