Apache Spark™ is a fast and general engine for large-scale data processing. This document gives a quick introduction on how to get a first Spark test program running on Piz Daint.
Prerequisite
Spark uses SSH to deploy the master and the workers. For this mechanism to work, you need to ensure that a public key has been generated, i.e. that the following file exists:
ls $HOME/.ssh/id_rsa.pub
If it does not exist, a key pair can be generated with the following command:
ssh-keygen -t rsa
The public key then needs to be made available to the workers by appending it to the authorized keys file:
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
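As a convenience, the whole check can be scripted. The following is a minimal sketch that assumes the default key location and an empty passphrase:
# generate a key pair only if none exists yet (empty passphrase assumed)
if [ ! -f "$HOME/.ssh/id_rsa.pub" ]; then
    ssh-keygen -t rsa -N "" -f "$HOME/.ssh/id_rsa"
fi
# authorize the key for the workers and restrict the file permissions
cat "$HOME/.ssh/id_rsa.pub" >> "$HOME/.ssh/authorized_keys"
chmod 600 "$HOME/.ssh/authorized_keys"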
Setup and Running on Piz Daint
Note that running Apache Spark on Piz Daint is different (i) from running Spark on a standard cluster, and (ii) from running MPI-based applications on Piz Daint. In order to use Spark, three steps have to be kept in mind when submitting a job to the queuing system:
- an entire Spark cluster has to be started on the allocated nodes
- when the Spark cluster is up and running, Spark jobs have to be submitted to the cluster
- after all Spark jobs have finished running, the cluster has to be shut down
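In skeleton form, these three steps map onto three commands (a minimal sketch; start-all.sh, spark-submit and stop-all.sh are provided by the Spark module, and $SPARKURL, which points to the master, is assumed to be set in the module environment, as in the scripts below):
start-all.sh                                  # 1. start the Spark master and workers on the allocated nodes
spark-submit --master $SPARKURL <my_script>   # 2. submit a Spark job to the running cluster
stop-all.sh                                   # 3. shut the Spark cluster down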
The following script exemplifies how to implement these three steps. The script asks for 5 nodes, making 5*36 cores available to the Spark cluster. Since Spark has a master/worker architecture, this means we will have one master node and four worker nodes. Further, the job is constrained to the multicore (mc) nodes of Piz Daint, and its maximum running time is 15 minutes.
Step one is to start the cluster, which is implemented by executing the start-all.sh script. Step two is submitting Spark jobs to the Spark cluster, which is done by running the spark-submit command. To shut the Spark cluster down, the stop-all.sh script has to be executed.
Fine-tuning of Spark's configuration can be done by setting parameters in the environment variable $SPARK_CONF.
#!/bin/bash
#SBATCH --job-name="spark"
#SBATCH --time=00:15:00
#SBATCH --nodes=5
#SBATCH --constraint=mc
#SBATCH --output=sparkjob.%j.log
#SBATCH --account=<project>

# set some variables for Spark
export SPARK_WORKER_CORES=36
export SPARK_LOCAL_DIRS="/tmp"

# load modules
module load slurm
module load daint-mc
module load Spark

# deploy Spark
start-all.sh

# some extra Spark configuration
SPARK_CONF="
--conf spark.default.parallelism=10
--conf spark.executor.cores=8
--conf spark.executor.memory=15g
--conf spark.rdd.compress=false
--conf spark.shuffle.compress=false
--conf spark.broadcast.compress=false
--conf spark.hadoop.dfs.replication=1
--conf spark.driver.memory=60G"

# submit a Spark job
spark-submit ${SPARK_CONF} --master $SPARKURL <my_script>

# clean out Spark deployment
stop-all.sh
Please replace the string <project> with the ID of the active project that will be charged for the allocation.
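Assuming the script above is saved as, e.g., spark.sbatch (the file name is arbitrary and only used here for illustration), it can be submitted and monitored with the usual Slurm commands:
sbatch spark.sbatch    # submit the job script to the queuing system
squeue -u $USER        # check the state of the job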
Also note that in order to use Spark on Piz Daint, you have to load the corresponding modules:
module load daint-mc
module load Spark
or
module load daint-mc
module load Spark/<module-version>
in order to specify a particular version. The command module avail Spark can be used to get an overview of the available versions.
Testing Apache Spark
Apache Spark comes with a set of examples that can be used to verify that the system is running correctly. The following script creates a two-node Spark cluster (one master, one worker) and submits the Pi approximation example.
#!/bin/bash
#SBATCH --job-name="spark"
#SBATCH --time=00:15:00
#SBATCH --nodes=2
#SBATCH --constraint=mc
#SBATCH --output=sparkjob.%j.log
#SBATCH --account=<project>

# set some variables for Spark
export SPARK_WORKER_CORES=36
export SPARK_LOCAL_DIRS="/tmp"

# load modules
module load slurm
module load daint-mc
module load Spark

# deploy Spark
start-all.sh

# some extra Spark configuration
SPARK_CONF="
--conf spark.default.parallelism=10
--conf spark.executor.cores=8
--conf spark.executor.memory=15g "

# submit a Spark job
spark-submit ${SPARK_CONF} --master $SPARKURL \
    --class org.apache.spark.examples.SparkPi \
    $EBROOTSPARK/examples/jars/spark-examples_2.11-2.4.7.jar 10000;

# clean out Spark deployment
stop-all.sh
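Note that the file name of the examples jar encodes the Scala and Spark versions and therefore depends on the Spark module that is loaded. If a different version is loaded, the jar can be located with a wildcard instead of a hard-coded name (a sketch, assuming exactly one examples jar is installed under $EBROOTSPARK):
# locate the examples jar shipped with the loaded Spark module
EXAMPLES_JAR=$(ls $EBROOTSPARK/examples/jars/spark-examples_*.jar)

# submit the Pi approximation example with 10000 partitions
spark-submit ${SPARK_CONF} --master $SPARKURL \
    --class org.apache.spark.examples.SparkPi \
    $EXAMPLES_JAR 10000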
The output is stored in a log file named sparkjob.<jobid>.log. By default, Spark is quite verbose, so the desired output can be found towards the end of the file:
...
17/04/05 16:41:56 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
17/04/05 16:41:56 INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:36, took 5.976415 s
Pi is roughly 3.14163436
17/04/05 16:41:56 INFO SparkUI: Stopped Spark web UI at http://148.187.46.54:4040
17/04/05 16:41:56 INFO SparkDeploySchedulerBackend: Shutting down all executors
...
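To extract just the result from the verbose log, grep for the line printed by the example (substitute the actual job ID for <jobid>):
grep "Pi is roughly" sparkjob.<jobid>.log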