6 changes: 5 additions & 1 deletion .github/workflows/cd.yml
@@ -147,9 +147,13 @@ jobs:
sed -i "s/${VERSION}/${NEW_VERSION}/g" pyspark-testing/dataflint_pyspark_example.py
sed -i "s/${VERSION}/${NEW_VERSION}/g" pyspark-testing/dataflint_pyspark_example_test.py
sed -i "s/${VERSION}/${NEW_VERSION}/g" pyspark-testing/README.md
OLD_README_VERSION=$(grep -oE '[0-9]+\.[0-9]+\.[0-9]+' ../README.md | head -1)
if [ -n "$OLD_README_VERSION" ] && [ "$OLD_README_VERSION" != "$VERSION" ]; then
sed -i "s/${OLD_README_VERSION}/${VERSION}/g" ../README.md
fi
git config user.email "github-actions[bot]@users.noreply.github.com"
git config user.name "github-actions[bot]"
git add build.sbt ../spark-ui/package.json clean-and-setup.sh pyspark-testing/
git add build.sbt ../spark-ui/package.json clean-and-setup.sh pyspark-testing/ ../README.md
git commit -m "version bump to ${NEW_VERSION}"
git push origin HEAD:main
working-directory: ./spark-plugin
14 changes: 7 additions & 7 deletions README.md
@@ -62,12 +62,12 @@ See [Our Features](https://dataflint.gitbook.io/dataflint-for-spark/overview/our
Install DataFlint OSS via sbt:
For Spark 3.X:
```sbt
libraryDependencies += "io.dataflint" %% "spark" % "0.8.8"
libraryDependencies += "io.dataflint" %% "spark" % "0.9.4"
```

For Spark 4.X:
```sbt
libraryDependencies += "io.dataflint" %% "dataflint-spark4" % "0.8.8"
libraryDependencies += "io.dataflint" %% "dataflint-spark4" % "0.9.4"
```


@@ -87,7 +87,7 @@ For Spark 3.X:
```python
builder = pyspark.sql.SparkSession.builder
...
.config("spark.jars.packages", "io.dataflint:spark_2.12:0.8.8") \
.config("spark.jars.packages", "io.dataflint:spark_2.12:0.9.4") \
.config("spark.plugins", "io.dataflint.spark.SparkDataflintPlugin") \
...
```
@@ -96,7 +96,7 @@ For Spark 4.X:
```python
builder = pyspark.sql.SparkSession.builder
...
.config("spark.jars.packages", "io.dataflint:dataflint-spark4_2.13:0.8.8") \
.config("spark.jars.packages", "io.dataflint:dataflint-spark4_2.13:0.9.4") \
.config("spark.plugins", "io.dataflint.spark.SparkDataflintPlugin") \
...
```
@@ -107,22 +107,22 @@ Alternatively, install DataFlint OSS with **no code change** as a spark ivy pack

```bash
spark-submit
--packages io.dataflint:spark_2.12:0.8.8 \
--packages io.dataflint:spark_2.12:0.9.4 \
--conf spark.plugins=io.dataflint.spark.SparkDataflintPlugin \
...
```

For Spark 4.X:
```bash
spark-submit
--packages io.dataflint:dataflint-spark4_2.13:0.8.8 \
--packages io.dataflint:dataflint-spark4_2.13:0.9.4 \
--conf spark.plugins=io.dataflint.spark.SparkDataflintPlugin \
...
```

### Additional installation options

* There is also support for scala 2.13, if your spark cluster is using scala 2.13 change package name to io.dataflint:spark_**2.13**:0.8.8
* There is also support for scala 2.13, if your spark cluster is using scala 2.13 change package name to io.dataflint:spark_**2.13**:0.9.4
* For more installation options, including for **python** and **k8s spark-operator**, see [Install on Spark docs](https://dataflint.gitbook.io/dataflint-for-spark/getting-started/install-on-spark)
* For installing DataFlint OSS in **spark history server** for observability on completed runs see [install on spark history server docs](https://dataflint.gitbook.io/dataflint-for-spark/getting-started/install-on-spark-history-server)
* For installing DataFlint OSS on **DataBricks** see [install on databricks docs](https://dataflint.gitbook.io/dataflint-for-spark/getting-started/install-on-databricks)
@@ -142,6 +142,7 @@ class TimedExec(val child: SparkPlan) extends SparkPlan with Logging {
* Only instantiated via TimedExec.apply() when child is CodegenSupport.
*/
class TimedWithCodegenExec(override val child: SparkPlan) extends TimedExec(child) with CodegenSupport {
override def nodeName: String = ("DataFlint" + child.nodeName).replace(" ", "")

override def inputRDDs(): Seq[RDD[InternalRow]] = {
val rdds = child.asInstanceOf[CodegenSupport].inputRDDs()
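The nodeName override above strips spaces because whole-stage codegen derives its variable prefix from nodeName and feeds it into ctx.freshName("previousCursor") when projecting complex types (see the repro comments in the pyspark example below); a space in that prefix ends up inside a generated Java identifier and the compiled stage fails. A minimal sketch of the failure mode, using a hypothetical freshName stand-in rather than Spark's actual CodegenContext:

```typescript
// Hypothetical, simplified model of how a codegen variable prefix derived from
// nodeName leaks into generated Java identifiers -- not Spark's real CodegenContext.
const JAVA_IDENTIFIER = /^[A-Za-z_$][A-Za-z0-9_$]*$/;

// Rough stand-in for ctx.freshName: prefix + base name + counter.
function freshName(prefix: string, name: string, id: number): string {
  return `${prefix}_${name}_${id}`;
}

// Without the override, the wrapper's node name contains spaces.
const unstripped = "DataFlint Scan ExistingRDD".toLowerCase();
// With the override: ("DataFlint" + child.nodeName).replace(" ", "")
const stripped = "DataFlintScanExistingRDD".toLowerCase();

console.log(JAVA_IDENTIFIER.test(freshName(unstripped, "previousCursor", 0))); // false -> invalid Java identifier, codegen compilation error
console.log(JAVA_IDENTIFIER.test(freshName(stripped, "previousCursor", 0)));   // true  -> compiles
```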
58 changes: 58 additions & 0 deletions spark-plugin/pyspark-testing/dataflint_pyspark_example.py
@@ -579,6 +579,64 @@ def double_price(price: pd.Series) -> pd.Series:
print("\nResult written to /tmp/dataflint_cache_filter_union_example")


# ── RDDScanExec codegen with complex types ───────────────────────────────────
# Reproduces codegen compilation error when TimedWithCodegenExec wraps RDDScanExec
# ("Scan ExistingRDD") — the space in nodeName produces invalid Java identifiers
# via variablePrefix. GenerateUnsafeProjection uses ctx.freshName("previousCursor")
# which picks up the space-containing prefix when projecting structs/arrays/maps.
print("="*80)
print("Running RDDScanExec codegen with complex types (struct/array)")
print("="*80)
from pyspark.sql.types import ArrayType, MapType
from pyspark.sql.functions import struct, array as spark_array, create_map

complex_data = [
("Alice", "Electronics", [1, 2, 3], {"color": "red", "size": "M"}),
("Bob", "Books", [4, 5], {"color": "blue", "size": "L"}),
("Charlie", "Clothing", [6], {"color": "green", "size": "S"}),
] * 1000

complex_schema = StructType([
StructField("customer", StringType(), False),
StructField("category", StringType(), False),
StructField("tags", ArrayType(IntegerType()), True),
StructField("attributes", MapType(StringType(), StringType()), True),
])

df_complex = spark.createDataFrame(complex_data, complex_schema)

spark.sparkContext.setJobDescription("RDDScanExec codegen: struct/array/map projection triggers UnsafeProjection")
df_complex.select(
col("customer"),
struct(col("category"), col("tags")).alias("info"),
col("attributes"),
) \
.filter(col("customer") != "Alice") \
.write.mode("overwrite").parquet("/tmp/dataflint_rddscan_complex_codegen_test")
print("\nResult written to /tmp/dataflint_rddscan_complex_codegen_test")

# ── RDDScanExec codegen stress test ──────────────────────────────────────────
print("="*80)
print("Running RDDScanExec codegen stress test (variablePrefix space issue)")
print("="*80)
spark.sparkContext.setJobDescription("RDDScanExec codegen: multi-column projection + filter + agg over createDataFrame")
df.select(
col("customer"),
col("category"),
(col("price") * col("quantity")).alias("revenue"),
(col("price") * 0.9).alias("discounted_price"),
(col("quantity") + 1).alias("quantity_plus_one"),
) \
.filter(col("revenue") > 100) \
.groupBy("category") \
.agg(
spark_sum("revenue").alias("total_revenue"),
avg("discounted_price").alias("avg_discounted"),
) \
.write.mode("overwrite").parquet("/tmp/dataflint_rddscan_codegen_test")
print("\nResult written to /tmp/dataflint_rddscan_codegen_test")


print("\n" + "="*80)
print("Done!")
print("="*80)
11 changes: 9 additions & 2 deletions spark-ui/src/reducers/SqlReducer.ts
@@ -274,7 +274,7 @@ function calculateSql(
const childEdges = enrichedSql.edges.filter(e => e.toId === node.nodeId);
if (childEdges.length !== 1) continue;
const childNode = enrichedSql.nodes.find(n => n.nodeId === childEdges[0].fromId);
if (!childNode || childNode.nodeName !== strippedName) continue;
if (!childNode || childNode.nodeName.replace(/ /g, "") !== strippedName.replace(/ /g, "")) continue;
// Keep child, add wrapper's extra metrics, mark as instrumented with DataFlint prefix
const childMetricNames = new Set(childNode.metrics.map(m => m.name));
const extraMetrics = node.metrics.filter(m => !childMetricNames.has(m.name));
@@ -299,9 +299,16 @@
// Replace nodeName with the stripped version so all downstream code sees the base name
const normalizedNode = { ...node, nodeName: strippedNodeName };
const type = calcNodeType(normalizedNode.nodeName);
const nodePlan = plan?.nodesPlan.find(
const rawNodePlan = plan?.nodesPlan.find(
(planNode) => planNode.id === normalizedNode.nodeId,
);
// Strip "DataFlint<NodeName> " prefix from planDescription so parsers see the original format.
// Description format: "DataFlint<NodeName> <OriginalNodeName> <rest>", e.g.:
// "DataFlintFilter Filter (cond)" → "Filter (cond)"
// "DataFlintExecute InsertInto... Execute InsertInto... file:..." → "Execute InsertInto... file:..."
const nodePlan = rawNodePlan && isInstrumented
? { ...rawNodePlan, planDescription: rawNodePlan.planDescription.replace("DataFlint" + strippedNodeName + " ", "") }
: rawNodePlan;
const parsedPlan =
nodePlan !== undefined ? parseNodePlan(normalizedNode, nodePlan) : undefined;
const isCodegenNode = normalizedNode.nodeName.includes("WholeStageCodegen");
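As a standalone illustration of the normalization above (a hypothetical helper, not the reducer code, which does this inline on rawNodePlan for instrumented nodes only):

```typescript
// Hypothetical helper mirroring the planDescription normalization in SqlReducer.ts.
function stripDataflintPrefix(planDescription: string, strippedNodeName: string): string {
  // "DataFlint<NodeName> <original description>" -> "<original description>"
  return planDescription.replace("DataFlint" + strippedNodeName + " ", "");
}

console.log(stripDataflintPrefix("DataFlintFilter Filter (cond)", "Filter"));
// -> "Filter (cond)"
console.log(stripDataflintPrefix("Filter (cond)", "Filter"));
// -> "Filter (cond)" (non-instrumented descriptions pass through unchanged)
```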
6 changes: 3 additions & 3 deletions spark-ui/src/services/SparkApi.tsx
@@ -317,9 +317,9 @@ class SparkAPI {
? currentAttempt.attemptId
: undefined;
this.sparkVersion = currentAttempt?.appSparkVersion;
// Paginated SQL API (/sql?offset=&length=) was added in Spark 3.2+
// Older apps on the history server need the non-paginated /sql endpoint
if (this.sparkVersion) {
// Paginated SQL API (/sql?offset=&length=) returns 404 for pre-3.2 apps
// on the history server. Live apps support it on all versions.
if (this.historyServerMode && this.sparkVersion) {
const [major, minor] = this.sparkVersion.split(".").map(Number);
this.useLegacySqlApi = major < 3 || (major === 3 && minor < 2);
}
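The gate above boils down to a small predicate; a sketch as a standalone function (an assumed helper, not the actual SparkAPI member):

```typescript
// Standalone sketch of the version gate: only history-server apps that ran on
// Spark < 3.2 fall back to the legacy, non-paginated /sql endpoint.
function useLegacySqlApi(historyServerMode: boolean, sparkVersion?: string): boolean {
  if (!historyServerMode || !sparkVersion) return false;
  const [major, minor] = sparkVersion.split(".").map(Number);
  return major < 3 || (major === 3 && minor < 2);
}

console.log(useLegacySqlApi(true, "3.1.3"));  // true  -> legacy /sql endpoint
console.log(useLegacySqlApi(true, "3.5.1"));  // false -> paginated /sql?offset=&length=
console.log(useLegacySqlApi(false, "3.1.3")); // false -> live apps support pagination
```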