diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/DataflintSparkUICommonLoader.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/DataflintSparkUICommonLoader.scala index 58cbdd23..d04fe92e 100644 --- a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/DataflintSparkUICommonLoader.scala +++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/DataflintSparkUICommonLoader.scala @@ -119,13 +119,22 @@ class DataflintSparkUICommonInstaller extends Logging { pageFactory.addStaticHandler(ui, "io/dataflint/spark/static/ui", ui.basePath + "/dataflint") val dataflintStore = new DataflintStore(store = ui.store.store) val tab = pageFactory.createDataFlintTab(ui) - tab.attachPage(pageFactory.createSQLPlanPage(ui, dataflintStore, sqlListener)) - tab.attachPage(pageFactory.createSQLMetricsPage(ui, sqlListener)) - tab.attachPage(pageFactory.createSQLStagesRddPage(ui)) - tab.attachPage(pageFactory.createApplicationInfoPage(ui, dataflintStore)) - tab.attachPage(pageFactory.createIcebergPage(ui, dataflintStore)) - tab.attachPage(pageFactory.createDeltaLakeScanPage(ui, dataflintStore)) - tab.attachPage(pageFactory.createCachedStoragePage(ui, dataflintStore)) + if (pageFactory.usesReflectiveEndpoints) { + // Spark 4 path: bypass WebUIPage entirely and serve JSON via reflective Jetty + // ServletContextHandlers. Required for runtimes like Databricks Runtime 17.3 + // that ship javax.servlet instead of jakarta.servlet — a WebUIPage subclass + // with jakarta-typed render/renderJson would fail JVM verification there. + pageFactory.attachReflectiveEndpoints(ui, dataflintStore, sqlListener) + } else { + // Spark 3 path: attach WebUIPage instances to the DataFlint tab in the usual way. + tab.attachPage(pageFactory.createSQLPlanPage(ui, dataflintStore, sqlListener)) + tab.attachPage(pageFactory.createSQLMetricsPage(ui, sqlListener)) + tab.attachPage(pageFactory.createSQLStagesRddPage(ui)) + tab.attachPage(pageFactory.createApplicationInfoPage(ui, dataflintStore)) + tab.attachPage(pageFactory.createIcebergPage(ui, dataflintStore)) + tab.attachPage(pageFactory.createDeltaLakeScanPage(ui, dataflintStore)) + tab.attachPage(pageFactory.createCachedStoragePage(ui, dataflintStore)) + } ui.attachTab(tab) ui.webUrl } diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/api/DataflintPageFactory.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/api/DataflintPageFactory.scala index bc93b028..76d216b9 100644 --- a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/api/DataflintPageFactory.scala +++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/api/DataflintPageFactory.scala @@ -7,23 +7,31 @@ import org.apache.spark.ui.{SparkUI, WebUIPage, WebUITab} /** * Abstract factory for creating Dataflint UI components. * This allows version-specific implementations for different Spark versions. + * + * Spark 3 implementations build a `DataFlintTab` and attach `WebUIPage` instances + * to it (see [[createApplicationInfoPage]] etc.). Spark 4 cannot use that flow + * because some target runtimes (e.g. Databricks Runtime 17.3) ship javax.servlet + * instead of jakarta.servlet, which breaks JVM verification of any class that + * extends WebUIPage with jakarta-typed `render`/`renderJson`. The Spark 4 path + * therefore overrides [[usesReflectiveEndpoints]] to `true` and supplies all + * endpoints through [[attachReflectiveEndpoints]] instead. 
*/ abstract class DataflintPageFactory { - + def createDataFlintTab(ui: SparkUI): WebUITab - + def createApplicationInfoPage(ui: SparkUI, dataflintStore: DataflintStore): WebUIPage - + def createCachedStoragePage(ui: SparkUI, dataflintStore: DataflintStore): WebUIPage - + def createIcebergPage(ui: SparkUI, dataflintStore: DataflintStore): WebUIPage - + def createDeltaLakeScanPage(ui: SparkUI, dataflintStore: DataflintStore): WebUIPage - + def createSQLMetricsPage(ui: SparkUI, sqlListener: () => Option[SQLAppStatusListener]): WebUIPage - + def createSQLPlanPage(ui: SparkUI, dataflintStore: DataflintStore, sqlListener: () => Option[SQLAppStatusListener]): WebUIPage - + def createSQLStagesRddPage(ui: SparkUI): WebUIPage def addStaticHandler(ui: SparkUI, resourceBase: String, contextPath: String): Unit @@ -31,4 +39,20 @@ abstract class DataflintPageFactory { def getTabs(ui: SparkUI): Seq[WebUITab] def isUISupported(ui: SparkUI): Boolean = true -} + + /** + * If true, the common loader will skip the per-page `attachPage` calls and + * rely entirely on [[attachReflectiveEndpoints]] to wire the JSON API. Used + * by Spark 4 to bypass `WebUIPage` (which is jakarta.servlet-typed and breaks + * on javax-only runtimes like Databricks Runtime 17.3). + */ + def usesReflectiveEndpoints: Boolean = false + + /** + * Attach DataFlint JSON endpoints reflectively. Default no-op; overridden by + * Spark 4 to install a Proxy-backed servlet for each endpoint. + */ + def attachReflectiveEndpoints(ui: SparkUI, + dataflintStore: DataflintStore, + sqlListener: () => Option[SQLAppStatusListener]): Unit = () +} \ No newline at end of file diff --git a/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/api/DataFlintTab.scala b/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/api/DataFlintTab.scala index 582b9d74..17f3afc9 100644 --- a/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/api/DataFlintTab.scala +++ b/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/api/DataFlintTab.scala @@ -1,16 +1,7 @@ package org.apache.spark.dataflint.api -import org.apache.spark.ui.{SparkUI, UIUtils, WebUITab} +import org.apache.spark.ui.{SparkUI, WebUITab} -import jakarta.servlet.http.HttpServletRequest -import scala.xml.Node - -class DataFlintTab(parent: SparkUI) extends WebUITab(parent,"dataflint") { +class DataFlintTab(parent: SparkUI) extends WebUITab(parent, "dataflint") { override val name: String = "DataFlint" - def render(request: HttpServletRequest): Seq[Node] = { - val content = -
-
- UIUtils.basicSparkPage(request, content, "DataFlint", true) - } -} +} \ No newline at end of file diff --git a/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/api/DataflintApplicationInfoPage.scala b/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/api/DataflintApplicationInfoPage.scala deleted file mode 100644 index 376aaeff..00000000 --- a/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/api/DataflintApplicationInfoPage.scala +++ /dev/null @@ -1,32 +0,0 @@ -package org.apache.spark.dataflint.api - -import org.apache.spark.dataflint.listener.DataflintStore -import org.apache.spark.internal.Logging -import org.apache.spark.ui.{SparkUI, WebUIPage} -import org.json4s.{Extraction, JObject} - -import jakarta.servlet.http.HttpServletRequest -import scala.xml.Node - -class DataflintApplicationInfoPage(ui: SparkUI, dataflintStore: DataflintStore) - extends WebUIPage("applicationinfo") with Logging { - override def renderJson(request: HttpServletRequest) = { - try { - val runIdConfigFromStore = ui.store.environmentInfo().sparkProperties.find(_._1 == "spark.dataflint.runId").map(_._2) - val runIdPotentiallyFromConfig = if (runIdConfigFromStore.isEmpty) ui.conf.getOption("spark.dataflint.runId") else runIdConfigFromStore - val applicationInfo = ui.store.applicationInfo() - val environmentInfo = dataflintStore.environmentInfo() - val dataFlintApplicationInfo = DataFlintApplicationInfo(runIdPotentiallyFromConfig, applicationInfo, environmentInfo) - val jsonValue = Extraction.decompose(dataFlintApplicationInfo)(org.json4s.DefaultFormats) - jsonValue - } - catch { - case e: Throwable => { - logError("failed to serve dataflint application info", e) - JObject() - } - } - } - - override def render(request: HttpServletRequest): Seq[Node] = Seq[Node]() -} \ No newline at end of file diff --git a/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/api/DataflintCachedStoragePage.scala b/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/api/DataflintCachedStoragePage.scala deleted file mode 100644 index 384b5bff..00000000 --- a/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/api/DataflintCachedStoragePage.scala +++ /dev/null @@ -1,57 +0,0 @@ -package org.apache.spark.dataflint.api - -import org.apache.spark.dataflint.listener.{DataflintExecutorStorageInfo, DataflintRDDStorageInfo, DataflintStore} -import org.apache.spark.internal.Logging -import org.apache.spark.status.AppStatusStore -import org.apache.spark.ui.{SparkUI, WebUIPage} -import org.json4s.{Extraction, JObject} - -import jakarta.servlet.http.HttpServletRequest -import scala.xml.Node - -class DataflintCachedStoragePage(ui: SparkUI, dataflintStore: DataflintStore) - extends WebUIPage("cachedstorage") with Logging { - override def renderJson(request: HttpServletRequest) = { - try { - val liveRddStorage = ui.store.rddList() - val rddStorage = dataflintStore.rddStorageInfo() - val graphs = ui.store.stageList(null) - .filter(_.submissionTime.isDefined) // filter skipped or pending stages - .map(stage => Tuple2(stage.stageId, - ui.store.operationGraphForStage(stage.stageId).rootCluster.childClusters.flatMap(_.childNodes) - .filter(_.cached) - .map(rdd => { - - val liveCached = liveRddStorage.find(_.id == rdd.id).map( - rdd => { - val maxUsageExecutor = rdd.dataDistribution.map(executors => executors.maxBy(_.memoryUsed)) - val maxExecutorUsage = maxUsageExecutor.map(executor => - DataflintExecutorStorageInfo( - executor.memoryUsed, - 
executor.memoryRemaining, - if(executor.memoryUsed + executor.memoryRemaining != 0) executor.memoryUsed.toDouble / (executor.memoryUsed + executor.memoryRemaining) * 100 else 0 - )) - DataflintRDDStorageInfo(rdd.id, - rdd.memoryUsed, - rdd.diskUsed, - rdd.numPartitions, - rdd.storageLevel, - maxExecutorUsage - )} - ) - val cached = rddStorage.find(_.rddId == rdd.id) - liveCached.getOrElse(cached) - }))).toMap - val jsonValue = Extraction.decompose(graphs)(org.json4s.DefaultFormats) - jsonValue - } - catch { - case e: Throwable => { - logError("failed to serve dataflint Jobs RDD", e) - JObject() - } - } - } - - override def render(request: HttpServletRequest): Seq[Node] = Seq[Node]() -} diff --git a/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/api/DataflintDeltaLakeScanPage.scala b/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/api/DataflintDeltaLakeScanPage.scala deleted file mode 100644 index 345fce34..00000000 --- a/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/api/DataflintDeltaLakeScanPage.scala +++ /dev/null @@ -1,36 +0,0 @@ -package org.apache.spark.dataflint.api - -import org.apache.spark.dataflint.listener.DataflintStore -import org.apache.spark.internal.Logging -import org.apache.spark.ui.{SparkUI, WebUIPage} -import org.json4s.{Extraction, JObject} - -import jakarta.servlet.http.HttpServletRequest -import scala.xml.Node - -class DataflintDeltaLakeScanPage(ui: SparkUI, dataflintStore: DataflintStore) - extends WebUIPage("deltalake") with Logging { - override def renderJson(request: HttpServletRequest) = { - try { - val offset = request.getParameter("offset") - val length = request.getParameter("length") - if (offset == null || length == null) { - JObject() - } else { - val scans = dataflintStore.deltaLakeScanInfo(offset.toInt, length.toInt) - val deltaLakeInfo = DeltaLakeScanInfo(scans = scans) - val jsonValue = Extraction.decompose(deltaLakeInfo)(org.json4s.DefaultFormats) - jsonValue - } - } - catch { - case e: Throwable => { - logError("failed to serve dataflint delta lake scan info", e) - JObject() - } - } - } - - override def render(request: HttpServletRequest): Seq[Node] = Seq[Node]() -} - diff --git a/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/api/DataflintIcebergPage.scala b/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/api/DataflintIcebergPage.scala deleted file mode 100644 index 61630adf..00000000 --- a/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/api/DataflintIcebergPage.scala +++ /dev/null @@ -1,35 +0,0 @@ -package org.apache.spark.dataflint.api - -import org.apache.spark.dataflint.listener.DataflintStore -import org.apache.spark.internal.Logging -import org.apache.spark.ui.{SparkUI, WebUIPage} -import org.json4s.{Extraction, JObject} - -import jakarta.servlet.http.HttpServletRequest -import scala.xml.Node - -class DataflintIcebergPage(ui: SparkUI, dataflintStore: DataflintStore) - extends WebUIPage("iceberg") with Logging { - override def renderJson(request: HttpServletRequest) = { - try { - val offset = request.getParameter("offset") - val length = request.getParameter("length") - if (offset == null || length == null) { - JObject() - } else { - val commits = dataflintStore.icebergCommits(offset.toInt, length.toInt) - val icebergInfo = IcebergInfo(commitsInfo = commits) - val jsonValue = Extraction.decompose(icebergInfo)(org.json4s.DefaultFormats) - jsonValue - } - } - catch { - case e: Throwable => { - logError("failed to serve 
dataflint iceberg", e) - JObject() - } - } - } - - override def render(request: HttpServletRequest): Seq[Node] = Seq[Node]() -} diff --git a/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/api/DataflintJettyUtils.scala b/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/api/DataflintJettyUtils.scala index 7f7994de..7ed22536 100644 --- a/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/api/DataflintJettyUtils.scala +++ b/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/api/DataflintJettyUtils.scala @@ -2,7 +2,6 @@ package org.apache.spark.dataflint.api import org.apache.spark.ui.SparkUI -import jakarta.servlet.Servlet import scala.language.implicitConversions object DataflintJettyUtils { @@ -16,7 +15,9 @@ object DataflintJettyUtils { // copy of createStaticHandler in core/src/main/scala/org/apache/spark/ui/JettyUtils.scala // only difference is we are loading the resources from this class loader which might be different from the spark one // with use reflection to support both org.sparkproject.jetty.servlet and org.eclipse.jetty - // in spark source code + // in spark source code. + // Avoids any compile-time reference to javax.servlet or jakarta.servlet so that the + // same artifact can run on Databricks Runtime 17.3 (javax) and on stock Spark 4 (jakarta). private def createStaticHandler(resourceBase: String, path: String): Any = { // Try to load classes from both packages def getClassForName(className: String): Class[_] = { @@ -35,9 +36,15 @@ object DataflintJettyUtils { val setInitParameterMethod = contextHandler.getClass.getMethod("setInitParameter", classOf[String], classOf[String]) setInitParameterMethod.invoke(contextHandler, "org.eclipse.jetty.servlet.Default.gzip", "false") - val staticHandler = defaultServletClass.getDeclaredConstructor().newInstance() - val servletHolderConstructor = servletHolderClass.getConstructor(classOf[Servlet]) - val holder = servletHolderConstructor.newInstance(staticHandler.asInstanceOf[Object]) + val staticHandler = defaultServletClass.getDeclaredConstructor().newInstance().asInstanceOf[Object] + // Pick the 1-arg ServletHolder constructor whose parameter type accepts our static + // handler, regardless of whether the runtime expects jakarta.servlet.Servlet or + // javax.servlet.Servlet. Replaces a previous getConstructor(classOf[jakarta.servlet.Servlet]) + // call that broke load on javax-only runtimes (e.g. DBR 17.3). 
+ val servletHolderConstructor = servletHolderClass.getConstructors + .find(c => c.getParameterCount == 1 && c.getParameterTypes()(0).isInstance(staticHandler)) + .getOrElse(sys.error(s"No 1-arg ServletHolder constructor accepting ${staticHandler.getClass.getName}")) + val holder = servletHolderConstructor.newInstance(staticHandler) Option(this.getClass.getClassLoader.getResource(resourceBase)) match { case Some(res) => @@ -56,4 +63,4 @@ object DataflintJettyUtils { contextHandler } -} +} \ No newline at end of file diff --git a/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/api/DataflintReflectiveServletBuilder.scala b/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/api/DataflintReflectiveServletBuilder.scala new file mode 100644 index 00000000..11f320d6 --- /dev/null +++ b/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/api/DataflintReflectiveServletBuilder.scala @@ -0,0 +1,371 @@ +package org.apache.spark.dataflint.api + +import java.lang.reflect.{InvocationHandler, Method, Proxy} +import org.apache.spark.dataflint.listener.{DataflintExecutorStorageInfo, DataflintRDDStorageInfo, DataflintStore} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, SQLAppStatusStore, SparkPlanGraph} +import org.apache.spark.ui.SparkUI +import org.json4s.{Extraction, JArray, JObject, JValue} +import org.json4s.jackson.JsonMethods.compact + +/** + * Build Jetty ServletContextHandlers that expose JSON endpoints without any + * compile-time reference to javax.servlet or jakarta.servlet. + * + * Lets a single artifact run on both stock Spark 4 (jakarta) and Databricks + * Runtime 17.3 (javax). The Servlet instance is a java.lang.reflect.Proxy + * over the Servlet interface that exists on the classpath at runtime; the + * Jetty ServletContextHandler/ServletHolder are loaded by reflection from + * either org.sparkproject.jetty.servlet.* or org.eclipse.jetty.servlet.*. + */ +object DataflintReflectiveServletBuilder extends Logging { + + private lazy val servletApiPackage: String = { + try { Class.forName("jakarta.servlet.Servlet"); "jakarta.servlet" } + catch { + case _: ClassNotFoundException => + Class.forName("javax.servlet.Servlet"); "javax.servlet" + } + } + + private def loadJettyClass(simpleName: String): Class[_] = { + try Class.forName(s"org.sparkproject.jetty.servlet.$simpleName") + catch { + case _: ClassNotFoundException => + Class.forName(s"org.eclipse.jetty.servlet.$simpleName") + } + } + + /** + * Build a ServletContextHandler that serves a single JSON payload at "/". + * + * @param contextPath e.g. 
"/dataflint/applicationinfo/json" + * @param jsonProducer takes a parameter accessor and returns a JValue + * @return a Jetty ServletContextHandler instance (typed as Any to stay neutral) + */ + def buildJsonHandler(contextPath: String, + jsonProducer: (String => String) => JValue): Any = { + val servletInterface = Class.forName(s"$servletApiPackage.Servlet") + val httpServletRequestClass = Class.forName(s"$servletApiPackage.http.HttpServletRequest") + + val invocationHandler = new InvocationHandler { + override def invoke(proxy: Any, method: Method, args: Array[AnyRef]): AnyRef = { + method.getName match { + case "service" => + try { + val req = args(0) + val resp = args(1) + val getParameterMethod = httpServletRequestClass.getMethod("getParameter", classOf[String]) + val getParameter: String => String = (name: String) => + getParameterMethod.invoke(req, name).asInstanceOf[String] + val body = compact(jsonProducer(getParameter)) + + resp.getClass.getMethod("setContentType", classOf[String]).invoke(resp, "application/json") + resp.getClass.getMethod("setStatus", classOf[Int]).invoke(resp, Int.box(200)) + val writer = resp.getClass.getMethod("getWriter").invoke(resp) + writer.getClass.getMethod("print", classOf[String]).invoke(writer, body) + } catch { + case e: Throwable => logError(s"Failed to serve $contextPath", e) + } + null + case _ => null + } + } + } + + val servlet = Proxy.newProxyInstance( + getClass.getClassLoader, + Array(servletInterface), + invocationHandler + ) + + val servletContextHandlerClass = loadJettyClass("ServletContextHandler") + val servletHolderClass = loadJettyClass("ServletHolder") + + val ctx = servletContextHandlerClass.getDeclaredConstructor().newInstance().asInstanceOf[Object] + ctx.getClass.getMethod("setContextPath", classOf[String]).invoke(ctx, contextPath) + + val holderCtor = servletHolderClass.getConstructors + .find(c => c.getParameterCount == 1 && c.getParameterTypes()(0).isInstance(servlet)) + .getOrElse(sys.error(s"No 1-arg ServletHolder constructor accepting ${servlet.getClass.getName}")) + val holder = holderCtor.newInstance(servlet.asInstanceOf[Object]).asInstanceOf[Object] + + ctx.getClass.getMethod("addServlet", servletHolderClass, classOf[String]) + .invoke(ctx, holder, "/") + + ctx + } + + /** + * Reflectively call SparkUI.attachHandler(ServletContextHandler). + * Required because the parameter type is jakarta- or javax-bound at compile time + * depending on which Spark we were built against. + */ + def attachToUI(ui: SparkUI, handler: Any): Unit = { + val attachHandlerMethod = ui.getClass.getMethods + .find(m => m.getName == "attachHandler" && m.getParameterCount == 1) + .getOrElse(sys.error("SparkUI has no attachHandler(handler) method")) + attachHandlerMethod.invoke(ui, handler.asInstanceOf[Object]) + } + + private def attachEndpoint(ui: SparkUI, + basePath: String, + name: String, + producer: (String => String) => JValue): Unit = { + val contextPath = basePath + s"/dataflint/$name/json" + val handler = buildJsonHandler(contextPath, producer) + attachToUI(ui, handler) + logInfo(s"DataFlint reflective endpoint attached at $contextPath") + } + + /** + * Wire up every DataFlint JSON endpoint reflectively. Replaces the + * WebUIPage-based flow used on Spark 3, so that the same artifact can + * serve the JSON API on stock Spark 4 (jakarta.servlet) and on + * Databricks Runtime 17.3 (javax.servlet). 
+ */ + def attachAllEndpoints(ui: SparkUI, + dataflintStore: DataflintStore, + sqlListener: () => Option[SQLAppStatusListener], + basePath: String): Unit = { + attachApplicationInfoEndpoint(ui, dataflintStore, basePath) + attachSqlPlanEndpoint(ui, dataflintStore, sqlListener, basePath) + attachSqlMetricsEndpoint(ui, sqlListener, basePath) + attachStagesRddEndpoint(ui, basePath) + attachIcebergEndpoint(ui, dataflintStore, basePath) + attachDeltaLakeScanEndpoint(ui, dataflintStore, basePath) + attachCachedStorageEndpoint(ui, basePath, dataflintStore) + } + + private def attachApplicationInfoEndpoint(ui: SparkUI, + dataflintStore: DataflintStore, + basePath: String): Unit = { + val producer: (String => String) => JValue = _ => { + try { + val runIdConfigFromStore = ui.store.environmentInfo().sparkProperties + .find(_._1 == "spark.dataflint.runId").map(_._2) + val runIdPotentiallyFromConfig = + if (runIdConfigFromStore.isEmpty) ui.conf.getOption("spark.dataflint.runId") + else runIdConfigFromStore + val applicationInfo = ui.store.applicationInfo() + val environmentInfo = dataflintStore.environmentInfo() + val dataFlintApplicationInfo = + DataFlintApplicationInfo(runIdPotentiallyFromConfig, applicationInfo, environmentInfo) + Extraction.decompose(dataFlintApplicationInfo)(org.json4s.DefaultFormats) + } catch { + case e: Throwable => + logError("failed to serve dataflint application info", e) + JObject() + } + } + attachEndpoint(ui, basePath, "applicationinfo", producer) + } + + private def attachSqlPlanEndpoint(ui: SparkUI, + dataflintStore: DataflintStore, + sqlListener: () => Option[SQLAppStatusListener], + basePath: String): Unit = { + var sqlListenerCache: Option[SQLAppStatusListener] = None + val producer: (String => String) => JValue = getParameter => { + try { + if (sqlListenerCache.isEmpty) { + sqlListenerCache = sqlListener() + } + val sqlStore = new SQLAppStatusStore(ui.store.store, sqlListenerCache) + + val offset = getParameter("offset") + val length = getParameter("length") + if (offset == null || length == null) { + JArray(List()) + } else { + val executionList = sqlStore.executionsList(offset.toInt, length.toInt) + + val isDatabricks = ui.conf.getOption("spark.databricks.clusterUsageTags.cloudProvider").isDefined + + val latestVersionReader = if (isDatabricks && executionList.nonEmpty) Some(executionList.head.getClass.getMethod("latestVersion")) else None + val planGraphReader = if (isDatabricks) Some(sqlStore.getClass.getMethods.filter(_.getName == "planGraph").head) else None + val rddScopesToStagesReader = if (isDatabricks && executionList.nonEmpty) Some(executionList.head.getClass.getMethod("rddScopesToStages")) else None + + val nodeIdToRddScopeIdList = dataflintStore.databricksAdditionalExecutionInfo(offset.toInt, length.toInt) + + val sqlPlans = executionList.flatMap { exec => + try { + val graph = if (isDatabricks) { + val planVersion = latestVersionReader.get.invoke(exec).asInstanceOf[Long] + planGraphReader.get.invoke(sqlStore, exec.executionId.asInstanceOf[Object], planVersion.asInstanceOf[Object]).asInstanceOf[SparkPlanGraph] + } else + sqlStore.planGraph(exec.executionId) + + val rddScopesToStages = if (isDatabricks) Some(rddScopesToStagesReader.get.invoke(exec).asInstanceOf[Map[String, Set[Object]]]) else None + + val nodeIdToRddScopeId = nodeIdToRddScopeIdList.find(_.executionId == exec.executionId).map(_.nodeIdToRddScopeId) + Some(SqlEnrichedData(exec.executionId, graph.allNodes.length, rddScopesToStages, + graph.allNodes.map(node => { + val rddScopeId = 
nodeIdToRddScopeId.flatMap(_.get(node.id)) + NodePlan(node.id, node.desc, rddScopeId) + }).toSeq + )) + } catch { + case _: Throwable => None + } + } + Extraction.decompose(sqlPlans)(org.json4s.DefaultFormats) + } + } catch { + case e: Throwable => + logError("failed to serve dataflint SQL metrics", e) + JArray(List()) + } + } + attachEndpoint(ui, basePath, "sqlplan", producer) + } + + private def attachSqlMetricsEndpoint(ui: SparkUI, + sqlListener: () => Option[SQLAppStatusListener], + basePath: String): Unit = { + var sqlListenerCache: Option[SQLAppStatusListener] = None + val producer: (String => String) => JValue = getParameter => { + try { + if (sqlListenerCache.isEmpty) { + sqlListenerCache = sqlListener() + } + + val sqlStore = new SQLAppStatusStore(ui.store.store, sqlListenerCache) + val executionId = getParameter("executionId") + if (executionId == null) { + JObject() + } else { + val executionIdLong = executionId.toLong + val metrics = sqlStore.executionMetrics(executionIdLong) + val isDatabricks = ui.conf.getOption("spark.databricks.clusterUsageTags.cloudProvider").isDefined + val graph = if (isDatabricks) { + val exec = sqlStore.execution(executionIdLong).get + val planVersion = exec.getClass.getMethod("latestVersion").invoke(exec).asInstanceOf[Long] + sqlStore.getClass.getMethods.filter(_.getName == "planGraph").head.invoke(sqlStore, executionIdLong.asInstanceOf[Object], planVersion.asInstanceOf[Object]).asInstanceOf[SparkPlanGraph] + } else + sqlStore.planGraph(executionIdLong) + val nodesMetrics = graph.allNodes.map(node => NodeMetrics(node.id, node.name, node.metrics.map(metric => { + NodeMetric(metric.name, metrics.get(metric.accumulatorId)) + }).toSeq)) + // filter nodes without metrics + .filter(nodeMetrics => !nodeMetrics.metrics.forall(_.value.isEmpty)) + Extraction.decompose(nodesMetrics)(org.json4s.DefaultFormats) + } + } catch { + case e: Throwable => + logError("failed to serve dataflint SQL metrics", e) + JObject() + } + } + attachEndpoint(ui, basePath, "sqlmetrics", producer) + } + + private def attachStagesRddEndpoint(ui: SparkUI, basePath: String): Unit = { + val producer: (String => String) => JValue = _ => { + try { + val graphs = ui.store.stageList(null) + .filter(_.submissionTime.isDefined) // filter skipped or pending stages + .map(stage => Tuple2(stage.stageId, + ui.store.operationGraphForStage(stage.stageId).rootCluster.childClusters + .map(rdd => Tuple2(rdd.id, rdd.name)).toMap)) + .toMap + Extraction.decompose(graphs)(org.json4s.DefaultFormats) + } catch { + case e: Throwable => + logError("failed to serve dataflint Jobs RDD", e) + JObject() + } + } + attachEndpoint(ui, basePath, "stagesrdd", producer) + } + + private def attachIcebergEndpoint(ui: SparkUI, + dataflintStore: DataflintStore, + basePath: String): Unit = { + val producer: (String => String) => JValue = getParameter => { + try { + val offset = getParameter("offset") + val length = getParameter("length") + if (offset == null || length == null) { + JObject() + } else { + val commits = dataflintStore.icebergCommits(offset.toInt, length.toInt) + val icebergInfo = IcebergInfo(commitsInfo = commits) + Extraction.decompose(icebergInfo)(org.json4s.DefaultFormats) + } + } catch { + case e: Throwable => + logError("failed to serve dataflint iceberg", e) + JObject() + } + } + attachEndpoint(ui, basePath, "iceberg", producer) + } + + private def attachDeltaLakeScanEndpoint(ui: SparkUI, + dataflintStore: DataflintStore, + basePath: String): Unit = { + val producer: (String => String) => JValue = 
getParameter => { + try { + val offset = getParameter("offset") + val length = getParameter("length") + if (offset == null || length == null) { + JObject() + } else { + val scans = dataflintStore.deltaLakeScanInfo(offset.toInt, length.toInt) + val deltaLakeInfo = DeltaLakeScanInfo(scans = scans) + Extraction.decompose(deltaLakeInfo)(org.json4s.DefaultFormats) + } + } catch { + case e: Throwable => + logError("failed to serve dataflint delta lake scan info", e) + JObject() + } + } + attachEndpoint(ui, basePath, "deltaLakeScans", producer) + } + + private def attachCachedStorageEndpoint(ui: SparkUI, + basePath: String, + dataflintStore: DataflintStore): Unit = { + val producer: (String => String) => JValue = _ => { + try { + val liveRddStorage = ui.store.rddList() + val rddStorage = dataflintStore.rddStorageInfo() + val graphs = ui.store.stageList(null) + .filter(_.submissionTime.isDefined) // filter skipped or pending stages + .map(stage => Tuple2(stage.stageId, + ui.store.operationGraphForStage(stage.stageId).rootCluster.childClusters.flatMap(_.childNodes) + .filter(_.cached) + .map(rdd => { + + val liveCached = liveRddStorage.find(_.id == rdd.id).map( + rdd => { + val maxUsageExecutor = rdd.dataDistribution.map(executors => executors.maxBy(_.memoryUsed)) + val maxExecutorUsage = maxUsageExecutor.map(executor => + DataflintExecutorStorageInfo( + executor.memoryUsed, + executor.memoryRemaining, + if(executor.memoryUsed + executor.memoryRemaining != 0) executor.memoryUsed.toDouble / (executor.memoryUsed + executor.memoryRemaining) * 100 else 0 + )) + DataflintRDDStorageInfo(rdd.id, + rdd.memoryUsed, + rdd.diskUsed, + rdd.numPartitions, + rdd.storageLevel, + maxExecutorUsage + )} + ) + val cached = rddStorage.find(_.rddId == rdd.id) + liveCached.getOrElse(cached) + }))).toMap + Extraction.decompose(graphs)(org.json4s.DefaultFormats) + } catch { + case e: Throwable => + logError("failed to serve dataflint Jobs RDD", e) + JObject() + } + } + attachEndpoint(ui, basePath, "cachedstorage", producer) + } +} \ No newline at end of file diff --git a/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/api/DataflintSQLMetricsPage.scala b/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/api/DataflintSQLMetricsPage.scala deleted file mode 100644 index 1dbf80db..00000000 --- a/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/api/DataflintSQLMetricsPage.scala +++ /dev/null @@ -1,52 +0,0 @@ -package org.apache.spark.dataflint.api - -import org.apache.spark.internal.Logging -import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, SQLAppStatusStore, SparkPlanGraph} -import org.apache.spark.ui.{SparkUI, WebUIPage} -import org.json4s.{Extraction, JObject} - -import jakarta.servlet.http.HttpServletRequest -import scala.xml.Node - -class DataflintSQLMetricsPage(ui: SparkUI, sqlListener: () => Option[SQLAppStatusListener]) - extends WebUIPage("sqlmetrics") with Logging { - private var sqlListenerCache: Option[SQLAppStatusListener] = None - - override def renderJson(request: HttpServletRequest) = { - try { - if (sqlListenerCache.isEmpty) { - sqlListenerCache = sqlListener() - } - - val sqlStore = new SQLAppStatusStore(ui.store.store, sqlListenerCache) - val executionId = request.getParameter("executionId") - if (executionId == null) { - JObject() - } else { - val executionIdLong = executionId.toLong - val metrics = sqlStore.executionMetrics(executionIdLong) - val isDatabricks = 
ui.conf.getOption("spark.databricks.clusterUsageTags.cloudProvider").isDefined - val graph = if (isDatabricks) { - val exec = sqlStore.execution(executionIdLong).get - val planVersion = exec.getClass.getMethod("latestVersion").invoke(exec).asInstanceOf[Long] - sqlStore.getClass.getMethods.filter(_.getName == "planGraph").head.invoke(sqlStore, executionIdLong.asInstanceOf[Object], planVersion.asInstanceOf[Object]).asInstanceOf[SparkPlanGraph] - } else - sqlStore.planGraph(executionIdLong) - val nodesMetrics = graph.allNodes.map(node => NodeMetrics(node.id, node.name, node.metrics.map(metric => { - NodeMetric(metric.name, metrics.get(metric.accumulatorId)) - }).toSeq)) - // filter nodes without metrics - .filter(nodeMetrics => !nodeMetrics.metrics.forall(_.value.isEmpty)) - val jValue = Extraction.decompose(nodesMetrics)(org.json4s.DefaultFormats) - jValue - } - } catch { - case e: Throwable => { - logError("failed to serve dataflint SQL metrics", e) - JObject() - } - } - } - - override def render(request: HttpServletRequest): Seq[Node] = Seq[Node]() -} \ No newline at end of file diff --git a/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/api/DataflintSQLPlanPage.scala b/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/api/DataflintSQLPlanPage.scala deleted file mode 100644 index 3dc63ad0..00000000 --- a/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/api/DataflintSQLPlanPage.scala +++ /dev/null @@ -1,71 +0,0 @@ -package org.apache.spark.dataflint.api - -import org.apache.spark.dataflint.listener.DataflintStore -import org.apache.spark.internal.Logging -import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, SQLAppStatusStore, SparkPlanGraph} -import org.apache.spark.ui.{SparkUI, WebUIPage} -import org.json4s.{Extraction, JArray, JObject} -import jakarta.servlet.http.HttpServletRequest -import scala.xml.Node - -class DataflintSQLPlanPage(ui: SparkUI, dataflintStore: DataflintStore, sqlListener: () => Option[SQLAppStatusListener]) - extends WebUIPage("sqlplan") with Logging { - private var sqlListenerCache: Option[SQLAppStatusListener] = None - - override def renderJson(request: HttpServletRequest) = { - try { - if (sqlListenerCache.isEmpty) { - sqlListenerCache = sqlListener() - } - val sqlStore = new SQLAppStatusStore(ui.store.store, sqlListenerCache) - - val offset = request.getParameter("offset") - val length = request.getParameter("length") - if (offset == null || length == null) { - JArray(List()) - } else { - val executionList = sqlStore.executionsList(offset.toInt, length.toInt) - - val isDatabricks = ui.conf.getOption("spark.databricks.clusterUsageTags.cloudProvider").isDefined - - val latestVersionReader = if (isDatabricks && executionList.nonEmpty) Some(executionList.head.getClass.getMethod("latestVersion")) else None - val planGraphReader = if (isDatabricks) Some(sqlStore.getClass.getMethods.filter(_.getName == "planGraph").head) else None - val rddScopesToStagesReader = if (isDatabricks && executionList.nonEmpty) Some(executionList.head.getClass.getMethod("rddScopesToStages")) else None - - val nodeIdToRddScopeIdList = dataflintStore.databricksAdditionalExecutionInfo(offset.toInt, length.toInt) - - val sqlPlans = executionList.flatMap { exec => - try { - val graph = if (isDatabricks) { - val planVersion = latestVersionReader.get.invoke(exec).asInstanceOf[Long] - planGraphReader.get.invoke(sqlStore, exec.executionId.asInstanceOf[Object], planVersion.asInstanceOf[Object]).asInstanceOf[SparkPlanGraph] - } else - 
sqlStore.planGraph(exec.executionId) - - val rddScopesToStages = if (isDatabricks) Some(rddScopesToStagesReader.get.invoke(exec).asInstanceOf[Map[String, Set[Object]]]) else None - - val nodeIdToRddScopeId = nodeIdToRddScopeIdList.find(_.executionId == exec.executionId).map(_.nodeIdToRddScopeId) - Some(SqlEnrichedData(exec.executionId, graph.allNodes.length, rddScopesToStages, - graph.allNodes.map(node => { - val rddScopeId = nodeIdToRddScopeId.flatMap(_.get(node.id)) - NodePlan(node.id, node.desc, rddScopeId) - }).toSeq - )) - } catch { - case _: Throwable => None - } - } - val jsonValue = Extraction.decompose(sqlPlans)(org.json4s.DefaultFormats) - jsonValue - } - } - catch { - case e: Throwable => { - logError("failed to serve dataflint SQL metrics", e) - JArray(List()) - } - } - } - - override def render(request: HttpServletRequest): Seq[Node] = Seq[Node]() -} \ No newline at end of file diff --git a/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/api/DataflintSQLStagesRddPage.scala b/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/api/DataflintSQLStagesRddPage.scala deleted file mode 100644 index 9eca5603..00000000 --- a/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/api/DataflintSQLStagesRddPage.scala +++ /dev/null @@ -1,32 +0,0 @@ -package org.apache.spark.dataflint.api - -import org.apache.spark.internal.Logging -import org.apache.spark.ui.{SparkUI, WebUIPage} -import org.json4s.{Extraction, JObject} - -import jakarta.servlet.http.HttpServletRequest -import scala.xml.Node - -class DataflintSQLStagesRddPage(ui: SparkUI) - extends WebUIPage("stagesrdd") with Logging { - override def renderJson(request: HttpServletRequest) = { - try { - val graphs = ui.store.stageList(null) - .filter(_.submissionTime.isDefined) // filter skipped or pending stages - .map(stage => Tuple2(stage.stageId, - ui.store.operationGraphForStage(stage.stageId).rootCluster.childClusters - .map(rdd => Tuple2(rdd.id, rdd.name)).toMap)) - .toMap - val jsonValue = Extraction.decompose(graphs)(org.json4s.DefaultFormats) - jsonValue - } - catch { - case e: Throwable => { - logError("failed to serve dataflint Jobs RDD", e) - JObject() - } - } - } - - override def render(request: HttpServletRequest): Seq[Node] = Seq[Node]() -} diff --git a/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/api/Spark4PageFactory.scala b/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/api/Spark4PageFactory.scala index e14a7b84..27f6d850 100644 --- a/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/api/Spark4PageFactory.scala +++ b/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/api/Spark4PageFactory.scala @@ -5,42 +5,47 @@ import org.apache.spark.sql.execution.ui.SQLAppStatusListener import org.apache.spark.ui.{SparkUI, WebUIPage, WebUITab} /** - * Spark 4.x implementation of DataflintPageFactory using jakarta.servlet API + * Spark 4.x implementation of DataflintPageFactory. + * + * Unlike the Spark 3 factory, this implementation does NOT serve JSON via + * `WebUIPage` subclasses — those would tie the bytecode to either + * jakarta.servlet or javax.servlet at compile time, but the same artifact + * needs to load on both stock Spark 4 (jakarta) and Databricks Runtime 17.3 + * (javax). Instead, the factory advertises [[usesReflectiveEndpoints]] and + * wires every endpoint through a reflective Jetty handler in + * [[DataflintReflectiveServletBuilder]]. 
The `createXxxPage` methods are + * therefore never called on the Spark 4 path and throw if invoked. */ class Spark4PageFactory extends DataflintPageFactory { - + override def createDataFlintTab(ui: SparkUI): WebUITab = { new DataFlintTab(ui) } - - override def createApplicationInfoPage(ui: SparkUI, dataflintStore: DataflintStore): WebUIPage = { - new DataflintApplicationInfoPage(ui, dataflintStore) - } - - override def createCachedStoragePage(ui: SparkUI, dataflintStore: DataflintStore): WebUIPage = { - new DataflintCachedStoragePage(ui, dataflintStore) - } - - override def createIcebergPage(ui: SparkUI, dataflintStore: DataflintStore): WebUIPage = { - new DataflintIcebergPage(ui, dataflintStore) - } - - override def createDeltaLakeScanPage(ui: SparkUI, dataflintStore: DataflintStore): WebUIPage = { - new DataflintDeltaLakeScanPage(ui, dataflintStore) - } - - override def createSQLMetricsPage(ui: SparkUI, sqlListener: () => Option[SQLAppStatusListener]): WebUIPage = { - new DataflintSQLMetricsPage(ui, sqlListener) - } - - override def createSQLPlanPage(ui: SparkUI, dataflintStore: DataflintStore, sqlListener: () => Option[SQLAppStatusListener]): WebUIPage = { - new DataflintSQLPlanPage(ui, dataflintStore, sqlListener) - } - - override def createSQLStagesRddPage(ui: SparkUI): WebUIPage = { - new DataflintSQLStagesRddPage(ui) - } - + + // The Spark 4 loader path skips WebUIPage instantiation entirely (see + // usesReflectiveEndpoints below). These methods exist only because the trait + // is shared with Spark 3; calling them would mean the loader took the wrong + // branch. + private def webUIPageNotUsed: Nothing = + throw new UnsupportedOperationException( + "WebUIPage is not used on Spark 4 — endpoints are attached reflectively via " + + "DataflintReflectiveServletBuilder.attachAllEndpoints" + ) + + override def createApplicationInfoPage(ui: SparkUI, dataflintStore: DataflintStore): WebUIPage = webUIPageNotUsed + + override def createCachedStoragePage(ui: SparkUI, dataflintStore: DataflintStore): WebUIPage = webUIPageNotUsed + + override def createIcebergPage(ui: SparkUI, dataflintStore: DataflintStore): WebUIPage = webUIPageNotUsed + + override def createDeltaLakeScanPage(ui: SparkUI, dataflintStore: DataflintStore): WebUIPage = webUIPageNotUsed + + override def createSQLMetricsPage(ui: SparkUI, sqlListener: () => Option[SQLAppStatusListener]): WebUIPage = webUIPageNotUsed + + override def createSQLPlanPage(ui: SparkUI, dataflintStore: DataflintStore, sqlListener: () => Option[SQLAppStatusListener]): WebUIPage = webUIPageNotUsed + + override def createSQLStagesRddPage(ui: SparkUI): WebUIPage = webUIPageNotUsed + override def addStaticHandler(ui: SparkUI, resourceBase: String, contextPath: String): Unit = { DataflintJettyUtils.addStaticHandler(ui, resourceBase, contextPath) } @@ -49,10 +54,11 @@ class Spark4PageFactory extends DataflintPageFactory { ui.getTabs.toSeq } - // Databricks Runtime 17.3 (Spark 4 based) ships javax.servlet instead of jakarta.servlet, - // so any access to jakarta.servlet.* in this module crashes with NoClassDefFoundError. - // Skip the entire DataFlint UI on Databricks; listeners (data export) still run. 
-  override def isUISupported(ui: SparkUI): Boolean = {
-    !ui.conf.getOption("spark.databricks.clusterUsageTags.cloudProvider").isDefined
+  override def usesReflectiveEndpoints: Boolean = true
+
+  override def attachReflectiveEndpoints(ui: SparkUI,
+                                         dataflintStore: DataflintStore,
+                                         sqlListener: () => Option[SQLAppStatusListener]): Unit = {
+    DataflintReflectiveServletBuilder.attachAllEndpoints(ui, dataflintStore, sqlListener, ui.basePath)
+  }
-}
+}
\ No newline at end of file
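
Below is a minimal, self-contained sketch of the servlet-API-neutral trick that `DataflintReflectiveServletBuilder` relies on: a `java.lang.reflect.Proxy` over whichever `Servlet` interface the runtime actually ships, so the compiled artifact never references `jakarta.servlet` or `javax.servlet` directly. The object and method names (`ServletProxySketch`, `jsonServlet`) are illustrative and not part of the patch; the sketch assumes one of the two servlet APIs is present on the runtime classpath.

```scala
import java.lang.reflect.{InvocationHandler, Method, Proxy}

object ServletProxySketch {

  // Resolve whichever Servlet interface the runtime actually ships.
  private val servletInterface: Class[_] =
    try Class.forName("jakarta.servlet.Servlet")
    catch { case _: ClassNotFoundException => Class.forName("javax.servlet.Servlet") }

  // Build a Servlet instance that always answers with a fixed JSON body,
  // without referencing either servlet package at compile time.
  def jsonServlet(body: String): AnyRef = {
    val handler = new InvocationHandler {
      override def invoke(proxy: AnyRef, method: Method, args: Array[AnyRef]): AnyRef =
        method.getName match {
          case "service" =>
            // args(0) is the request, args(1) the response; both are driven reflectively.
            val resp = args(1)
            resp.getClass.getMethod("setContentType", classOf[String]).invoke(resp, "application/json")
            val writer = resp.getClass.getMethod("getWriter").invoke(resp)
            writer.getClass.getMethod("print", classOf[String]).invoke(writer, body)
            null
          // Object methods are routed through the handler too, so keep them sane.
          case "hashCode" => Int.box(System.identityHashCode(proxy))
          case "equals"   => Boolean.box(proxy eq args(0))
          case "toString" => "ServletProxySketch.jsonServlet"
          // init/destroy/getServletConfig/getServletInfo: harmless no-ops here.
          case _ => null
        }
    }
    Proxy.newProxyInstance(getClass.getClassLoader, Array(servletInterface), handler)
  }
}
```

The patch applies the same idea per endpoint, then wires the proxy into a reflectively loaded Jetty `ServletContextHandler`/`ServletHolder` (trying `org.sparkproject.jetty.servlet` before `org.eclipse.jetty.servlet`) and attaches it via `SparkUI.attachHandler`, also resolved reflectively because that method's parameter type is jakarta- or javax-bound depending on the Spark build.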
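
Assuming the reflective attach succeeds, each endpoint answers at `<spark-ui>/dataflint/<name>/json`, where `name` is one of `applicationinfo`, `sqlplan`, `sqlmetrics`, `stagesrdd`, `iceberg`, `deltaLakeScans`, or `cachedstorage`. A quick manual smoke test against a local driver UI could look like the following; the port 4040 and the empty `basePath` are assumptions about a default local setup, not part of the patch.

```scala
// Hypothetical smoke test: fetch one reflective JSON endpoint from a locally
// running Spark 4 driver UI (default port 4040, empty basePath assumed).
val url = new java.net.URL("http://localhost:4040/dataflint/applicationinfo/json")
val body = scala.io.Source.fromInputStream(url.openStream()).mkString
assert(body.trim.startsWith("{"), s"unexpected payload: ${body.take(80)}")
```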