Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 0 additions & 29 deletions .github/workflows/cd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,35 +95,6 @@ jobs:
SONATYPE_PASSWORD: ${{ secrets.SONATYPE_PASSWORD }}
SONATYPE_USERNAME: ${{ secrets.SONATYPE_USERNAME }}

- name: Verify artifact published
run: |
VERSION=$(grep 'lazy val versionNum' build.sbt | grep -oE '"[0-9.]+"' | tr -d '"')
if [[ "$IS_RELEASE" == "true" ]]; then
URL="https://repo1.maven.org/maven2/io/dataflint/spark_2.12/${VERSION}/spark_2.12-${VERSION}.pom"
AUTH_ARGS=""
EXPECTED_VERSION="${VERSION}"
else
URL="https://central.sonatype.com/repository/maven-snapshots/io/dataflint/spark_2.12/maven-metadata.xml"
AUTH_ARGS="-u ${SONATYPE_USERNAME}:${SONATYPE_PASSWORD}"
EXPECTED_VERSION="${VERSION}-SNAPSHOT"
fi
echo "Verifying $EXPECTED_VERSION at: $URL"
sleep 15
for i in $(seq 1 10); do
if curl -sf $AUTH_ARGS "$URL" | grep -q "${EXPECTED_VERSION}"; then
echo "Artifact published successfully"
exit 0
fi
echo "Attempt $i/10 — waiting 30s..."
sleep 30
done
echo "Artifact not found after 5 minutes"
exit 1
working-directory: ./spark-plugin
env:
SONATYPE_USERNAME: ${{ secrets.SONATYPE_USERNAME }}
SONATYPE_PASSWORD: ${{ secrets.SONATYPE_PASSWORD }}

