Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,22 +59,26 @@ class DataflintSQLPlanPage(ui: SparkUI, dataflintStore: DataflintStore, sqlListe

val nodeIdToRddScopeIdList = dataflintStore.databricksAdditionalExecutionInfo(offset.toInt, length.toInt)

val sqlPlans = executionList.map { exec =>
val graph = if (isDatabricks) {
val planVersion = latestVersionReader.get.invoke(exec).asInstanceOf[Long]
planGraphReader.get.invoke(sqlStore, exec.executionId.asInstanceOf[Object], planVersion.asInstanceOf[Object]).asInstanceOf[SparkPlanGraph]
} else
sqlStore.planGraph(exec.executionId)
val sqlPlans = executionList.flatMap { exec =>
try {
val graph = if (isDatabricks) {
val planVersion = latestVersionReader.get.invoke(exec).asInstanceOf[Long]
planGraphReader.get.invoke(sqlStore, exec.executionId.asInstanceOf[Object], planVersion.asInstanceOf[Object]).asInstanceOf[SparkPlanGraph]
} else
sqlStore.planGraph(exec.executionId)

val rddScopesToStages = if (isDatabricks) Some(rddScopesToStagesReader.get.invoke(exec).asInstanceOf[Map[String, Set[Object]]]) else None
val rddScopesToStages = if (isDatabricks) Some(rddScopesToStagesReader.get.invoke(exec).asInstanceOf[Map[String, Set[Object]]]) else None

val nodeIdToRddScopeId = nodeIdToRddScopeIdList.find(_.executionId == exec.executionId).map(_.nodeIdToRddScopeId)
SqlEnrichedData(exec.executionId, getRootExecutionId(exec, exec.executionId), graph.allNodes.length, rddScopesToStages,
graph.allNodes.map(node => {
val rddScopeId = nodeIdToRddScopeId.flatMap(_.get(node.id))
NodePlan(node.id, node.desc, rddScopeId)
}).toSeq
)
val nodeIdToRddScopeId = nodeIdToRddScopeIdList.find(_.executionId == exec.executionId).map(_.nodeIdToRddScopeId)
Some(SqlEnrichedData(exec.executionId, getRootExecutionId(exec, exec.executionId), graph.allNodes.length, rddScopesToStages,
graph.allNodes.map(node => {
val rddScopeId = nodeIdToRddScopeId.flatMap(_.get(node.id))
NodePlan(node.id, node.desc, rddScopeId)
}).toSeq
))
} catch {
case _: Throwable => None
}
}
val jsonValue = Extraction.decompose(sqlPlans)(org.json4s.DefaultFormats)
jsonValue
Expand All @@ -83,7 +87,7 @@ class DataflintSQLPlanPage(ui: SparkUI, dataflintStore: DataflintStore, sqlListe
catch {
case e: Throwable => {
logError("failed to serve dataflint SQL metrics", e)
JObject()
JArray(List())
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,26 @@ class DataflintSQLPlanPage(ui: SparkUI, dataflintStore: DataflintStore, sqlListe

val nodeIdToRddScopeIdList = dataflintStore.databricksAdditionalExecutionInfo(offset.toInt, length.toInt)

val sqlPlans = executionList.map { exec =>
val graph = if (isDatabricks) {
val planVersion = latestVersionReader.get.invoke(exec).asInstanceOf[Long]
planGraphReader.get.invoke(sqlStore, exec.executionId.asInstanceOf[Object], planVersion.asInstanceOf[Object]).asInstanceOf[SparkPlanGraph]
} else
sqlStore.planGraph(exec.executionId)
val sqlPlans = executionList.flatMap { exec =>
try {
val graph = if (isDatabricks) {
val planVersion = latestVersionReader.get.invoke(exec).asInstanceOf[Long]
planGraphReader.get.invoke(sqlStore, exec.executionId.asInstanceOf[Object], planVersion.asInstanceOf[Object]).asInstanceOf[SparkPlanGraph]
} else
sqlStore.planGraph(exec.executionId)

val rddScopesToStages = if (isDatabricks) Some(rddScopesToStagesReader.get.invoke(exec).asInstanceOf[Map[String, Set[Object]]]) else None
val rddScopesToStages = if (isDatabricks) Some(rddScopesToStagesReader.get.invoke(exec).asInstanceOf[Map[String, Set[Object]]]) else None

val nodeIdToRddScopeId = nodeIdToRddScopeIdList.find(_.executionId == exec.executionId).map(_.nodeIdToRddScopeId)
SqlEnrichedData(exec.executionId, graph.allNodes.length, rddScopesToStages,
graph.allNodes.map(node => {
val rddScopeId = nodeIdToRddScopeId.flatMap(_.get(node.id))
NodePlan(node.id, node.desc, rddScopeId)
}).toSeq
)
val nodeIdToRddScopeId = nodeIdToRddScopeIdList.find(_.executionId == exec.executionId).map(_.nodeIdToRddScopeId)
Some(SqlEnrichedData(exec.executionId, graph.allNodes.length, rddScopesToStages,
graph.allNodes.map(node => {
val rddScopeId = nodeIdToRddScopeId.flatMap(_.get(node.id))
NodePlan(node.id, node.desc, rddScopeId)
}).toSeq
))
} catch {
case _: Throwable => None
}
}
val jsonValue = Extraction.decompose(sqlPlans)(org.json4s.DefaultFormats)
jsonValue
Expand All @@ -58,7 +62,7 @@ class DataflintSQLPlanPage(ui: SparkUI, dataflintStore: DataflintStore, sqlListe
catch {
case e: Throwable => {
logError("failed to serve dataflint SQL metrics", e)
JObject()
JArray(List())
}
}
}
Expand Down
14 changes: 13 additions & 1 deletion spark-ui/src/services/SparkApi.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ class SparkAPI {
historyServerMode: boolean = false;
icebergEnabled: boolean = false;
sparkVersion: string | undefined = undefined;
// When true, use non-paginated /sql endpoint (for older Spark apps where paginated API returns 404)
useLegacySqlApi: boolean = false;

// Cache for response length+hash to skip processing when data hasn't changed
// Length is checked first (O(1)), hash only calculated if lengths match
Expand Down Expand Up @@ -110,6 +112,9 @@ class SparkAPI {
}

private buildSqlPath(offset: number): string {
if (this.useLegacySqlApi) {
return `${this.applicationPath}/sql`;
}
return `${this.applicationPath}/sql?offset=${offset}&length=${SQL_QUERY_LENGTH}&planDescription=false`;
}

Expand Down Expand Up @@ -312,6 +317,12 @@ class SparkAPI {
? currentAttempt.attemptId
: undefined;
this.sparkVersion = currentAttempt?.appSparkVersion;
// Paginated SQL API (/sql?offset=&length=) was added in Spark 3.2+
// Older apps on the history server need the non-paginated /sql endpoint
if (this.sparkVersion) {
const [major, minor] = this.sparkVersion.split(".").map(Number);
this.useLegacySqlApi = major < 3 || (major === 3 && minor < 2);
}
const sparkConfiguration: SparkConfiguration = await this.queryData(
this.environmentPath,
);
Expand Down Expand Up @@ -396,9 +407,10 @@ class SparkAPI {
this.buildSqlPath(sqlIdToQueryFrom),
true
);
const sparkPlans: SQLPlans = await this.queryData(
const sparkPlansResponse = await this.queryData(
this.buildSqlPlanPath(sqlIdToQueryFrom)
);
const sparkPlans: SQLPlans = Array.isArray(sparkPlansResponse) ? sparkPlansResponse : [];
let icebergInfo: IcebergInfo = { commitsInfo: [] };
if (this.icebergEnabled) {
icebergInfo = await this.queryData(
Expand Down
Loading