diff --git a/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/dataflint/api/DataflintSQLPlanPage.scala b/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/dataflint/api/DataflintSQLPlanPage.scala index 6c6dcd16..b9927c48 100644 --- a/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/dataflint/api/DataflintSQLPlanPage.scala +++ b/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/dataflint/api/DataflintSQLPlanPage.scala @@ -59,22 +59,26 @@ class DataflintSQLPlanPage(ui: SparkUI, dataflintStore: DataflintStore, sqlListe val nodeIdToRddScopeIdList = dataflintStore.databricksAdditionalExecutionInfo(offset.toInt, length.toInt) - val sqlPlans = executionList.map { exec => - val graph = if (isDatabricks) { - val planVersion = latestVersionReader.get.invoke(exec).asInstanceOf[Long] - planGraphReader.get.invoke(sqlStore, exec.executionId.asInstanceOf[Object], planVersion.asInstanceOf[Object]).asInstanceOf[SparkPlanGraph] - } else - sqlStore.planGraph(exec.executionId) + val sqlPlans = executionList.flatMap { exec => + try { + val graph = if (isDatabricks) { + val planVersion = latestVersionReader.get.invoke(exec).asInstanceOf[Long] + planGraphReader.get.invoke(sqlStore, exec.executionId.asInstanceOf[Object], planVersion.asInstanceOf[Object]).asInstanceOf[SparkPlanGraph] + } else + sqlStore.planGraph(exec.executionId) - val rddScopesToStages = if (isDatabricks) Some(rddScopesToStagesReader.get.invoke(exec).asInstanceOf[Map[String, Set[Object]]]) else None + val rddScopesToStages = if (isDatabricks) Some(rddScopesToStagesReader.get.invoke(exec).asInstanceOf[Map[String, Set[Object]]]) else None - val nodeIdToRddScopeId = nodeIdToRddScopeIdList.find(_.executionId == exec.executionId).map(_.nodeIdToRddScopeId) - SqlEnrichedData(exec.executionId, getRootExecutionId(exec, exec.executionId), graph.allNodes.length, rddScopesToStages, - graph.allNodes.map(node => { - val rddScopeId = nodeIdToRddScopeId.flatMap(_.get(node.id)) - NodePlan(node.id, node.desc, rddScopeId) - }).toSeq - ) + val nodeIdToRddScopeId = nodeIdToRddScopeIdList.find(_.executionId == exec.executionId).map(_.nodeIdToRddScopeId) + Some(SqlEnrichedData(exec.executionId, getRootExecutionId(exec, exec.executionId), graph.allNodes.length, rddScopesToStages, + graph.allNodes.map(node => { + val rddScopeId = nodeIdToRddScopeId.flatMap(_.get(node.id)) + NodePlan(node.id, node.desc, rddScopeId) + }).toSeq + )) + } catch { + case _: Throwable => None + } } val jsonValue = Extraction.decompose(sqlPlans)(org.json4s.DefaultFormats) jsonValue @@ -83,7 +87,7 @@ class DataflintSQLPlanPage(ui: SparkUI, dataflintStore: DataflintStore, sqlListe catch { case e: Throwable => { logError("failed to serve dataflint SQL metrics", e) - JObject() + JArray(List()) } } } diff --git a/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/api/DataflintSQLPlanPage.scala b/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/api/DataflintSQLPlanPage.scala index 3d0ad940..3dc63ad0 100644 --- a/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/api/DataflintSQLPlanPage.scala +++ b/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/api/DataflintSQLPlanPage.scala @@ -34,22 +34,26 @@ class DataflintSQLPlanPage(ui: SparkUI, dataflintStore: DataflintStore, sqlListe val nodeIdToRddScopeIdList = dataflintStore.databricksAdditionalExecutionInfo(offset.toInt, length.toInt) - val sqlPlans = executionList.map { exec => - val graph = if (isDatabricks) { - val planVersion = latestVersionReader.get.invoke(exec).asInstanceOf[Long] - planGraphReader.get.invoke(sqlStore, exec.executionId.asInstanceOf[Object], planVersion.asInstanceOf[Object]).asInstanceOf[SparkPlanGraph] - } else - sqlStore.planGraph(exec.executionId) + val sqlPlans = executionList.flatMap { exec => + try { + val graph = if (isDatabricks) { + val planVersion = latestVersionReader.get.invoke(exec).asInstanceOf[Long] + planGraphReader.get.invoke(sqlStore, exec.executionId.asInstanceOf[Object], planVersion.asInstanceOf[Object]).asInstanceOf[SparkPlanGraph] + } else + sqlStore.planGraph(exec.executionId) - val rddScopesToStages = if (isDatabricks) Some(rddScopesToStagesReader.get.invoke(exec).asInstanceOf[Map[String, Set[Object]]]) else None + val rddScopesToStages = if (isDatabricks) Some(rddScopesToStagesReader.get.invoke(exec).asInstanceOf[Map[String, Set[Object]]]) else None - val nodeIdToRddScopeId = nodeIdToRddScopeIdList.find(_.executionId == exec.executionId).map(_.nodeIdToRddScopeId) - SqlEnrichedData(exec.executionId, graph.allNodes.length, rddScopesToStages, - graph.allNodes.map(node => { - val rddScopeId = nodeIdToRddScopeId.flatMap(_.get(node.id)) - NodePlan(node.id, node.desc, rddScopeId) - }).toSeq - ) + val nodeIdToRddScopeId = nodeIdToRddScopeIdList.find(_.executionId == exec.executionId).map(_.nodeIdToRddScopeId) + Some(SqlEnrichedData(exec.executionId, graph.allNodes.length, rddScopesToStages, + graph.allNodes.map(node => { + val rddScopeId = nodeIdToRddScopeId.flatMap(_.get(node.id)) + NodePlan(node.id, node.desc, rddScopeId) + }).toSeq + )) + } catch { + case _: Throwable => None + } } val jsonValue = Extraction.decompose(sqlPlans)(org.json4s.DefaultFormats) jsonValue @@ -58,7 +62,7 @@ class DataflintSQLPlanPage(ui: SparkUI, dataflintStore: DataflintStore, sqlListe catch { case e: Throwable => { logError("failed to serve dataflint SQL metrics", e) - JObject() + JArray(List()) } } } diff --git a/spark-ui/src/services/SparkApi.tsx b/spark-ui/src/services/SparkApi.tsx index 66f20fe5..6942451f 100644 --- a/spark-ui/src/services/SparkApi.tsx +++ b/spark-ui/src/services/SparkApi.tsx @@ -66,6 +66,8 @@ class SparkAPI { historyServerMode: boolean = false; icebergEnabled: boolean = false; sparkVersion: string | undefined = undefined; + // When true, use non-paginated /sql endpoint (for older Spark apps where paginated API returns 404) + useLegacySqlApi: boolean = false; // Cache for response length+hash to skip processing when data hasn't changed // Length is checked first (O(1)), hash only calculated if lengths match @@ -110,6 +112,9 @@ class SparkAPI { } private buildSqlPath(offset: number): string { + if (this.useLegacySqlApi) { + return `${this.applicationPath}/sql`; + } return `${this.applicationPath}/sql?offset=${offset}&length=${SQL_QUERY_LENGTH}&planDescription=false`; } @@ -312,6 +317,12 @@ class SparkAPI { ? currentAttempt.attemptId : undefined; this.sparkVersion = currentAttempt?.appSparkVersion; + // Paginated SQL API (/sql?offset=&length=) was added in Spark 3.2+ + // Older apps on the history server need the non-paginated /sql endpoint + if (this.sparkVersion) { + const [major, minor] = this.sparkVersion.split(".").map(Number); + this.useLegacySqlApi = major < 3 || (major === 3 && minor < 2); + } const sparkConfiguration: SparkConfiguration = await this.queryData( this.environmentPath, ); @@ -396,9 +407,10 @@ class SparkAPI { this.buildSqlPath(sqlIdToQueryFrom), true ); - const sparkPlans: SQLPlans = await this.queryData( + const sparkPlansResponse = await this.queryData( this.buildSqlPlanPath(sqlIdToQueryFrom) ); + const sparkPlans: SQLPlans = Array.isArray(sparkPlansResponse) ? sparkPlansResponse : []; let icebergInfo: IcebergInfo = { commitsInfo: [] }; if (this.icebergEnabled) { icebergInfo = await this.queryData(