3 changes: 3 additions & 0 deletions docker/gluten/.gitignore
@@ -0,0 +1,3 @@
jars/
test_data/
spark-events/
61 changes: 61 additions & 0 deletions docker/gluten/Dockerfile
@@ -0,0 +1,61 @@
# Spark + Gluten/Velox + DataFlint example runner
#
# Build arguments:
# SPARK_VERSION - Spark version (default: 3.5.7)
#
# Usage:
# ./run-gluten-example.sh (recommended — builds everything and runs)
# docker compose up --build (if jars are already in jars/)

ARG SPARK_VERSION=3.5.7

FROM apache/spark:${SPARK_VERSION}

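# An ARG declared before FROM is only in scope for the FROM line itself,
# so it is re-declared here to stay available in the rest of the build.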
ARG SPARK_VERSION=3.5.7

USER root

# Create directories for event logs and test data
RUN mkdir -p /tmp/spark-events && \
chown -R spark:spark /tmp/spark-events && \
mkdir -p /opt/spark/work-dir/test_data && \
chown -R spark:spark /opt/spark/work-dir/test_data

# Copy all jars (Gluten bundle + DataFlint plugin + example) into Spark's jars dir
COPY jars/*.jar /opt/spark/jars/

# Copy test data
COPY test_data/ /opt/spark/work-dir/test_data/

# Configure Spark defaults for Gluten
# The --add-opens flags are required because the Gluten nightly (JDK8 target) uses
# sun.misc.Unsafe / DirectByteBuffer internals that the JDK module system restricts;
# strong encapsulation has been the default since JDK 16, so access fails without them.
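# Gluten expects ColumnarShuffleManager for columnar shuffle and off-heap memory
# for Velox, which allocates its columnar buffers outside the JVM heap.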
RUN mkdir -p /opt/spark/conf && \
echo "spark.plugins=io.dataflint.spark.SparkDataflintPlugin,org.apache.gluten.GlutenPlugin" >> /opt/spark/conf/spark-defaults.conf && \
echo "spark.shuffle.manager=org.apache.spark.shuffle.sort.ColumnarShuffleManager" >> /opt/spark/conf/spark-defaults.conf && \
echo "spark.memory.offHeap.enabled=true" >> /opt/spark/conf/spark-defaults.conf && \
echo "spark.memory.offHeap.size=4g" >> /opt/spark/conf/spark-defaults.conf && \
echo "spark.eventLog.enabled=true" >> /opt/spark/conf/spark-defaults.conf && \
echo "spark.eventLog.dir=/tmp/spark-events" >> /opt/spark/conf/spark-defaults.conf && \
echo "spark.ui.port=10000" >> /opt/spark/conf/spark-defaults.conf && \
echo "spark.dataflint.telemetry.enabled=false" >> /opt/spark/conf/spark-defaults.conf && \
echo "spark.sql.maxMetadataStringLength=10000" >> /opt/spark/conf/spark-defaults.conf && \
echo "spark.sql.adaptive.enabled=true" >> /opt/spark/conf/spark-defaults.conf && \
echo "spark.driver.extraJavaOptions=--add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.misc=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED" >> /opt/spark/conf/spark-defaults.conf && \
echo "spark.executor.extraJavaOptions=--add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.misc=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED" >> /opt/spark/conf/spark-defaults.conf

USER spark

EXPOSE 10000

WORKDIR /opt/spark/work-dir

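# _JAVA_OPTIONS is picked up by every JVM started in the container, so the add-opens
# flags also reach the spark-submit launcher JVM; io.netty.tryReflectionSetAccessible
# lets Netty/Arrow reach direct-buffer internals under the module system.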
ENV _JAVA_OPTIONS="--add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.misc=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED -Dio.netty.tryReflectionSetAccessible=true"

# Run the Gluten example via spark-submit
CMD ["/opt/spark/bin/spark-submit", \
"--master", "local[*]", \
"--class", "io.dataflint.example.GlutenVeloxExample", \
"--driver-memory", "2g", \
"/opt/spark/jars/example.jar"]
16 changes: 16 additions & 0 deletions docker/gluten/docker-compose.yml
@@ -0,0 +1,16 @@
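# Builds the image locally and runs the Gluten example once. SPARK_VERSION,
# SPARK_UI_PORT and SPARK_EVENTS_DIR can be overridden via the environment.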
services:
spark-gluten-example:
build:
context: .
dockerfile: Dockerfile
args:
SPARK_VERSION: ${SPARK_VERSION:-3.5.7}
image: dataflint-gluten-example:${SPARK_VERSION:-3.5.7}
container_name: dataflint-gluten-example
ports:
- "${SPARK_UI_PORT:-10000}:10000"
volumes:
- ${SPARK_EVENTS_DIR:-./spark-events}:/tmp/spark-events
environment:
- SPARK_NO_DAEMONIZE=true
restart: "no"
143 changes: 143 additions & 0 deletions docker/gluten/run-gluten-example.sh
@@ -0,0 +1,143 @@
#!/bin/bash
set -e

# Run DataFlint Gluten/Velox Example
#
# This script:
# 1. Builds the DataFlint UI and plugin jar
# 2. Packages the Gluten example app
# 3. Downloads the Gluten nightly bundle jar (cached)
# 4. Builds and runs the Docker container
#
# Prerequisites: Node.js 20+, Java 8+, sbt, Docker
#
# Usage:
# ./run-gluten-example.sh # full build + run
# ./run-gluten-example.sh --skip-build # skip sbt/npm, just rebuild Docker
# ./run-gluten-example.sh --amd64 # force x86_64 (Rosetta 2 emulation)

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_ROOT="$(cd "$SCRIPT_DIR/../.." && pwd)"
JARS_DIR="$SCRIPT_DIR/jars"
TEST_DATA_DIR="$SCRIPT_DIR/test_data"
SPARK_EVENTS_DIR="$SCRIPT_DIR/spark-events"

SPARK_VERSION="${SPARK_VERSION:-3.5.7}"
SCALA_VERSION="${SCALA_VERSION:-2.12}"

SKIP_BUILD=false
FORCE_AMD64=false

for arg in "$@"; do
case $arg in
--skip-build) SKIP_BUILD=true ;;
--amd64) FORCE_AMD64=true ;;
esac
done

# Detect architecture for Gluten jar download
ARCH=$(uname -m)
if [ "$FORCE_AMD64" = true ]; then
GLUTEN_ARCH="linux_amd64"
DOCKER_PLATFORM="--platform linux/amd64"
elif [ "$ARCH" = "arm64" ] || [ "$ARCH" = "aarch64" ]; then
GLUTEN_ARCH="linux_aarch64"
DOCKER_PLATFORM=""
else
GLUTEN_ARCH="linux_amd64"
DOCKER_PLATFORM=""
fi

GLUTEN_JAR_NAME="gluten-velox-bundle-spark3.5_2.12-${GLUTEN_ARCH}-1.7.0-SNAPSHOT.jar"
GLUTEN_JAR_URL="https://nightlies.apache.org/gluten/nightly-release-jdk8/${GLUTEN_JAR_NAME}"

echo "=== DataFlint Gluten/Velox Example ==="
echo "Project root: $PROJECT_ROOT"
echo "Spark version: $SPARK_VERSION"
echo "Architecture: $GLUTEN_ARCH"
echo "Gluten jar: $GLUTEN_JAR_NAME"
echo ""

mkdir -p "$JARS_DIR"
mkdir -p "$SPARK_EVENTS_DIR"

# --- Step 1: Download Gluten nightly jar (cached) ---
echo "=== Step 1: Downloading Gluten nightly jar ==="
if [ -f "$JARS_DIR/$GLUTEN_JAR_NAME" ]; then
echo "Gluten jar already cached: $JARS_DIR/$GLUTEN_JAR_NAME"
else
echo "Downloading: $GLUTEN_JAR_URL"
curl -fSL -o "$JARS_DIR/$GLUTEN_JAR_NAME" "$GLUTEN_JAR_URL"
echo "Downloaded successfully."
fi

if [ "$SKIP_BUILD" = false ]; then
# --- Step 2: Build DataFlint UI ---
echo ""
echo "=== Step 2: Building DataFlint UI ==="
cd "$PROJECT_ROOT/spark-ui"
if [ ! -d "node_modules" ]; then
echo "Installing npm dependencies..."
npm ci
fi
echo "Building and deploying UI into plugin resources..."
npm run deploy

# --- Step 3: Build DataFlint plugin jar ---
echo ""
echo "=== Step 3: Building DataFlint plugin jar ==="
cd "$PROJECT_ROOT/spark-plugin"
export SBT_OPTS="-Xmx4G -Xss2M -XX:+UseG1GC"
sbt "pluginspark3/assembly"

# --- Step 4: Package example jar ---
echo ""
echo "=== Step 4: Packaging example jar ==="
sbt "example_3_5_1/package"
fi

# --- Step 5: Copy jars to docker context ---
echo ""
echo "=== Step 5: Copying jars to Docker context ==="

# DataFlint plugin jar
PLUGIN_JAR=$(find "$PROJECT_ROOT/spark-plugin/pluginspark3/target/scala-${SCALA_VERSION}" -name "spark_${SCALA_VERSION}-*.jar" -type f | head -1)
if [ -z "$PLUGIN_JAR" ]; then
echo "ERROR: DataFlint plugin jar not found. Run without --skip-build first."
exit 1
fi
cp "$PLUGIN_JAR" "$JARS_DIR/dataflint-plugin.jar"
echo "Copied DataFlint plugin: $(basename "$PLUGIN_JAR")"

# Example jar
EXAMPLE_JAR=$(find "$PROJECT_ROOT/spark-plugin/example_3_5_1/target/scala-${SCALA_VERSION}" -name "DataflintSparkExample351_${SCALA_VERSION}-*.jar" -type f | head -1)
if [ -z "$EXAMPLE_JAR" ]; then
echo "ERROR: Example jar not found. Run without --skip-build first."
exit 1
fi
cp "$EXAMPLE_JAR" "$JARS_DIR/example.jar"
echo "Copied example jar: $(basename "$EXAMPLE_JAR")"

echo "Gluten jar: $GLUTEN_JAR_NAME"

# --- Step 6: Copy test data ---
echo ""
echo "=== Step 6: Copying test data ==="
rm -rf "$TEST_DATA_DIR"
cp -r "$PROJECT_ROOT/spark-plugin/test_data" "$TEST_DATA_DIR"
echo "Copied test_data/"

# --- Step 7: Build and run Docker ---
echo ""
echo "=== Step 7: Building and running Docker container ==="
cd "$SCRIPT_DIR"

# Stop any previous container
docker compose down 2>/dev/null || true

# Build with platform flag if needed
if [ -n "$DOCKER_PLATFORM" ]; then
DOCKER_DEFAULT_PLATFORM=linux/amd64 docker compose up --build
else
docker compose up --build
fi
@@ -43,7 +43,7 @@ object DataFusionCometExample extends App {
println(s"number of unique words : $uniqueWords")


spark.read.load("/Users/menishmueli/Documents/GitHub/spark-sql-perf/data/store_sales").filter($"ss_quantity" > 1).count()
// spark.read.load("/Users/menishmueli/Documents/GitHub/spark-sql-perf/data/store_sales").filter($"ss_quantity" > 1).count()

scala.io.StdIn.readLine()
spark.stop()
@@ -0,0 +1,116 @@
package io.dataflint.example

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object GlutenVeloxExample extends App {
val spark = SparkSession
.builder()
.appName("GlutenVeloxExample")
.config("spark.plugins", "io.dataflint.spark.SparkDataflintPlugin,org.apache.gluten.GlutenPlugin")
.config("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
.config("spark.memory.offHeap.enabled", "true")
.config("spark.memory.offHeap.size", "4g")
.config("spark.ui.port", "10000")
.config("spark.eventLog.enabled", "true")
.config("spark.eventLog.dir", "/tmp/spark-events")
.config("spark.dataflint.telemetry.enabled", value = false)
.config("spark.sql.maxMetadataStringLength", "10000")
.config("spark.sql.adaptive.enabled", "true")
.master("local[*]")
.getOrCreate()

import spark.implicits._

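  // Shakespeare line corpus: semicolon-separated CSV with no header row,
  // so the columns are named explicitly via toDF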
def shakespeareDF: DataFrame = spark.read
.format("csv")
.option("sep", ";")
.option("inferSchema", true)
.load("./test_data/will_play_text.csv")
.toDF("line_id", "play_name", "speech_number", "line_number", "speaker", "text_entry")

// --- Filter + Project ---
spark.sparkContext.setJobDescription("Filter and Select")
val filtered = shakespeareDF
.filter($"speaker".isNotNull && $"line_id" > 100)
.select($"play_name", $"speaker", $"text_entry", $"speech_number")
filtered.show(10, truncate = false)

// --- Aggregation (GroupBy + Count/Sum) ---
spark.sparkContext.setJobDescription("GroupBy Aggregation")
val linesPerSpeaker = shakespeareDF
.filter($"speaker".isNotNull)
.groupBy("play_name", "speaker")
.agg(
count("*").alias("line_count"),
sum("speech_number").alias("total_speech_numbers"),
avg("speech_number").alias("avg_speech_number")
)
linesPerSpeaker.show(20, truncate = false)

// --- Sort ---
spark.sparkContext.setJobDescription("Sort by line count")
val sortedSpeakers = linesPerSpeaker
.orderBy(col("line_count").desc)
sortedSpeakers.show(20, truncate = false)

// --- Broadcast Hash Join ---
spark.sparkContext.setJobDescription("Broadcast Hash Join")
val topSpeakers = linesPerSpeaker
.filter($"line_count" > 50)
.select($"speaker".alias("top_speaker"), $"play_name".alias("top_play"))
val broadcastJoined = shakespeareDF
.join(broadcast(topSpeakers), $"speaker" === $"top_speaker" && $"play_name" === $"top_play")
println(s"Broadcast join result count: ${broadcastJoined.count()}")

// --- Sort Merge Join (disable broadcast to force SMJ) ---
spark.sparkContext.setJobDescription("Sort Merge Join")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
val plays1 = shakespeareDF
.groupBy("play_name")
.agg(count("*").alias("total_lines"))
.repartition(10)
val plays2 = shakespeareDF
.groupBy("play_name")
.agg(countDistinct("speaker").alias("unique_speakers"))
.repartition(10)
val smjResult = plays1.join(plays2, Seq("play_name"))
smjResult.show(20, truncate = false)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10485760)

// --- Window Functions ---
spark.sparkContext.setJobDescription("Window Functions")
val speakerWindow = Window.partitionBy("play_name").orderBy(col("line_count").desc)
val rankedSpeakers = linesPerSpeaker
.withColumn("rank", rank().over(speakerWindow))
.withColumn("dense_rank", dense_rank().over(speakerWindow))
.withColumn("total_in_play", sum("line_count").over(Window.partitionBy("play_name")))
.withColumn("pct", round(col("line_count") / col("total_in_play") * 100, 2))
.filter(col("rank") <= 3)
.orderBy("play_name", "rank")
rankedSpeakers.show(30, truncate = false)

// --- Explode / Generate ---
spark.sparkContext.setJobDescription("Explode words from text")
val words = shakespeareDF
.filter($"text_entry".isNotNull)
.select($"play_name", $"speaker", explode(split($"text_entry", "\\s+")).alias("word"))
.filter(length($"word") > 0)
val wordCounts = words
.groupBy("word")
.agg(count("*").alias("word_count"))
.orderBy(col("word_count").desc)
wordCounts.show(20, truncate = false)

// --- Union + distinct ---
spark.sparkContext.setJobDescription("Union and Distinct")
val hamlet = shakespeareDF.filter($"play_name" === "Hamlet").select("speaker")
val macbeth = shakespeareDF.filter($"play_name" === "macbeth").select("speaker")
val allSpeakers = hamlet.union(macbeth).distinct()
println(s"Distinct speakers in Hamlet + Macbeth: ${allSpeakers.count()}")

println("GlutenVeloxExample completed. Spark UI available at http://localhost:10000")
println("Press Ctrl+C to stop.")
Thread.sleep(Long.MaxValue)
}
4 changes: 3 additions & 1 deletion spark-ui/src/components/SqlFlow/SqlLayoutService.ts
@@ -593,7 +593,9 @@ class SqlLayoutService {
const splitExchangeNodeIds = new Set<string>();
for (const nodeId of nodesIds) {
const node = nodeMap.get(nodeId);
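      // Accelerator plugins report shuffle exchanges under engine-specific node
      // names (Gluten, Comet, RAPIDS); treat them all as split points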
if (node?.nodeName === "Exchange") {
if (node?.nodeName === "Exchange" || node?.nodeName === "ColumnarExchange" ||
node?.nodeName === "CometExchange" || node?.nodeName === "CometColumnarExchange" ||
node?.nodeName === "GpuColumnarExchange") {
splitExchangeNodeIds.add(nodeId.toString());
}
}
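
A possible follow-up, not part of this change (the constant and helper below are hypothetical): hoisting the exchange node names into one set keeps the check a single lookup as more engines are added.

const SHUFFLE_EXCHANGE_NODE_NAMES = new Set<string>([
  "Exchange",              // vanilla Spark
  "ColumnarExchange",      // Gluten
  "CometExchange",         // Comet
  "CometColumnarExchange", // Comet columnar shuffle
  "GpuColumnarExchange",   // RAPIDS accelerator
]);

const isShuffleExchange = (nodeName: string | undefined): boolean =>
  nodeName !== undefined && SHUFFLE_EXCHANGE_NODE_NAMES.has(nodeName);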