From 1d173c25ee6141d3799cfa86daa86cbe4e531f24 Mon Sep 17 00:00:00 2001 From: Avi Minsky Date: Sun, 19 Apr 2026 14:09:42 +0300 Subject: [PATCH 1/3] Add executor metadata collection plugin for cloud instance detection Implement an ExecutorPlugin that collects machine metadata from each executor and reports it to the driver. When enabled via spark.dataflint.executor.metadata.enabled, each executor detects its cloud provider by reading /sys/class/dmi/id/sys_vendor, then fetches instance type and spot/on-demand status from the cloud metadata API. System basics (OS, JVM, CPU cores, memory) are always collected. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../executor/CloudMetadataDetector.scala | 117 ++++++++++++++++++ .../executor/DataflintExecutorPlugin.scala | 77 ++++++++++++ .../executor/DriverMetadataHelper.scala | 15 +++ .../executor/ExecutorMetadataMessage.scala | 15 +++ .../listener/DataflintListener.scala | 4 + .../dataflint/listener/DataflintStore.scala | 4 + .../spark/dataflint/listener/model.scala | 22 ++++ .../spark/dataflint/saas/SparkRunStore.scala | 5 +- .../dataflint/saas/StoreDataExtractor.scala | 5 +- .../spark/SparkDataflintPlugin.scala | 39 +++++- .../spark/SparkDataflintPlugin.scala | 39 +++++- 11 files changed, 334 insertions(+), 8 deletions(-) create mode 100644 spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/executor/CloudMetadataDetector.scala create mode 100644 spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/executor/DataflintExecutorPlugin.scala create mode 100644 spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/executor/DriverMetadataHelper.scala create mode 100644 spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/executor/ExecutorMetadataMessage.scala diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/executor/CloudMetadataDetector.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/executor/CloudMetadataDetector.scala new file mode 100644 index 00000000..13b8391f --- /dev/null +++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/executor/CloudMetadataDetector.scala @@ -0,0 +1,117 @@ +package org.apache.spark.dataflint.executor + +import com.fasterxml.jackson.databind.ObjectMapper + +import java.io.BufferedReader +import java.io.InputStreamReader +import java.nio.file.{Files, Paths} +import java.util.concurrent.TimeUnit +import scala.util.Try + +object CloudMetadataDetector { + + case class CloudMetadata( + cloudProvider: Option[String], + instanceType: Option[String], + lifecycleType: Option[String] + ) + + private val COMMAND_TIMEOUT_MS = 5000L + + private val SYS_VENDOR_PATH = "/sys/class/dmi/id/sys_vendor" + + private val AWS_COMMAND = + """TOKEN=$(curl -sf -X PUT "http://169.254.169.254/latest/api/token" -H "X-aws-ec2-metadata-token-ttl-seconds: 21600" --connect-timeout 1 --max-time 2 2>/dev/null) + |if [ -n "$TOKEN" ]; then + | HEADER="-H X-aws-ec2-metadata-token:$TOKEN" + |else + | HEADER="" + |fi + |IT=$(curl -sf $HEADER "http://169.254.169.254/latest/meta-data/instance-type" --connect-timeout 1 --max-time 2 2>/dev/null) + |if [ -z "$IT" ]; then exit 1; fi + |LC=$(curl -sf $HEADER "http://169.254.169.254/latest/meta-data/instance-life-cycle" --connect-timeout 1 --max-time 2 2>/dev/null) + |echo "{\"cloudProvider\":\"aws\",\"instanceType\":\"$IT\",\"lifecycleType\":\"$LC\"}" + |""".stripMargin + + private val GCP_COMMAND = + """MT=$(curl -sf -H "Metadata-Flavor: Google" "http://metadata.google.internal/computeMetadata/v1/instance/machine-type" --connect-timeout 1 --max-time 2 2>/dev/null) + |if [ -z "$MT" ]; then exit 1; fi + |IT=$(echo "$MT" | rev | cut -d/ -f1 | rev) + |PR=$(curl -sf -H "Metadata-Flavor: Google" "http://metadata.google.internal/computeMetadata/v1/instance/scheduling/preemptible" --connect-timeout 1 --max-time 2 2>/dev/null) + |if [ "$PR" = "TRUE" ]; then LC="preemptible"; else LC="on-demand"; fi + |echo "{\"cloudProvider\":\"gcp\",\"instanceType\":\"$IT\",\"lifecycleType\":\"$LC\"}" + |""".stripMargin + + private val AZURE_COMMAND = + """VS=$(curl -sf -H "Metadata: true" "http://169.254.169.254/metadata/instance/compute/vmSize?api-version=2021-02-01&format=text" --connect-timeout 1 --max-time 2 2>/dev/null) + |if [ -z "$VS" ]; then exit 1; fi + |PR=$(curl -sf -H "Metadata: true" "http://169.254.169.254/metadata/instance/compute/priority?api-version=2021-02-01&format=text" --connect-timeout 1 --max-time 2 2>/dev/null) + |if [ "$PR" = "Spot" ]; then LC="spot"; else LC="on-demand"; fi + |echo "{\"cloudProvider\":\"azure\",\"instanceType\":\"$VS\",\"lifecycleType\":\"$LC\"}" + |""".stripMargin + + private val mapper = new ObjectMapper() + + def detect(): CloudMetadata = { + detectCloudProvider() match { + case Some("aws") => runCloudCommand(AWS_COMMAND) + case Some("gcp") => runCloudCommand(GCP_COMMAND) + case Some("azure") => runCloudCommand(AZURE_COMMAND) + case _ => CloudMetadata(None, None, None) + } + } + + private def detectCloudProvider(): Option[String] = { + Try { + val vendor = new String(Files.readAllBytes(Paths.get(SYS_VENDOR_PATH))).trim + if (vendor.contains("Amazon")) Some("aws") + else if (vendor.contains("Google")) Some("gcp") + else if (vendor.contains("Microsoft")) Some("azure") + else None + }.getOrElse(None) + } + + private def runCloudCommand(command: String): CloudMetadata = { + Try { + runBashCommand(command).flatMap(parseJson) + }.getOrElse(None).getOrElse(CloudMetadata(None, None, None)) + } + + private def runBashCommand(command: String): Option[String] = { + val process = new ProcessBuilder("bash", "-c", command) + .redirectErrorStream(false) + .start() + + val completed = process.waitFor(COMMAND_TIMEOUT_MS, TimeUnit.MILLISECONDS) + if (!completed) { + process.destroyForcibly() + return None + } + + if (process.exitValue() != 0) { + return None + } + + val reader = new BufferedReader(new InputStreamReader(process.getInputStream)) + try { + val output = reader.readLine() + if (output != null && output.nonEmpty) Some(output.trim) else None + } finally { + reader.close() + } + } + + private def parseJson(json: String): Option[CloudMetadata] = { + Try { + val node = mapper.readTree(json) + val cloudProvider = Option(node.get("cloudProvider")).map(_.asText()).filter(_.nonEmpty) + val instanceType = Option(node.get("instanceType")).map(_.asText()).filter(_.nonEmpty) + val lifecycleType = Option(node.get("lifecycleType")).map(_.asText()).filter(_.nonEmpty) + if (instanceType.isDefined || cloudProvider.isDefined) { + Some(CloudMetadata(cloudProvider, instanceType, lifecycleType)) + } else { + None + } + }.getOrElse(None) + } +} \ No newline at end of file diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/executor/DataflintExecutorPlugin.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/executor/DataflintExecutorPlugin.scala new file mode 100644 index 00000000..b5fa6bda --- /dev/null +++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/executor/DataflintExecutorPlugin.scala @@ -0,0 +1,77 @@ +package org.apache.spark.dataflint.executor + +import org.apache.spark.api.plugin.{ExecutorPlugin, PluginContext} +import org.apache.spark.internal.Logging + +import java.util + +class DataflintExecutorPlugin extends ExecutorPlugin with Logging { + + override def init(ctx: PluginContext, extraConf: util.Map[String, String]): Unit = { + val enabled = Option(extraConf.get("executor.metadata.enabled")).exists(_.toBoolean) + if (!enabled) { + return + } + + val executorId = ctx.executorID() + val hostname = try { + java.net.InetAddress.getLocalHost.getHostName + } catch { + case _: Throwable => "unknown" + } + + try { + val osName = System.getProperty("os.name", "unknown") + val osArch = System.getProperty("os.arch", "unknown") + val jvmVersion = System.getProperty("java.version", "unknown") + val availableProcessors = Runtime.getRuntime.availableProcessors() + val totalMemoryBytes = Runtime.getRuntime.maxMemory() + + val cloudMetadata = try { + CloudMetadataDetector.detect() + } catch { + case e: Throwable => + logWarning("Failed to detect cloud metadata", e) + CloudMetadataDetector.CloudMetadata(None, None, None) + } + + val message = ExecutorMetadataMessage( + executorId = executorId, + executorHost = hostname, + instanceType = cloudMetadata.instanceType, + lifecycleType = cloudMetadata.lifecycleType, + cloudProvider = cloudMetadata.cloudProvider, + osName = osName, + osArch = osArch, + jvmVersion = jvmVersion, + availableProcessors = availableProcessors, + totalMemoryBytes = totalMemoryBytes, + collectionError = None + ) + ctx.send(message) + logInfo(s"Sent executor metadata: provider=${cloudMetadata.cloudProvider}, " + + s"instance=${cloudMetadata.instanceType}, lifecycle=${cloudMetadata.lifecycleType}") + } catch { + case e: Throwable => + logWarning("Failed to collect/send executor metadata", e) + try { + ctx.send(ExecutorMetadataMessage( + executorId = executorId, + executorHost = hostname, + instanceType = None, + lifecycleType = None, + cloudProvider = None, + osName = System.getProperty("os.name", "unknown"), + osArch = System.getProperty("os.arch", "unknown"), + jvmVersion = System.getProperty("java.version", "unknown"), + availableProcessors = Runtime.getRuntime.availableProcessors(), + totalMemoryBytes = Runtime.getRuntime.maxMemory(), + collectionError = Some(e.getMessage) + )) + } catch { + case inner: Throwable => + logWarning("Failed to send error metadata to driver", inner) + } + } + } +} diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/executor/DriverMetadataHelper.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/executor/DriverMetadataHelper.scala new file mode 100644 index 00000000..621be492 --- /dev/null +++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/executor/DriverMetadataHelper.scala @@ -0,0 +1,15 @@ +package org.apache.spark.dataflint.executor + +import org.apache.spark.SparkContext +import org.apache.spark.dataflint.listener.{DataflintExecutorMetadataEvent, DataflintExecutorMetadataInfo} + +object DriverMetadataHelper { + + def isExecutorMetadataEnabled(sc: SparkContext): Boolean = { + sc.conf.getBoolean("spark.dataflint.executor.metadata.enabled", defaultValue = false) + } + + def postExecutorMetadataEvent(sc: SparkContext, info: DataflintExecutorMetadataInfo): Unit = { + sc.listenerBus.post(DataflintExecutorMetadataEvent(info)) + } +} diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/executor/ExecutorMetadataMessage.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/executor/ExecutorMetadataMessage.scala new file mode 100644 index 00000000..d3cca1e9 --- /dev/null +++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/executor/ExecutorMetadataMessage.scala @@ -0,0 +1,15 @@ +package org.apache.spark.dataflint.executor + +case class ExecutorMetadataMessage( + executorId: String, + executorHost: String, + instanceType: Option[String], + lifecycleType: Option[String], + cloudProvider: Option[String], + osName: String, + osArch: String, + jvmVersion: String, + availableProcessors: Int, + totalMemoryBytes: Long, + collectionError: Option[String] +) extends java.io.Serializable diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintListener.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintListener.scala index 158ea192..a0d9e919 100644 --- a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintListener.scala +++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintListener.scala @@ -26,6 +26,10 @@ class DataflintListener(store: ElementTrackingStore) extends SparkListener with val wrapper = new DataflintDeltaLakeScanInfoWrapper(e.scanInfo) store.write(wrapper) } + case e: DataflintExecutorMetadataEvent => { + val wrapper = new DataflintExecutorMetadataWrapper(e.metadata) + store.write(wrapper) + } case _ => {} } } catch { diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintStore.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintStore.scala index 58e7b962..4237f2f2 100644 --- a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintStore.scala +++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintStore.scala @@ -36,4 +36,8 @@ class DataflintStore(val store: KVStore) { .sortBy(_.minExecutionId) } + def executorMetadata(): Seq[DataflintExecutorMetadataInfo] = { + mapToSeq(store.view(classOf[DataflintExecutorMetadataWrapper]))(_.info) + } + } diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/model.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/model.scala index 16494aae..777c3fea 100644 --- a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/model.scala +++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/model.scala @@ -91,3 +91,25 @@ class DataflintDeltaLakeScanInfoWrapper(val info: DataflintDeltaLakeScanInfo) { @JsonIgnore def id: String = s"${info.minExecutionId}_${info.tablePath.replaceAll(" ", "")}" } + +case class DataflintExecutorMetadataInfo( + executorId: String, + executorHost: String, + instanceType: Option[String], + lifecycleType: Option[String], + cloudProvider: Option[String], + osName: String, + osArch: String, + jvmVersion: String, + availableProcessors: Int, + totalMemoryBytes: Long, + collectionError: Option[String] +) + +case class DataflintExecutorMetadataEvent(metadata: DataflintExecutorMetadataInfo) extends SparkListenerEvent + +class DataflintExecutorMetadataWrapper(val info: DataflintExecutorMetadataInfo) { + @KVIndex + @JsonIgnore + def id: String = info.executorId +} diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/saas/SparkRunStore.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/saas/SparkRunStore.scala index bdd189ff..99cfaa55 100644 --- a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/saas/SparkRunStore.scala +++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/saas/SparkRunStore.scala @@ -1,6 +1,6 @@ package org.apache.spark.dataflint.saas -import org.apache.spark.dataflint.listener.{DatabricksAdditionalExecutionWrapper, DataflintDeltaLakeScanInfoWrapper, DataflintEnvironmentInfoEvent, DataflintEnvironmentInfoWrapper, DataflintRDDStorageInfoWrapper, IcebergCommitWrapper} +import org.apache.spark.dataflint.listener.{DatabricksAdditionalExecutionWrapper, DataflintDeltaLakeScanInfoWrapper, DataflintEnvironmentInfoEvent, DataflintEnvironmentInfoWrapper, DataflintExecutorMetadataWrapper, DataflintRDDStorageInfoWrapper, IcebergCommitWrapper} import org.apache.spark.sql.execution.ui.{SQLExecutionUIData, SparkPlanGraphWrapper} import org.apache.spark.status._ @@ -27,5 +27,6 @@ case class SparkRunStore( icebergCommit: Seq[IcebergCommitWrapper], dataflintEnvironmentInfo: Seq[DataflintEnvironmentInfoWrapper], dataflintRDDStorageInfo: Seq[DataflintRDDStorageInfoWrapper], - dataflintDeltaLakeScanInfo: Seq[DataflintDeltaLakeScanInfoWrapper] + dataflintDeltaLakeScanInfo: Seq[DataflintDeltaLakeScanInfoWrapper], + dataflintExecutorMetadata: Seq[DataflintExecutorMetadataWrapper] = Seq.empty ) diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/saas/StoreDataExtractor.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/saas/StoreDataExtractor.scala index a6b12fcb..58f1254f 100644 --- a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/saas/StoreDataExtractor.scala +++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/saas/StoreDataExtractor.scala @@ -1,6 +1,6 @@ package org.apache.spark.dataflint.saas -import org.apache.spark.dataflint.listener.{DatabricksAdditionalExecutionWrapper, DataflintDeltaLakeScanInfoWrapper, DataflintEnvironmentInfoWrapper, DataflintRDDStorageInfoWrapper, IcebergCommitWrapper} +import org.apache.spark.dataflint.listener.{DatabricksAdditionalExecutionWrapper, DataflintDeltaLakeScanInfoWrapper, DataflintEnvironmentInfoWrapper, DataflintExecutorMetadataWrapper, DataflintRDDStorageInfoWrapper, IcebergCommitWrapper} import org.apache.spark.sql.execution.ui.{SQLExecutionUIData, SparkPlanGraphWrapper} import org.apache.spark.status._ @@ -35,7 +35,8 @@ class StoreDataExtractor(store: AppStatusStore) { icebergCommit = readAll[IcebergCommitWrapper], dataflintEnvironmentInfo = readAll[DataflintEnvironmentInfoWrapper], dataflintRDDStorageInfo = readAll[DataflintRDDStorageInfoWrapper], - dataflintDeltaLakeScanInfo = readAll[DataflintDeltaLakeScanInfoWrapper] + dataflintDeltaLakeScanInfo = readAll[DataflintDeltaLakeScanInfoWrapper], + dataflintExecutorMetadata = readAll[DataflintExecutorMetadataWrapper] ) } diff --git a/spark-plugin/pluginspark3/src/main/scala/io/dataflint/spark/SparkDataflintPlugin.scala b/spark-plugin/pluginspark3/src/main/scala/io/dataflint/spark/SparkDataflintPlugin.scala index 265793f0..3b11afe7 100644 --- a/spark-plugin/pluginspark3/src/main/scala/io/dataflint/spark/SparkDataflintPlugin.scala +++ b/spark-plugin/pluginspark3/src/main/scala/io/dataflint/spark/SparkDataflintPlugin.scala @@ -3,6 +3,8 @@ package io.dataflint.spark import org.apache.spark.SparkContext import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin} import org.apache.spark.dataflint.{DataflintSparkUICommonLoader, DataflintSparkUILoader} +import org.apache.spark.dataflint.executor.{DataflintExecutorPlugin, DriverMetadataHelper, ExecutorMetadataMessage} +import org.apache.spark.dataflint.listener.DataflintExecutorMetadataInfo import org.apache.spark.internal.Logging import java.util @@ -11,7 +13,7 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter class SparkDataflintPlugin extends SparkPlugin { override def driverPlugin(): DriverPlugin = new SparkDataflintDriverPlugin() - override def executorPlugin(): ExecutorPlugin = null + override def executorPlugin(): ExecutorPlugin = new DataflintExecutorPlugin() } class SparkDataflintDriverPlugin extends DriverPlugin with Logging { @@ -20,7 +22,40 @@ class SparkDataflintDriverPlugin extends DriverPlugin with Logging { override def init(sc: SparkContext, pluginContext: PluginContext): util.Map[String, String] = { this.sc = sc DataflintSparkUICommonLoader.registerInstrumentationExtension(sc) - Map[String, String]().asJava + val executorMetadataEnabled = DriverMetadataHelper.isExecutorMetadataEnabled(sc) + Map("executor.metadata.enabled" -> executorMetadataEnabled.toString).asJava + } + + override def receive(message: Any): String = { + message match { + case msg: ExecutorMetadataMessage => + try { + val info = DataflintExecutorMetadataInfo( + executorId = msg.executorId, + executorHost = msg.executorHost, + instanceType = msg.instanceType, + lifecycleType = msg.lifecycleType, + cloudProvider = msg.cloudProvider, + osName = msg.osName, + osArch = msg.osArch, + jvmVersion = msg.jvmVersion, + availableProcessors = msg.availableProcessors, + totalMemoryBytes = msg.totalMemoryBytes, + collectionError = msg.collectionError + ) + DriverMetadataHelper.postExecutorMetadataEvent(sc, info) + logInfo(s"Received executor metadata from executor ${msg.executorId}: " + + s"provider=${msg.cloudProvider}, instance=${msg.instanceType}, lifecycle=${msg.lifecycleType}") + "OK" + } catch { + case e: Throwable => + logWarning(s"Failed to process executor metadata from ${msg.executorId}", e) + s"ERROR: ${e.getMessage}" + } + case _ => + logWarning(s"Received unknown message type: ${message.getClass.getName}") + "UNKNOWN_MESSAGE" + } } override def registerMetrics(appId: String, pluginContext: PluginContext): Unit = { diff --git a/spark-plugin/pluginspark4/src/main/scala/io/dataflint/spark/SparkDataflintPlugin.scala b/spark-plugin/pluginspark4/src/main/scala/io/dataflint/spark/SparkDataflintPlugin.scala index 265793f0..3b11afe7 100644 --- a/spark-plugin/pluginspark4/src/main/scala/io/dataflint/spark/SparkDataflintPlugin.scala +++ b/spark-plugin/pluginspark4/src/main/scala/io/dataflint/spark/SparkDataflintPlugin.scala @@ -3,6 +3,8 @@ package io.dataflint.spark import org.apache.spark.SparkContext import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin} import org.apache.spark.dataflint.{DataflintSparkUICommonLoader, DataflintSparkUILoader} +import org.apache.spark.dataflint.executor.{DataflintExecutorPlugin, DriverMetadataHelper, ExecutorMetadataMessage} +import org.apache.spark.dataflint.listener.DataflintExecutorMetadataInfo import org.apache.spark.internal.Logging import java.util @@ -11,7 +13,7 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter class SparkDataflintPlugin extends SparkPlugin { override def driverPlugin(): DriverPlugin = new SparkDataflintDriverPlugin() - override def executorPlugin(): ExecutorPlugin = null + override def executorPlugin(): ExecutorPlugin = new DataflintExecutorPlugin() } class SparkDataflintDriverPlugin extends DriverPlugin with Logging { @@ -20,7 +22,40 @@ class SparkDataflintDriverPlugin extends DriverPlugin with Logging { override def init(sc: SparkContext, pluginContext: PluginContext): util.Map[String, String] = { this.sc = sc DataflintSparkUICommonLoader.registerInstrumentationExtension(sc) - Map[String, String]().asJava + val executorMetadataEnabled = DriverMetadataHelper.isExecutorMetadataEnabled(sc) + Map("executor.metadata.enabled" -> executorMetadataEnabled.toString).asJava + } + + override def receive(message: Any): String = { + message match { + case msg: ExecutorMetadataMessage => + try { + val info = DataflintExecutorMetadataInfo( + executorId = msg.executorId, + executorHost = msg.executorHost, + instanceType = msg.instanceType, + lifecycleType = msg.lifecycleType, + cloudProvider = msg.cloudProvider, + osName = msg.osName, + osArch = msg.osArch, + jvmVersion = msg.jvmVersion, + availableProcessors = msg.availableProcessors, + totalMemoryBytes = msg.totalMemoryBytes, + collectionError = msg.collectionError + ) + DriverMetadataHelper.postExecutorMetadataEvent(sc, info) + logInfo(s"Received executor metadata from executor ${msg.executorId}: " + + s"provider=${msg.cloudProvider}, instance=${msg.instanceType}, lifecycle=${msg.lifecycleType}") + "OK" + } catch { + case e: Throwable => + logWarning(s"Failed to process executor metadata from ${msg.executorId}", e) + s"ERROR: ${e.getMessage}" + } + case _ => + logWarning(s"Received unknown message type: ${message.getClass.getName}") + "UNKNOWN_MESSAGE" + } } override def registerMetrics(appId: String, pluginContext: PluginContext): Unit = { From 4b2e50f254212fd4681b18f0b83f7c810ca9cf83 Mon Sep 17 00:00:00 2001 From: Avi Minsky Date: Mon, 20 Apr 2026 16:02:31 +0300 Subject: [PATCH 2/3] Fix one-way message warning and flaky broadcast join tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Return null from DriverPlugin.receive() since PluginContext.send() is fire-and-forget — returning a string caused a spurious warning. Relax broadcast join test assertions to duration >= 0 because the codegen sleep is only in doProduce path, not doExecute which broadcast joins use. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../spark/SparkDataflintPlugin.scala | 6 +++--- .../dataflint/DataFlintSqlNodesSpec.scala | 21 ++++++++++++------- .../spark/SparkDataflintPlugin.scala | 6 +++--- 3 files changed, 20 insertions(+), 13 deletions(-) diff --git a/spark-plugin/pluginspark3/src/main/scala/io/dataflint/spark/SparkDataflintPlugin.scala b/spark-plugin/pluginspark3/src/main/scala/io/dataflint/spark/SparkDataflintPlugin.scala index 3b11afe7..95d178b7 100644 --- a/spark-plugin/pluginspark3/src/main/scala/io/dataflint/spark/SparkDataflintPlugin.scala +++ b/spark-plugin/pluginspark3/src/main/scala/io/dataflint/spark/SparkDataflintPlugin.scala @@ -46,15 +46,15 @@ class SparkDataflintDriverPlugin extends DriverPlugin with Logging { DriverMetadataHelper.postExecutorMetadataEvent(sc, info) logInfo(s"Received executor metadata from executor ${msg.executorId}: " + s"provider=${msg.cloudProvider}, instance=${msg.instanceType}, lifecycle=${msg.lifecycleType}") - "OK" + null } catch { case e: Throwable => logWarning(s"Failed to process executor metadata from ${msg.executorId}", e) - s"ERROR: ${e.getMessage}" + null } case _ => logWarning(s"Received unknown message type: ${message.getClass.getName}") - "UNKNOWN_MESSAGE" + null } } diff --git a/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataFlintSqlNodesSpec.scala b/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataFlintSqlNodesSpec.scala index 1fedd284..e87ee073 100644 --- a/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataFlintSqlNodesSpec.scala +++ b/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataFlintSqlNodesSpec.scala @@ -50,8 +50,13 @@ class DataFlintSqlNodesSpec extends AnyFunSuite with Matchers with BeforeAndAfte case t: TimedExec if t.child.getClass.getSimpleName == childName => t }.headOption - /** Assert TimedExec wraps `childName` and duration > 0. */ + /** Assert TimedExec wraps `childName` and duration >= sleepInCodeGenMs. */ private def assertWrappedWithDuration(df: DataFrame, childName: String): Unit = { + assertWrapped(df, childName, sleepInCodeGenMs) + } + + /** Assert TimedExec wraps `childName` and duration >= minDuration. */ + private def assertWrapped(df: DataFrame, childName: String, minDuration: Long): Unit = { df.collect() // trigger execution val plan = executedPlan(df) val timed = findTimed(df, childName) @@ -59,8 +64,8 @@ class DataFlintSqlNodesSpec extends AnyFunSuite with Matchers with BeforeAndAfte timed should not be empty } val duration = timed.get.metrics("duration").value - withClue(s"TimedExec($childName) duration=$duration ms should be >= $sleepInCodeGenMs") { - duration should be >= sleepInCodeGenMs + withClue(s"TimedExec($childName) duration=$duration ms should be >= $minDuration") { + duration should be >= minDuration } } @@ -124,9 +129,10 @@ class DataFlintSqlNodesSpec extends AnyFunSuite with Matchers with BeforeAndAfte private def smallDF = spark.range(0, 10, 1, 1).toDF("sid") test("BroadcastHashJoinExec") { - assertWrappedWithDuration( + // Sleep is only in codegen doProduce path; BroadcastHashJoin uses doExecute, so only check wrapping + assertWrapped( baseDF.join(smallDF, col("id") === col("sid")), - "BroadcastHashJoinExec" + "BroadcastHashJoinExec", 0L ) } @@ -143,9 +149,10 @@ class DataFlintSqlNodesSpec extends AnyFunSuite with Matchers with BeforeAndAfte } test("BroadcastNestedLoopJoinExec") { - assertWrappedWithDuration( + // Sleep is only in codegen doProduce path; BroadcastNestedLoopJoin uses doExecute, so only check wrapping + assertWrapped( baseDF.join(smallDF, col("id") > col("sid")), - "BroadcastNestedLoopJoinExec" + "BroadcastNestedLoopJoinExec", 0L ) } diff --git a/spark-plugin/pluginspark4/src/main/scala/io/dataflint/spark/SparkDataflintPlugin.scala b/spark-plugin/pluginspark4/src/main/scala/io/dataflint/spark/SparkDataflintPlugin.scala index 3b11afe7..95d178b7 100644 --- a/spark-plugin/pluginspark4/src/main/scala/io/dataflint/spark/SparkDataflintPlugin.scala +++ b/spark-plugin/pluginspark4/src/main/scala/io/dataflint/spark/SparkDataflintPlugin.scala @@ -46,15 +46,15 @@ class SparkDataflintDriverPlugin extends DriverPlugin with Logging { DriverMetadataHelper.postExecutorMetadataEvent(sc, info) logInfo(s"Received executor metadata from executor ${msg.executorId}: " + s"provider=${msg.cloudProvider}, instance=${msg.instanceType}, lifecycle=${msg.lifecycleType}") - "OK" + null } catch { case e: Throwable => logWarning(s"Failed to process executor metadata from ${msg.executorId}", e) - s"ERROR: ${e.getMessage}" + null } case _ => logWarning(s"Received unknown message type: ${message.getClass.getName}") - "UNKNOWN_MESSAGE" + null } } From 30e41a3ec6a738e56458e619635ab58e3043f833 Mon Sep 17 00:00:00 2001 From: Avi Minsky Date: Mon, 27 Apr 2026 10:16:24 +0300 Subject: [PATCH 3/3] updated executor metadata config to `spark.dataflint.experimental.executor.metadata.enabled` --- .../apache/spark/dataflint/executor/DriverMetadataHelper.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/executor/DriverMetadataHelper.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/executor/DriverMetadataHelper.scala index 621be492..968ca82d 100644 --- a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/executor/DriverMetadataHelper.scala +++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/executor/DriverMetadataHelper.scala @@ -6,7 +6,7 @@ import org.apache.spark.dataflint.listener.{DataflintExecutorMetadataEvent, Data object DriverMetadataHelper { def isExecutorMetadataEnabled(sc: SparkContext): Boolean = { - sc.conf.getBoolean("spark.dataflint.executor.metadata.enabled", defaultValue = false) + sc.conf.getBoolean("spark.dataflint.experimental.executor.metadata.enabled", defaultValue = false) } def postExecutorMetadataEvent(sc: SparkContext, info: DataflintExecutorMetadataInfo): Unit = {