- name: Create Release
if: github.event.inputs.release_type == 'official' || startsWith(github.ref, 'refs/tags/v')
run: |
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ spark-submit
* There is also support for scala 2.13, if your spark cluster is using scala 2.13 change package name to io.dataflint:spark_**2.13**:0.9.6
* For more installation options, including for **python** and **k8s spark-operator**, see [Install on Spark docs](https://dataflint.gitbook.io/dataflint-for-spark/getting-started/install-on-spark)
* For installing DataFlint OSS in **spark history server** for observability on completed runs see [install on spark history server docs](https://dataflint.gitbook.io/dataflint-for-spark/getting-started/install-on-spark-history-server)
* For installing DataFlint OSS on **DataBricks** see [install on databricks docs](https://dataflint.gitbook.io/dataflint-for-spark/getting-started/install-on-databricks)
* For installing DataFlint OSS on **DataBricks** see [install on databricks docs](https://dataflint.gitbook.io/dataflint-for-spark/getting-started/install-on-databricks). Databricks Runtime 17.3+ ships `javax.servlet` instead of `jakarta.servlet`, so use the dedicated shaded artifact `io.dataflint:dataflint-spark4-databricks_2.13` (same plugin class — only the jar coordinate differs).

## How it Works

Expand Down
58 changes: 57 additions & 1 deletion spark-plugin/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ lazy val dataflint = project
plugin,
pluginspark3,
pluginspark4,
pluginspark4databricks,
example_3_1_3,
example_3_2_4,
example_3_3_3,
Expand Down Expand Up @@ -162,11 +163,66 @@ lazy val pluginspark4 = (project in file("pluginspark4"))

// Include source from plugin directory for self-contained build
Compile / unmanagedSourceDirectories += (plugin / Compile / sourceDirectory).value / "scala",

// Include resources from plugin directory for static UI files
Compile / unmanagedResourceDirectories += (plugin / Compile / resourceDirectory).value
)

lazy val pluginspark4databricks = (project in file("pluginspark4databricks"))
.enablePlugins(AssemblyPlugin)
.settings(
name := "dataflint-spark4-databricks",
organization := "io.dataflint",
scalaVersion := scala213,
crossScalaVersions := List(scala213), // Only Scala 2.13 for Spark 4.x
version := (if (git.gitCurrentTags.value.exists(_.startsWith("v"))) {
versionNum
} else {
versionNum + "-SNAPSHOT"
}),
libraryDependencies += "org.apache.spark" %% "spark-core" % "4.0.1" % "provided",
libraryDependencies += "org.apache.spark" %% "spark-sql" % "4.0.1" % "provided",
libraryDependencies += "com.amazonaws" % "aws-java-sdk-s3" % "1.12.470" % "provided",
libraryDependencies += "org.apache.iceberg" %% "iceberg-spark-runtime-3.5" % "1.5.0" % "provided",
libraryDependencies += "io.delta" %% "delta-spark" % "3.2.0" % "provided",

// Source-share with pluginspark4 + plugin so we don't duplicate code.
Compile / unmanagedSourceDirectories += (pluginspark4 / Compile / sourceDirectory).value / "scala",
Compile / unmanagedSourceDirectories += (plugin / Compile / sourceDirectory).value / "scala",
Compile / unmanagedResourceDirectories += (plugin / Compile / resourceDirectory).value,

// Drop the upstream DataflintSparkUILoader so our local copy (which uses
// Spark4DatabricksPageFactory) is the one compiled.
Compile / unmanagedSources / excludeFilter := {
val upstreamLoader = (pluginspark4 / Compile / sourceDirectory).value /
"scala" / "org" / "apache" / "spark" / "dataflint" / "DataflintSparkUILoader.scala"
new sbt.io.SimpleFileFilter(_.getCanonicalPath == upstreamLoader.getCanonicalPath)
},

assembly / assemblyJarName := s"${name.value}_${scalaBinaryVersion.value}-${version.value}.jar",
assembly / assemblyOption := (assembly / assemblyOption).value.withIncludeScala(false),
// Rewrite jakarta.servlet → javax.servlet in our bytecode so the artifact
// loads on Databricks Runtime 17.3, which ships javax instead of jakarta.
assembly / assemblyShadeRules := Seq(
ShadeRule.rename("jakarta.servlet.**" -> "javax.servlet.@1").inAll
),
assembly / assemblyMergeStrategy := {
case PathList("META-INF", "services", xs @ _*) => MergeStrategy.concat
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case "application.conf" => MergeStrategy.concat
case "reference.conf" => MergeStrategy.concat
case _ => MergeStrategy.first
},

Compile / packageBin := assembly.value,
publishTo := {
if (isSnapshot.value)
Some("snapshots" at "https://central.sonatype.com/repository/maven-snapshots/")
else
sonatypePublishToBundle.value
}
)

lazy val example_3_1_3 = (project in file("example_3_1_3"))
.settings(
name := "DataflintSparkExample313",
Expand Down
5 changes: 5 additions & 0 deletions spark-plugin/clean-and-setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ sbt pluginspark3/assembly
echo "Building Spark 4 fat JAR..."
sbt ++2.13.16 pluginspark4/assembly

# Build Spark 4 Databricks fat JAR (jakarta.servlet → javax.servlet shaded)
echo "Building Spark 4 Databricks fat JAR..."
sbt ++2.13.16 pluginspark4databricks/assembly

echo "✅ Setup complete!"
echo ""
echo "📋 Next steps:"
Expand All @@ -40,4 +44,5 @@ echo ""
echo "📦 Fat JARs created:"
echo "- Spark 3.x: pluginspark3/target/scala-2.12/dataflint-spark3_2.12-0.9.7-SNAPSHOT.jar"
echo "- Spark 4.x: pluginspark4/target/scala-2.13/dataflint-spark4_2.13-0.9.7-SNAPSHOT.jar"
echo "- Spark 4 (Databricks): pluginspark4databricks/target/scala-2.13/dataflint-spark4-databricks_2.13-0.9.7-SNAPSHOT.jar"

Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ object MetricsUtils {
def getSizeMetric(name: String)(implicit sparkContext: SparkContext): (String, SQLMetric) = {
name -> {
try {
SQLMetrics.createSizeMetric(sparkContext, name)
// Pass initValue explicitly to avoid emitting a `createSizeMetric$default$3()`
// bytecode call. Databricks Runtime 17.x rewrites SQLMetrics with explicit
// overloads instead of Scala default args, so the $default$3 helper is missing
// and the call would NoSuchMethodError before ever reaching createSizeMetric.
SQLMetrics.createSizeMetric(sparkContext, name, -1L)
} catch {
case _: NoSuchMethodError =>
try {
Expand Down Expand Up @@ -84,7 +88,9 @@ object MetricsUtils {
def getTimingMetric(name: String)(implicit sparkContext: SparkContext): (String, SQLMetric) = {
name -> {
try {
SQLMetrics.createTimingMetric(sparkContext, name)
// See note on getSizeMetric: pass initValue explicitly so the bytecode skips
// the missing `createTimingMetric$default$3()` helper on Databricks runtimes.
SQLMetrics.createTimingMetric(sparkContext, name, -1L)
} catch {
case _: NoSuchMethodError =>
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package org.apache.spark.dataflint

import org.apache.spark.SparkContext
import org.apache.spark.dataflint.api.Spark4DatabricksPageFactory
import org.apache.spark.ui.SparkUI

/**
* Databricks variant of the Spark 4 loader. Identical to the pluginspark4
* loader except it instantiates `Spark4DatabricksPageFactory`, which
* inverts the Databricks UI gate so the shaded jar serves UI only on DBR.
* Same FQN as the upstream loader so the shared `SparkDataflintPlugin`
* entrypoint resolves it without any per-flavor wiring.
*/
object DataflintSparkUILoader {
private val pageFactory = new Spark4DatabricksPageFactory()

def install(context: SparkContext): String =
new org.apache.spark.dataflint.DataflintSparkUICommonInstaller().install(context, pageFactory)

def loadUI(ui: SparkUI): String =
new org.apache.spark.dataflint.DataflintSparkUICommonInstaller().loadUI(ui, pageFactory)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.apache.spark.dataflint.api

import org.apache.spark.ui.SparkUI

/**
* Databricks variant of [[Spark4PageFactory]]. The parent class skips the
* DataFlint UI on any Databricks runtime to avoid the jakarta.servlet
* NoClassDefFoundError on DBR 17.3 (see issue #47). This subclass inverts
* the check: enable UI ONLY on Databricks (where the javax-shaded bytecode
* matches the runtime). If this jar is installed on stock Spark 4 by
* mistake, the UI is silently skipped instead of crashing.
*/
class Spark4DatabricksPageFactory extends Spark4PageFactory {
override def isUISupported(ui: SparkUI): Boolean =
ui.conf.getOption("spark.databricks.clusterUsageTags.cloudProvider").isDefined
}
7 changes: 6 additions & 1 deletion spark-ui/src/utils/FormatUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,12 @@ export function timeStringToMilliseconds(
case "h":
return duration(value, "hours").asMilliseconds();
default:
throw new Error(`Unsupported time unit: ${unit}`);
// Some Spark forks (e.g. Databricks) return certain duration-like metrics as
// bare numbers without a unit suffix, which slices to digit pairs. Return
// undefined instead of throwing — every caller already handles undefined
// with `?? 0` and the rest of the page keeps rendering.
console.warn(`timeStringToMilliseconds: unsupported time unit "${unit}" in "${timeString}"`);
return undefined;
}
}

Expand Down
Loading