diff --git a/spark-plugin/example_3_5_1/src/main/scala/io/dataflint/example/SchedulingSmallTasksSkipAlerts.scala b/spark-plugin/example_3_5_1/src/main/scala/io/dataflint/example/SchedulingSmallTasksSkipAlerts.scala new file mode 100644 index 00000000..355cf9c0 --- /dev/null +++ b/spark-plugin/example_3_5_1/src/main/scala/io/dataflint/example/SchedulingSmallTasksSkipAlerts.scala @@ -0,0 +1,24 @@ +package io.dataflint.example + +import org.apache.spark.sql.SparkSession + +object SchedulingSmallTasksSkipAlerts extends App { + val spark = SparkSession + .builder() + .appName("SchedulingSmallTasks") + .config("spark.plugins", "io.dataflint.spark.SparkDataflintPlugin") + .config("spark.dataflint.telemetry.enabled", false) + .config("spark.ui.port", "10000") + .config("spark.dataflint.telemetry.enabled", value = false) + .config("spark.sql.maxMetadataStringLength", "10000") + .config("spark.dataflint.alert.disabled", "smallTasks,idleCoresTooHigh") + .master("local[*]") + .getOrCreate() + + val numbers = spark.range(0, 10000).repartition(10000).count() + + println(s"count numbers to 10000: $numbers") + + scala.io.StdIn.readLine() + spark.stop() +} diff --git a/spark-ui/src/interfaces/AppStore.ts b/spark-ui/src/interfaces/AppStore.ts index ac53d258..6f38bebe 100644 --- a/spark-ui/src/interfaces/AppStore.ts +++ b/spark-ui/src/interfaces/AppStore.ts @@ -87,6 +87,7 @@ export interface ConfigEntry { export type ConfigEntries = ConfigEntry[]; export interface ConfigStore { + alertDisabled: string | undefined; resourceControlType: ResourceMode; configs: ConfigEntries; executorMemoryBytes: number; diff --git a/spark-ui/src/reducers/AlertsReducer.ts b/spark-ui/src/reducers/AlertsReducer.ts index 27dc4087..a4b92a4b 100644 --- a/spark-ui/src/reducers/AlertsReducer.ts +++ b/spark-ui/src/reducers/AlertsReducer.ts @@ -18,6 +18,7 @@ import { reduceSQLInputOutputAlerts } from "./Alerts/MemorySQLInputOutputAlerts" import { reducePartitionSkewAlert } from "./Alerts/PartitionSkewAlert"; import { reduceSmallTasksAlert } from "./Alerts/SmallTasksAlert"; import { reduceWastedCoresAlerts } from "./Alerts/WastedCoresAlertsReducer"; +import { parseAlertDisabledConfig } from "../utils/ConfigParser"; export function reduceAlerts( sqlStore: SparkSQLStore, @@ -39,8 +40,9 @@ export function reduceAlerts( reduceJoinToBroadcastAlert(sqlStore, alerts); reduceLargeCrossJoinScanAlert(sqlStore, alerts); reduceMaxPartitionToBigAlert(sqlStore, stageStore, alerts); - + const disabledAlerts = parseAlertDisabledConfig(config.alertDisabled); + const filteredAlerts = alerts.filter(alert => !disabledAlerts.has(alert.name)); return { - alerts: alerts, + alerts: filteredAlerts, }; } diff --git a/spark-ui/src/reducers/ConfigReducer.ts b/spark-ui/src/reducers/ConfigReducer.ts index e779b2d9..b9b3efe3 100644 --- a/spark-ui/src/reducers/ConfigReducer.ts +++ b/spark-ui/src/reducers/ConfigReducer.ts @@ -134,6 +134,7 @@ export function extractConfig( const resourceControlType = findResourceControlType(sparkPropertiesObj); const appName = sparkPropertiesObj["spark.app.name"]; + const alertDisabled = sparkPropertiesObj["spark.dataflint.alert.disabled"] || undefined; const config: ConfigEntries = [ { name: "app name", @@ -381,6 +382,7 @@ export function extractConfig( return [ appName, { + alertDisabled: alertDisabled, resourceControlType: resourceControlType, configs: config, executorMemoryOverheadViaConfigString: memoryOverheadViaConfigString, diff --git a/spark-ui/src/utils/ConfigParser.ts b/spark-ui/src/utils/ConfigParser.ts new file mode 100644 index 00000000..000f9e6b --- /dev/null +++ b/spark-ui/src/utils/ConfigParser.ts @@ -0,0 +1,5 @@ +// Utility to parse the spark.dataflint.alert.disabled config +export function parseAlertDisabledConfig(config: string | undefined): Set { + if (!config) return new Set(); + return new Set(config.split(',').map(x => x.trim()).filter(Boolean)); +}