From 8bb4d5ec0a2a16fee536c86e050e1f71fb3e64a5 Mon Sep 17 00:00:00 2001 From: Avi Minsky Date: Thu, 23 Apr 2026 19:46:48 +0300 Subject: [PATCH 1/2] Fix history server support for older Spark apps (pre-3.2) - Use non-paginated /sql endpoint for Spark < 3.2 apps where the paginated API returns 404 - Handle per-execution planGraph failures in DataflintSQLPlanPage instead of failing the entire batch (fixes NoSuchElementException for apps missing plan data in KVStore) - Fix catch block return type from JObject to JArray in both Spark 3 and Spark 4 SQLPlanPage - Guard against non-array sqlplan response in frontend Co-Authored-By: Claude Opus 4.6 (1M context) --- .../dataflint/api/DataflintSQLPlanPage.scala | 34 +++++++++++-------- .../dataflint/api/DataflintSQLPlanPage.scala | 34 +++++++++++-------- spark-ui/src/services/SparkApi.tsx | 14 +++++++- 3 files changed, 51 insertions(+), 31 deletions(-) diff --git a/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/dataflint/api/DataflintSQLPlanPage.scala b/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/dataflint/api/DataflintSQLPlanPage.scala index 6c6dcd16..b9927c48 100644 --- a/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/dataflint/api/DataflintSQLPlanPage.scala +++ b/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/dataflint/api/DataflintSQLPlanPage.scala @@ -59,22 +59,26 @@ class DataflintSQLPlanPage(ui: SparkUI, dataflintStore: DataflintStore, sqlListe val nodeIdToRddScopeIdList = dataflintStore.databricksAdditionalExecutionInfo(offset.toInt, length.toInt) - val sqlPlans = executionList.map { exec => - val graph = if (isDatabricks) { - val planVersion = latestVersionReader.get.invoke(exec).asInstanceOf[Long] - planGraphReader.get.invoke(sqlStore, exec.executionId.asInstanceOf[Object], planVersion.asInstanceOf[Object]).asInstanceOf[SparkPlanGraph] - } else - sqlStore.planGraph(exec.executionId) + val sqlPlans = executionList.flatMap { exec => + try { + val graph = if (isDatabricks) { + val planVersion = latestVersionReader.get.invoke(exec).asInstanceOf[Long] + planGraphReader.get.invoke(sqlStore, exec.executionId.asInstanceOf[Object], planVersion.asInstanceOf[Object]).asInstanceOf[SparkPlanGraph] + } else + sqlStore.planGraph(exec.executionId) - val rddScopesToStages = if (isDatabricks) Some(rddScopesToStagesReader.get.invoke(exec).asInstanceOf[Map[String, Set[Object]]]) else None + val rddScopesToStages = if (isDatabricks) Some(rddScopesToStagesReader.get.invoke(exec).asInstanceOf[Map[String, Set[Object]]]) else None - val nodeIdToRddScopeId = nodeIdToRddScopeIdList.find(_.executionId == exec.executionId).map(_.nodeIdToRddScopeId) - SqlEnrichedData(exec.executionId, getRootExecutionId(exec, exec.executionId), graph.allNodes.length, rddScopesToStages, - graph.allNodes.map(node => { - val rddScopeId = nodeIdToRddScopeId.flatMap(_.get(node.id)) - NodePlan(node.id, node.desc, rddScopeId) - }).toSeq - ) + val nodeIdToRddScopeId = nodeIdToRddScopeIdList.find(_.executionId == exec.executionId).map(_.nodeIdToRddScopeId) + Some(SqlEnrichedData(exec.executionId, getRootExecutionId(exec, exec.executionId), graph.allNodes.length, rddScopesToStages, + graph.allNodes.map(node => { + val rddScopeId = nodeIdToRddScopeId.flatMap(_.get(node.id)) + NodePlan(node.id, node.desc, rddScopeId) + }).toSeq + )) + } catch { + case _: Throwable => None + } } val jsonValue = Extraction.decompose(sqlPlans)(org.json4s.DefaultFormats) jsonValue @@ -83,7 +87,7 @@ class DataflintSQLPlanPage(ui: SparkUI, dataflintStore: DataflintStore, sqlListe catch { case e: Throwable => { logError("failed to serve dataflint SQL metrics", e) - JObject() + JArray(List()) } } } diff --git a/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/api/DataflintSQLPlanPage.scala b/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/api/DataflintSQLPlanPage.scala index 3d0ad940..3dc63ad0 100644 --- a/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/api/DataflintSQLPlanPage.scala +++ b/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/api/DataflintSQLPlanPage.scala @@ -34,22 +34,26 @@ class DataflintSQLPlanPage(ui: SparkUI, dataflintStore: DataflintStore, sqlListe val nodeIdToRddScopeIdList = dataflintStore.databricksAdditionalExecutionInfo(offset.toInt, length.toInt) - val sqlPlans = executionList.map { exec => - val graph = if (isDatabricks) { - val planVersion = latestVersionReader.get.invoke(exec).asInstanceOf[Long] - planGraphReader.get.invoke(sqlStore, exec.executionId.asInstanceOf[Object], planVersion.asInstanceOf[Object]).asInstanceOf[SparkPlanGraph] - } else - sqlStore.planGraph(exec.executionId) + val sqlPlans = executionList.flatMap { exec => + try { + val graph = if (isDatabricks) { + val planVersion = latestVersionReader.get.invoke(exec).asInstanceOf[Long] + planGraphReader.get.invoke(sqlStore, exec.executionId.asInstanceOf[Object], planVersion.asInstanceOf[Object]).asInstanceOf[SparkPlanGraph] + } else + sqlStore.planGraph(exec.executionId) - val rddScopesToStages = if (isDatabricks) Some(rddScopesToStagesReader.get.invoke(exec).asInstanceOf[Map[String, Set[Object]]]) else None + val rddScopesToStages = if (isDatabricks) Some(rddScopesToStagesReader.get.invoke(exec).asInstanceOf[Map[String, Set[Object]]]) else None - val nodeIdToRddScopeId = nodeIdToRddScopeIdList.find(_.executionId == exec.executionId).map(_.nodeIdToRddScopeId) - SqlEnrichedData(exec.executionId, graph.allNodes.length, rddScopesToStages, - graph.allNodes.map(node => { - val rddScopeId = nodeIdToRddScopeId.flatMap(_.get(node.id)) - NodePlan(node.id, node.desc, rddScopeId) - }).toSeq - ) + val nodeIdToRddScopeId = nodeIdToRddScopeIdList.find(_.executionId == exec.executionId).map(_.nodeIdToRddScopeId) + Some(SqlEnrichedData(exec.executionId, graph.allNodes.length, rddScopesToStages, + graph.allNodes.map(node => { + val rddScopeId = nodeIdToRddScopeId.flatMap(_.get(node.id)) + NodePlan(node.id, node.desc, rddScopeId) + }).toSeq + )) + } catch { + case _: Throwable => None + } } val jsonValue = Extraction.decompose(sqlPlans)(org.json4s.DefaultFormats) jsonValue @@ -58,7 +62,7 @@ class DataflintSQLPlanPage(ui: SparkUI, dataflintStore: DataflintStore, sqlListe catch { case e: Throwable => { logError("failed to serve dataflint SQL metrics", e) - JObject() + JArray(List()) } } } diff --git a/spark-ui/src/services/SparkApi.tsx b/spark-ui/src/services/SparkApi.tsx index 66f20fe5..6942451f 100644 --- a/spark-ui/src/services/SparkApi.tsx +++ b/spark-ui/src/services/SparkApi.tsx @@ -66,6 +66,8 @@ class SparkAPI { historyServerMode: boolean = false; icebergEnabled: boolean = false; sparkVersion: string | undefined = undefined; + // When true, use non-paginated /sql endpoint (for older Spark apps where paginated API returns 404) + useLegacySqlApi: boolean = false; // Cache for response length+hash to skip processing when data hasn't changed // Length is checked first (O(1)), hash only calculated if lengths match @@ -110,6 +112,9 @@ class SparkAPI { } private buildSqlPath(offset: number): string { + if (this.useLegacySqlApi) { + return `${this.applicationPath}/sql`; + } return `${this.applicationPath}/sql?offset=${offset}&length=${SQL_QUERY_LENGTH}&planDescription=false`; } @@ -312,6 +317,12 @@ class SparkAPI { ? currentAttempt.attemptId : undefined; this.sparkVersion = currentAttempt?.appSparkVersion; + // Paginated SQL API (/sql?offset=&length=) was added in Spark 3.2+ + // Older apps on the history server need the non-paginated /sql endpoint + if (this.sparkVersion) { + const [major, minor] = this.sparkVersion.split(".").map(Number); + this.useLegacySqlApi = major < 3 || (major === 3 && minor < 2); + } const sparkConfiguration: SparkConfiguration = await this.queryData( this.environmentPath, ); @@ -396,9 +407,10 @@ class SparkAPI { this.buildSqlPath(sqlIdToQueryFrom), true ); - const sparkPlans: SQLPlans = await this.queryData( + const sparkPlansResponse = await this.queryData( this.buildSqlPlanPath(sqlIdToQueryFrom) ); + const sparkPlans: SQLPlans = Array.isArray(sparkPlansResponse) ? sparkPlansResponse : []; let icebergInfo: IcebergInfo = { commitsInfo: [] }; if (this.icebergEnabled) { icebergInfo = await this.queryData( From 8b9d361c5ecda5c0a495f0301ac5e80d81bb795e Mon Sep 17 00:00:00 2001 From: Avi Minsky Date: Thu, 23 Apr 2026 20:58:56 +0300 Subject: [PATCH 2/2] split TimedExec to 2 classes whether they have CodeGen or not. seems like there is a bug in spark<3.2 where even if supportCodegen method returns false then just since class inherits from codegen then it causes spark to be confused. so now only if original exec exterds Codegen it will get TimedWithCodegenExec --- .../apache/spark/dataflint/TimedExec.scala | 81 +++++++------------ 1 file changed, 30 insertions(+), 51 deletions(-) diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/TimedExec.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/TimedExec.scala index 7ecf5b90..d6cc9bc7 100644 --- a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/TimedExec.scala +++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/TimedExec.scala @@ -33,8 +33,15 @@ import org.apache.spark.sql.vectorized.ColumnarBatch * Extends SparkPlan directly (not UnaryExecNode) to avoid a `UnaryLike` reference in * compiled bytecode, which would cause NoClassDefFoundError on Spark 3.0/3.1. * productElement/productArity support `makeCopy` on Spark 3.0/3.1. + * + * ## Codegen + * `TimedExec` does NOT implement `CodegenSupport`. For nodes that support codegen, + * `TimedWithCodegenExec` extends this class and adds codegen support. This avoids + * ClassCastExceptions on Spark 3.0/3.1 where some nodes (e.g. FileSourceScanExec) + * do not implement `CodegenSupport`. + * Use `TimedExec.apply()` to automatically pick the right variant. */ -class TimedExec(val child: SparkPlan) extends SparkPlan with CodegenSupport with Logging { +class TimedExec(val child: SparkPlan) extends SparkPlan with Logging { override def nodeName: String = "DataFlint" + child.nodeName override def output: Seq[Attribute] = child.output @@ -60,7 +67,7 @@ class TimedExec(val child: SparkPlan) extends SparkPlan with CodegenSupport with // recursive call on the grandchildren is safe. override protected def doPrepare(): Unit = child.prepare() - private def postRddId(rddId: Int): Unit = { + protected def postRddId(rddId: Int): Unit = { val rddIdMetric = longMetric("rddId") rddIdMetric += rddId MetricsUtils.postDriverMetrics(sparkContext, rddIdMetric) @@ -118,72 +125,39 @@ class TimedExec(val child: SparkPlan) extends SparkPlan with CodegenSupport with override def productElement(n: Int): Any = if (n == 0) child else throw new IndexOutOfBoundsException(s"$n") - // When Spark updates our children (= child's children), rebuild child with new children - // and wrap in a new TimedExec. Used by Spark 3.2+ plan transformations (AQE, CollapseCodegen, etc.) + // When Spark updates our children (= child's children), rebuild child with new children. + // Uses TimedExec.apply to pick the right variant (with or without codegen). override protected def withNewChildrenInternal(newChildren: IndexedSeq[SparkPlan]): SparkPlan = - new TimedExec(child.withNewChildren(newChildren)) + TimedExec(child.withNewChildren(newChildren)) +} - // ----------------------------------------------------------------------------------------- - // Codegen execution path - // ----------------------------------------------------------------------------------------- +/** + * TimedExec variant for nodes that implement CodegenSupport. + * Adds codegen timing by wrapping child.produce() with nanoTime deltas. + * Only instantiated via TimedExec.apply() when child is CodegenSupport. + */ +class TimedWithCodegenExec(override val child: SparkPlan) extends TimedExec(child) with CodegenSupport { override def inputRDDs(): Seq[RDD[InternalRow]] = { val rdds = child.asInstanceOf[CodegenSupport].inputRDDs() - // Codegen path: doExecute() is not called, so post rddId here. - // inputRDDs() is called during WSCE code generation on the driver. rdds.headOption.foreach(rdd => postRddId(rdd.id)) rdds } - // Delegate supportCodegen to child — must use child.supportCodegen, not just isInstanceOf. - // Example: SortAggregateExec implements CodegenSupport but returns supportCodegen=false - // when grouping keys are present (doProduce throws UnsupportedOperationException in that case). - override def supportCodegen: Boolean = child match { - case c: CodegenSupport => c.supportCodegen && child.children.length <= 1 - case _ => false + override def supportCodegen: Boolean = { + val c = child.asInstanceOf[CodegenSupport] + c.supportCodegen && child.children.length <= 1 } - // needCopyResult flows DOWN: the default asks children.head.needCopyResult. - // With the transparent wrapper, children = child.children. For multi-child nodes (joins), - // children.length > 1, which hits the default's UnsupportedOperationException branch. - // Fix: ask child directly. child (e.g. SortMergeJoinExec) extends BlockingOperatorWithCodegen - // which returns false, so no recursion. - override def needCopyResult: Boolean = child match { - case c: CodegenSupport => c.needCopyResult - case _ => false - } + override def needCopyResult: Boolean = child.asInstanceOf[CodegenSupport].needCopyResult - // needStopCheck flows UP: do NOT override — the default (parent.needStopCheck) is correct. - // Delegating to child.needStopCheck instead creates an infinite loop because - // child.parent == TimedExec (set by doProduce's produce(ctx, this)), so: - // TimedExec.needStopCheck → child.needStopCheck → child.parent.needStopCheck - // → TimedExec.needStopCheck → ... - /** - * Wrap child.produce() with nanoTime deltas. - * - * Measures cumulative time of the full pipeline below this node. - * Exclusive attribution is done post-hoc: exclusive(N) = D(N) - D(pipelined_child). - * - * The accumulated nanos are flushed to the SQLMetric after each produce invocation. - * For pipelined children this happens per-row; for blocking children once after the - * full build phase. - * - * Uses try/finally so the timing flushes even when blocking operators (SortExec, etc.) - * exit early via shouldStop()/return — without this, duration would be 0. - */ override protected def doProduce(ctx: CodegenContext): String = { - // CodegenSupport.variablePrefix is private and falls through to nodeName.toLowerCase - // for unknown types (like TimedExec). Nodes with spaces in nodeName (e.g. RDDScanExec's - // "Scan ExistingRDD") produce invalid Java identifiers. Sanitize the prefix here. ctx.freshNamePrefix = ctx.freshNamePrefix.replaceAll("[^a-zA-Z0-9_]", "") val durationTerm = metricTerm(ctx, "duration") val startTime = ctx.freshName("timedExecStart") val accumulated = ctx.addMutableState("long", ctx.freshName("timedExecAccNs"), v => s"$v = 0L;") val childCode = child.asInstanceOf[CodegenSupport].produce(ctx, this) - // Test-only: inject a per-partition sleep into the generated code so tests can verify - // that the codegen timing path actually captures wall-clock time. - // Uses a mutable flag so the sleep fires once per partition (not every processNext() call). val sleepMs = sparkContext.conf.getLong("spark.dataflint.test.codegenSleepMs", 0L) val sleepCode = if (sleepMs > 0) { val sleepDone = ctx.addMutableState("boolean", ctx.freshName("timedExecSleepDone"), @@ -206,13 +180,18 @@ class TimedExec(val child: SparkPlan) extends SparkPlan with CodegenSupport with """.stripMargin } - // Fully transparent — no per-row logic, just forward to parent override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = consume(ctx, input) + + override protected def withNewChildrenInternal(newChildren: IndexedSeq[SparkPlan]): SparkPlan = + TimedExec(child.withNewChildren(newChildren)) } object TimedExec { - def apply(child: SparkPlan): TimedExec = new TimedExec(child) + def apply(child: SparkPlan): TimedExec = child match { + case _: CodegenSupport => new TimedWithCodegenExec(child) + case _ => new TimedExec(child) + } /** * A minimal SparkPlan that wraps execute() with per-partition duration timing.