From c76732733381f5d93474f2ec4976e8e6e08e783d Mon Sep 17 00:00:00 2001
From: Avi Minsky
Date: Tue, 28 Apr 2026 14:57:37 +0300
Subject: [PATCH 1/3] Update release version bump to also cover README.md

---
 .github/workflows/cd.yml |  6 +++++-
 README.md                | 14 +++++++-------
 2 files changed, 12 insertions(+), 8 deletions(-)

diff --git a/.github/workflows/cd.yml b/.github/workflows/cd.yml
index 611cf00..f4827cd 100644
--- a/.github/workflows/cd.yml
+++ b/.github/workflows/cd.yml
@@ -147,9 +147,13 @@ jobs:
           sed -i "s/${VERSION}/${NEW_VERSION}/g" pyspark-testing/dataflint_pyspark_example.py
           sed -i "s/${VERSION}/${NEW_VERSION}/g" pyspark-testing/dataflint_pyspark_example_test.py
           sed -i "s/${VERSION}/${NEW_VERSION}/g" pyspark-testing/README.md
+          OLD_README_VERSION=$(grep -oE '[0-9]+\.[0-9]+\.[0-9]+' ../README.md | head -1)
+          if [ -n "$OLD_README_VERSION" ] && [ "$OLD_README_VERSION" != "$VERSION" ]; then
+            sed -i "s/${OLD_README_VERSION}/${VERSION}/g" ../README.md
+          fi
           git config user.email "github-actions[bot]@users.noreply.github.com"
           git config user.name "github-actions[bot]"
-          git add build.sbt ../spark-ui/package.json clean-and-setup.sh pyspark-testing/
+          git add build.sbt ../spark-ui/package.json clean-and-setup.sh pyspark-testing/ ../README.md
           git commit -m "version bump to ${NEW_VERSION}"
           git push origin HEAD:main
       working-directory: ./spark-plugin
\ No newline at end of file
diff --git a/README.md b/README.md
index d800046..7465c8d 100644
--- a/README.md
+++ b/README.md
@@ -62,12 +62,12 @@ See [Our Features](https://dataflint.gitbook.io/dataflint-for-spark/overview/our
 Install DataFlint OSS via sbt:
 For Spark 3.X:
 ```sbt
-libraryDependencies += "io.dataflint" %% "spark" % "0.8.8"
+libraryDependencies += "io.dataflint" %% "spark" % "0.9.4"
 ```
 
 For Spark 4.X:
 ```sbt
-libraryDependencies += "io.dataflint" %% "dataflint-spark4" % "0.8.8"
+libraryDependencies += "io.dataflint" %% "dataflint-spark4" % "0.9.4"
 ```
 
@@ -87,7 +87,7 @@ For Spark 3.X:
 ```python
 builder = pyspark.sql.SparkSession.builder
     ...
-    .config("spark.jars.packages", "io.dataflint:spark_2.12:0.8.8") \
+    .config("spark.jars.packages", "io.dataflint:spark_2.12:0.9.4") \
     .config("spark.plugins", "io.dataflint.spark.SparkDataflintPlugin") \
     ...
 ```
@@ -96,7 +96,7 @@ For Spark 4.X:
 ```python
 builder = pyspark.sql.SparkSession.builder
     ...
-    .config("spark.jars.packages", "io.dataflint:dataflint-spark4_2.13:0.8.8") \
+    .config("spark.jars.packages", "io.dataflint:dataflint-spark4_2.13:0.9.4") \
     .config("spark.plugins", "io.dataflint.spark.SparkDataflintPlugin") \
     ...
 ```
@@ -107,7 +107,7 @@ Alternatively, install DataFlint OSS with **no code change** as a spark ivy pack
 ```bash
 spark-submit
---packages io.dataflint:spark_2.12:0.8.8 \
+--packages io.dataflint:spark_2.12:0.9.4 \
 --conf spark.plugins=io.dataflint.spark.SparkDataflintPlugin \
 ...
 ```
@@ -115,14 +115,14 @@ spark-submit
 For Spark 4.X:
 ```bash
 spark-submit
---packages io.dataflint:dataflint-spark4_2.13:0.8.8 \
+--packages io.dataflint:dataflint-spark4_2.13:0.9.4 \
 --conf spark.plugins=io.dataflint.spark.SparkDataflintPlugin \
 ...
 ```
 
 ### Additional installation options
-* There is also support for scala 2.13, if your spark cluster is using scala 2.13 change package name to io.dataflint:spark_**2.13**:0.8.8
+* There is also support for scala 2.13, if your spark cluster is using scala 2.13 change package name to io.dataflint:spark_**2.13**:0.9.4
 * For more installation options, including for **python** and **k8s spark-operator**, see [Install on Spark docs](https://dataflint.gitbook.io/dataflint-for-spark/getting-started/install-on-spark)
 * For installing DataFlint OSS in **spark history server** for observability on completed runs see [install on spark history server docs](https://dataflint.gitbook.io/dataflint-for-spark/getting-started/install-on-spark-history-server)
 * For installing DataFlint OSS on **DataBricks** see [install on databricks docs](https://dataflint.gitbook.io/dataflint-for-spark/getting-started/install-on-databricks)
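For clarity, the grep/sed step added to cd.yml above can be read as the following standalone sketch. It is not part of the patch; the TypeScript rendering, the helper name, and the Node fs usage are illustrative only:

```typescript
// Sketch of the new workflow step: find the first x.y.z version string in
// README.md and, when it differs from the target, rewrite every occurrence.
import * as fs from "fs";

function bumpReadmeVersion(readmePath: string, newVersion: string): void {
  const text = fs.readFileSync(readmePath, "utf8");
  // grep -oE '[0-9]+\.[0-9]+\.[0-9]+' ../README.md | head -1
  const match = text.match(/[0-9]+\.[0-9]+\.[0-9]+/);
  // Mirrors the shell guard: only rewrite when an old version was found
  // and it differs from the new one.
  if (match && match[0] !== newVersion) {
    // The sed call leaves the dots unescaped (so "." matches any character);
    // escaping them makes this sketch slightly stricter than the workflow.
    const pattern = new RegExp(match[0].replace(/\./g, "\\."), "g");
    fs.writeFileSync(readmePath, text.replace(pattern, newVersion));
  }
}

bumpReadmeVersion("../README.md", "0.9.4");
```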
From dc741a2ff36ec829057a28877157b99f2fe901c1 Mon Sep 17 00:00:00 2001
From: Avi Minsky
Date: Tue, 28 Apr 2026 17:26:36 +0300
Subject: [PATCH 2/3] Fix codegen variablePrefix space issue and restrict
 useLegacySqlApi to history server

- TimedWithCodegenExec: override nodeName to remove spaces, preventing invalid
  Java identifiers in generated code (variablePrefix derives from
  nodeName.toLowerCase)
- SqlReducer: compare node names ignoring spaces when merging wrapper pairs
- SparkApi: only use non-paginated /sql endpoint on history server with
  Spark < 3.2 (live apps support pagination on all versions)
- PySpark tests: add complex types and multi-column projection test cases

Co-Authored-By: Claude Opus 4.6 (1M context)
---
 .../apache/spark/dataflint/TimedExec.scala |  1 +
 .../dataflint_pyspark_example.py           | 58 +++++++++++++++++++
 spark-ui/src/reducers/SqlReducer.ts        |  2 +-
 spark-ui/src/services/SparkApi.tsx         |  6 +-
 4 files changed, 63 insertions(+), 4 deletions(-)

diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/TimedExec.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/TimedExec.scala
index 9cdce0f..04478c9 100644
--- a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/TimedExec.scala
+++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/TimedExec.scala
@@ -142,6 +142,7 @@ class TimedExec(val child: SparkPlan) extends SparkPlan with Logging {
  * Only instantiated via TimedExec.apply() when child is CodegenSupport.
  */
 class TimedWithCodegenExec(override val child: SparkPlan) extends TimedExec(child) with CodegenSupport {
+  override def nodeName: String = ("DataFlint" + child.nodeName).replace(" ", "")
 
   override def inputRDDs(): Seq[RDD[InternalRow]] = {
     val rdds = child.asInstanceOf[CodegenSupport].inputRDDs()
diff --git a/spark-plugin/pyspark-testing/dataflint_pyspark_example.py b/spark-plugin/pyspark-testing/dataflint_pyspark_example.py
index 98d5e3f..a5cdf2e 100755
--- a/spark-plugin/pyspark-testing/dataflint_pyspark_example.py
+++ b/spark-plugin/pyspark-testing/dataflint_pyspark_example.py
@@ -579,6 +579,64 @@ def double_price(price: pd.Series) -> pd.Series:
 print("\nResult written to /tmp/dataflint_cache_filter_union_example")
 
 
+# ── RDDScanExec codegen with complex types ───────────────────────────────────
+# Reproduces codegen compilation error when TimedWithCodegenExec wraps RDDScanExec
+# ("Scan ExistingRDD") — the space in nodeName produces invalid Java identifiers
+# via variablePrefix. GenerateUnsafeProjection uses ctx.freshName("previousCursor")
+# which picks up the space-containing prefix when projecting structs/arrays/maps.
+print("="*80)
+print("Running RDDScanExec codegen with complex types (struct/array)")
+print("="*80)
+from pyspark.sql.types import ArrayType, MapType
+from pyspark.sql.functions import struct, array as spark_array, create_map
+
+complex_data = [
+    ("Alice", "Electronics", [1, 2, 3], {"color": "red", "size": "M"}),
+    ("Bob", "Books", [4, 5], {"color": "blue", "size": "L"}),
+    ("Charlie", "Clothing", [6], {"color": "green", "size": "S"}),
+] * 1000
+
+complex_schema = StructType([
+    StructField("customer", StringType(), False),
+    StructField("category", StringType(), False),
+    StructField("tags", ArrayType(IntegerType()), True),
+    StructField("attributes", MapType(StringType(), StringType()), True),
+])
+
+df_complex = spark.createDataFrame(complex_data, complex_schema)
+
+spark.sparkContext.setJobDescription("RDDScanExec codegen: struct/array/map projection triggers UnsafeProjection")
+df_complex.select(
+    col("customer"),
+    struct(col("category"), col("tags")).alias("info"),
+    col("attributes"),
+) \
+    .filter(col("customer") != "Alice") \
+    .write.mode("overwrite").parquet("/tmp/dataflint_rddscan_complex_codegen_test")
+print("\nResult written to /tmp/dataflint_rddscan_complex_codegen_test")
+
+# ── RDDScanExec codegen stress test ──────────────────────────────────────────
+print("="*80)
+print("Running RDDScanExec codegen stress test (variablePrefix space issue)")
+print("="*80)
+spark.sparkContext.setJobDescription("RDDScanExec codegen: multi-column projection + filter + agg over createDataFrame")
+df.select(
+    col("customer"),
+    col("category"),
+    (col("price") * col("quantity")).alias("revenue"),
+    (col("price") * 0.9).alias("discounted_price"),
+    (col("quantity") + 1).alias("quantity_plus_one"),
+) \
+    .filter(col("revenue") > 100) \
+    .groupBy("category") \
+    .agg(
+        spark_sum("revenue").alias("total_revenue"),
+        avg("discounted_price").alias("avg_discounted"),
+    ) \
+    .write.mode("overwrite").parquet("/tmp/dataflint_rddscan_codegen_test")
+print("\nResult written to /tmp/dataflint_rddscan_codegen_test")
+
+
 print("\n" + "="*80)
 print("Done!")
 print("="*80)
diff --git a/spark-ui/src/reducers/SqlReducer.ts b/spark-ui/src/reducers/SqlReducer.ts
index 68d3721..9f9d8cd 100644
--- a/spark-ui/src/reducers/SqlReducer.ts
+++ b/spark-ui/src/reducers/SqlReducer.ts
@@ -274,7 +274,7 @@ function calculateSql(
       const childEdges = enrichedSql.edges.filter(e => e.toId === node.nodeId);
       if (childEdges.length !== 1) continue;
       const childNode = enrichedSql.nodes.find(n => n.nodeId === childEdges[0].fromId);
-      if (!childNode || childNode.nodeName !== strippedName) continue;
+      if (!childNode || childNode.nodeName.replace(/ /g, "") !== strippedName.replace(/ /g, "")) continue;
       // Keep child, add wrapper's extra metrics, mark as instrumented with DataFlint prefix
       const childMetricNames = new Set(childNode.metrics.map(m => m.name));
       const extraMetrics = node.metrics.filter(m => !childMetricNames.has(m.name));
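To make the SqlReducer change above concrete: after the Scala-side fix, the wrapper reports a space-free nodeName while the wrapped child keeps its original spaced name, so an exact string comparison would no longer match. A minimal sketch (the node names below are illustrative):

```typescript
// The merge step now compares node names with all spaces removed.
const sameIgnoringSpaces = (a: string, b: string): boolean =>
  a.replace(/ /g, "") === b.replace(/ /g, "");

// A wrapper named "DataFlintScanExistingRDD" strips to "ScanExistingRDD",
// while the child node still reports "Scan ExistingRDD":
console.log(sameIgnoringSpaces("Scan ExistingRDD", "ScanExistingRDD")); // true
console.log("Scan ExistingRDD" === "ScanExistingRDD");                 // false
```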
diff --git a/spark-ui/src/services/SparkApi.tsx b/spark-ui/src/services/SparkApi.tsx
index 6942451..293373f 100644
--- a/spark-ui/src/services/SparkApi.tsx
+++ b/spark-ui/src/services/SparkApi.tsx
@@ -317,9 +317,9 @@ class SparkAPI {
         ? currentAttempt.attemptId
         : undefined;
     this.sparkVersion = currentAttempt?.appSparkVersion;
-    // Paginated SQL API (/sql?offset=&length=) was added in Spark 3.2+
-    // Older apps on the history server need the non-paginated /sql endpoint
-    if (this.sparkVersion) {
+    // Paginated SQL API (/sql?offset=&length=) returns 404 for pre-3.2 apps
+    // on the history server. Live apps support it on all versions.
+    if (this.historyServerMode && this.sparkVersion) {
       const [major, minor] = this.sparkVersion.split(".").map(Number);
       this.useLegacySqlApi = major < 3 || (major === 3 && minor < 2);
     }
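Extracted for illustration, the gating added to SparkApi.tsx above amounts to the following standalone check (the function name is illustrative; the real code sets this.useLegacySqlApi as a side effect):

```typescript
// Non-paginated /sql is only needed for pre-3.2 apps served by the history
// server; live applications support the paginated endpoint on all versions.
function needsLegacySqlApi(historyServerMode: boolean, sparkVersion?: string): boolean {
  if (!historyServerMode || !sparkVersion) return false;
  const [major, minor] = sparkVersion.split(".").map(Number);
  return major < 3 || (major === 3 && minor < 2);
}

console.log(needsLegacySqlApi(true, "3.1.2"));  // true  → legacy /sql endpoint
console.log(needsLegacySqlApi(true, "3.5.1"));  // false → paginated /sql
console.log(needsLegacySqlApi(false, "3.1.2")); // false → live app, paginated
```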
From e080c1015cef24b3531dd5a7d24acc5d189e80a9 Mon Sep 17 00:00:00 2001
From: Avi Minsky
Date: Tue, 28 Apr 2026 18:09:31 +0300
Subject: [PATCH 3/3] Strip DataFlint prefix from planDescription before
 parsing

The sqlplan endpoint returns planDescription with the DataFlint-prefixed
nodeName (e.g. "DataFlintFilter Filter (cond)"). Parsers expect the original
format ("Filter (cond)"). Strip the "DataFlint<nodeName> " prefix using the
already-computed strippedNodeName.

Co-Authored-By: Claude Opus 4.6 (1M context)
---
 spark-ui/src/reducers/SqlReducer.ts | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/spark-ui/src/reducers/SqlReducer.ts b/spark-ui/src/reducers/SqlReducer.ts
index 9f9d8cd..9944362 100644
--- a/spark-ui/src/reducers/SqlReducer.ts
+++ b/spark-ui/src/reducers/SqlReducer.ts
@@ -299,9 +299,16 @@ function calculateSql(
     // Replace nodeName with the stripped version so all downstream code sees the base name
     const normalizedNode = { ...node, nodeName: strippedNodeName };
     const type = calcNodeType(normalizedNode.nodeName);
-    const nodePlan = plan?.nodesPlan.find(
+    const rawNodePlan = plan?.nodesPlan.find(
       (planNode) => planNode.id === normalizedNode.nodeId,
     );
+    // Strip "DataFlint<strippedNodeName> " prefix from planDescription so parsers see the original format.
+    // Description format: "DataFlint<strippedNodeName> <original description>", e.g.:
+    // "DataFlintFilter Filter (cond)" → "Filter (cond)"
+    // "DataFlintExecute InsertInto... Execute InsertInto... file:..." → "Execute InsertInto... file:..."
+    const nodePlan = rawNodePlan && isInstrumented
+      ? { ...rawNodePlan, planDescription: rawNodePlan.planDescription.replace("DataFlint" + strippedNodeName + " ", "") }
+      : rawNodePlan;
     const parsedPlan = nodePlan !== undefined ? parseNodePlan(normalizedNode, nodePlan) : undefined;
     const isCodegenNode = normalizedNode.nodeName.includes("WholeStageCodegen");
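As a standalone illustration of the normalization above (the helper name is hypothetical; the patch inlines the same replace() call), the "Filter" example from the comment round-trips as follows:

```typescript
// Removes the "DataFlint<strippedNodeName> " prefix. String.prototype.replace
// with a string pattern replaces only the first occurrence, i.e. the prefix.
function stripWrapperPrefix(planDescription: string, strippedNodeName: string): string {
  return planDescription.replace("DataFlint" + strippedNodeName + " ", "");
}

console.log(stripWrapperPrefix("DataFlintFilter Filter (cond)", "Filter"));
// → "Filter (cond)"
```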