Investigating and benchmarking how partitioning of data on the secondary storage layer affects Spark performance
Configure and update the following variables in your system's `.bashrc`, `.bash_profile`, `.zprofile`, or `.profile`, depending on which operating system and shell you are using.
`SPARK_DEFAULT_PAR` sets the value of the `spark.default.parallelism` configuration when the Spark applications are run through the provided shell scripts.
# JAVA configuration
export JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64"
# Hadoop configuration
export HADOOP_HOME="/home/ubuntu/hadoop"
PATH=${PATH}:${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin
# Apache Spark configuration
export SPARK_HOME="/home/ubuntu/spark"
PATH=${PATH}:${SPARK_HOME}/bin:${SPARK_HOME}/sbin
# Apache Hive configuration
export HIVE_HOME="/home/ubuntu/hive"
PATH=${PATH}:${HIVE_HOME}/bin
export HIVE_CONF_DIR="${HIVE_HOME}/conf"
# Cluster configuration
export HADOOP_MASTER="${MASTER_NODE_IP}:9000"
export SPARK_MASTER="${MASTER_NODE_IP}:7077"
export SPARK_DEFAULT_PAR="16"
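The exports above reference `MASTER_NODE_IP`, which must be defined before the profile file is sourced. A minimal sketch of applying and verifying the settings; the IP address and the choice of `.bashrc` are placeholders, not project requirements:

```bash
# Placeholder: replace with the actual IP of your master node.
export MASTER_NODE_IP="10.0.0.12"
# Reload whichever profile file you edited (.bashrc shown here).
source ~/.bashrc

# Quick sanity checks that the Hadoop and Spark installations are on PATH
# and that the derived variables expanded as expected.
hadoop version
spark-submit --version
echo "Spark master: spark://${SPARK_MASTER}, default parallelism: ${SPARK_DEFAULT_PAR}"
```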
- Build using the command `mvn clean package`. It will create a tar.gz file named `Spark-Partitioning-0.1-SNAPSHOT.tar.gz` with the 4 directories `bin`, `etc`, `python` and `lib`. Copy the tar.gz file to the cluster and decompress it. An end-to-end example with sample values is sketched after this list.
- To create random matrices and load them to HDFS, execute the shell script `nohup ./bin/load_data.sh ${ROW_LEFT} ${COL_LEFT} ${ROW_RIGHT} ${COL_RIGHT} ${WORK_FLOW = (RDD, SQL)} ${BASE_PATH} ${NUM_OF_PARTS} > logs/load_data.log &`.
- To execute a particular experiment, execute the shell script `nohup ./bin/run_experiment.sh ${WORK_FLOW = (RDD, SQL, BUCKET, HIVE, INDEXED)} ${BASE_PATH} ${EXPERIMENT} ${NUM_OF_PARTITIONS} > logs/job_${EXPERIMENT}_${PARTITIONS}.log &`. Allowed values for `${EXPERIMENT}` are `e1`, `e2` or `e3`. NOTE: With the `BUCKET` workflow type, the `e3` experiment is not valid.
- To execute the naive implementation of PageRank on Spark, execute the command `nohup ./bin/run_page_rank.sh ${NUM_OF_PAGES} ${MAX_LINKS} ${RAW_DATA_OP_NM} ${BASE_PATH} ${NUM_OF_ITERATIONS} > logs/rank.log &`.
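For illustration only, here is one possible end-to-end sequence with sample values; the matrix dimensions, HDFS base path, partition count, host name, and the `target/` location of the tarball are assumptions, not recommended settings:

```bash
# On the build machine: build and ship the package (host name is an example).
mvn clean package
scp target/Spark-Partitioning-0.1-SNAPSHOT.tar.gz ubuntu@master-node:~/

# On the cluster: unpack and run from inside the extracted directory.
tar -xzf Spark-Partitioning-0.1-SNAPSHOT.tar.gz
cd Spark-Partitioning-0.1-SNAPSHOT
mkdir -p logs

# Generate a 1000x500 and a 500x1000 random matrix with the RDD workflow,
# under the HDFS base path /user/ubuntu/spark_partitioning, in 16 partitions.
nohup ./bin/load_data.sh 1000 500 500 1000 RDD /user/ubuntu/spark_partitioning 16 \
  > logs/load_data.log &

# Once loading finishes, run experiment e1 with the SQL workflow on that data.
nohup ./bin/run_experiment.sh SQL /user/ubuntu/spark_partitioning e1 16 \
  > logs/job_e1_16.log &
```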
- Load the raw TPC-H data generated by `dbgen` to the location `${BASE_PATH}/raw_data/`.
- Convert the raw files into Parquet data by running the command `nohup ./bin/convert_tpch.sh ${BASE_PATH} > logs/tpch_data.log &`.
- To execute a TPC-H query, use the command `nohup ./bin/run_tpch_query.sh ${BASE_PATH} ${QUERY_NUM} ${PARTITION_TYPE} ${NUM_OF_PARTS} > logs/query_${QUERY_NUM}_${PARTITION_TYPE}.log &`. If you want to run all the queries, use `${QUERY_NUM}=all`; for the custom query defined in the `Custom` class, use `${QUERY_NUM}=custom`. Allowed values for `${PARTITION_TYPE}` are `hyperspace`, `parts` and `buckets`. You are required to clear any previously created Hyperspace indexes with the same names. `${NUM_OF_PARTS}` sets the value of the Spark configuration `spark.sql.shuffle.partitions`. An illustrative TPC-H sequence is sketched after this list.
- NOTE: Currently the option to execute with `buckets` (Hive) is commented out, as the current testing scenarios create a large number of buckets, which is undesirable and performs worse.
- Partitioning and bucketing keys are configurable by editing the `tpch.conf` file, available in the `etc` directory within the package or in `src/resources/configurations` within the source code. Spark SQL `DataFrame`s have case-sensitive column names; all the column names are in uppercase.
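An illustrative TPC-H sequence; the base path and shuffle-partition count are placeholders, and the numeric query value assumes individual TPC-H query numbers are accepted for `${QUERY_NUM}`, as the log-file naming pattern suggests:

```bash
# Convert the raw dbgen output (already placed under ${BASE_PATH}/raw_data/) to Parquet.
nohup ./bin/convert_tpch.sh /user/ubuntu/tpch > logs/tpch_data.log &

# After conversion finishes, run TPC-H query 6 against the Hyperspace-indexed
# layout with 200 shuffle partitions (example values only).
nohup ./bin/run_tpch_query.sh /user/ubuntu/tpch 6 hyperspace 200 \
  > logs/query_6_hyperspace.log &

# Or run every query against the "parts" layout with the same partition count.
nohup ./bin/run_tpch_query.sh /user/ubuntu/tpch all parts 200 \
  > logs/query_all_parts.log &
```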
- To execute the super graph module, use the command `nohup ./Spark-Partitioning-0.1-SNAPSHOT/bin/super_graph.sh ${GRAPH_COUNT} ${BATCH_SIZE} > logs/super_graph.log &`.
- To execute the graph matching module, use the command `nohup ./Spark-Partitioning-0.1-SNAPSHOT/bin/graph_matching.sh ${TOTAL_NUM_TREES} ${AVG_NUM_NODES} ${LOW_BOUND_NODES} ${NUM_OF_THREADS} ${BATCH_SIZE} > logs/graph_matching.log &`. Illustrative invocations of both graph modules follow this list.
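Illustrative invocations of the two graph modules; every argument value below is a placeholder, and the meaning of each positional argument is inferred from the variable names above:

```bash
# Super graph module: 100 graphs processed in batches of 10 (example values).
nohup ./Spark-Partitioning-0.1-SNAPSHOT/bin/super_graph.sh 100 10 \
  > logs/super_graph.log &

# Graph matching module: 1000 trees, ~50 nodes on average, at least 10 nodes
# per tree, 8 threads, batch size 100 (example values only).
nohup ./Spark-Partitioning-0.1-SNAPSHOT/bin/graph_matching.sh 1000 50 10 8 100 \
  > logs/graph_matching.log &
```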
Code style notes
- Python indentation and tab width: 4 spaces. (We are using Python 3.)
- Bash script indentation and tab width: 2 spaces.
- Set up the Scalafmt plugin and use the `.scalafmt.conf` file for auto-formatting Scala code. A command-line formatting example follows.
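If the standalone Scalafmt CLI is installed (an assumption; the project itself only mentions the plugin), the same `.scalafmt.conf` can also be applied from the command line:

```bash
# Format all Scala sources in the project using the repository's .scalafmt.conf.
scalafmt --config .scalafmt.conf
```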