Sunday, June 25, 2017

Profiling Spark Applications: The Easy Way


Recently, I was thinking about a one-click way to profile Spark applications, so that it could be easily integrated into any work environment without the need to configure the system first.


The modern way to report profiling statistics about an application (any application, not just a Spark or Java one) is to generate a single .SVG file called a "flame graph". Since this is a regular vector graphics format, the file can be opened in any browser. Moreover, you can navigate between different stack frames by clicking on them, and even search for a symbol name by clicking the "Search" link.


This is what a sample flame graph looks like:
The y-axis shows the stack depth, while the x-axis shows the time spent in each stack frame.

In order to generate a flame graph, two steps are usually required (a minimal sketch follows):
  • Capture stack traces from a running process, and dump them to disk. 
  • Parse these stack traces, and generate the .SVG file. 
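
For a plain JVM process, a minimal sketch of these two steps might look like this (assuming jstack is on the PATH and Brendan Gregg's FlameGraph repository is checked out locally; the sample count and interval are illustrative):

#!/bin/bash
# Step 1: sample stack traces of a JVM (PID passed in $1) roughly 10 times a second
for i in $(seq 1 100); do
  jstack $1 >> stacks.txt
  sleep 0.1
done

# Step 2: fold the collected traces and render them into an .SVG flame graph
./stackcollapse-jstack.pl stacks.txt > stacks.folded
./flamegraph.pl stacks.folded > flamegraph.svg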
For Java-based applications, stack traces can be gathered using the commercial features of the Oracle JDK (the -XX:+FlightRecorder option). There's an article that explains how to profile Spark applications using this option.
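
For reference, enabling Flight Recorder typically boils down to passing the relevant JVM flags to the executors, along these lines (an Oracle JDK is required, and the recording duration and file name here are illustrative):

spark-submit \
  --conf 'spark.executor.extraJavaOptions=-XX:+UnlockCommercialFeatures -XX:+FlightRecorder -XX:StartFlightRecording=duration=60s,filename=recording.jfr' \
  myapp.jar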

In OpenJDK this feature is not available, but luckily there are other options. One of them is the statsd-jvm-profiler library from Etsy. This library integrates into the JVM as an agent, gathers statistics like CPU or memory usage, and sends them to a StatsD server in real time. Apparently, this library supports reporting to InfluxDB as well.
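
In its simplest form, attaching the agent and pointing it at a StatsD server looks roughly like this (host and port are placeholders):

java -javaagent:statsd-jvm-profiler.jar=server=statsd.example.com,port=8125 -jar myapp.jar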

Keeping the above in mind, the whole process looks like this:
  1. Start InfluxDB on a random unused port.
  2. Start Spark with the statsd profiler Jar in its classpath and with a configuration that tells it to report statistics back to the InfluxDB instance.
  3. After the Spark application finishes, query all the reported metrics from the InfluxDB instance.
  4. Run a script that generates the target report.
  5. Stop the InfluxDB instance.
  6. Store the generated .SVG file somewhere, or send it to someone.
The following script is a wrapper around the ‘spark-submit’ command, which does all of that:
#!/bin/bash

set -e
trap 'kill $(jobs -p) 2>/dev/null' EXIT

function find_unused_port() {
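  # Probe each port with telnet; a refused connection means the port is unused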
  for port in $(seq $1 65000); do
    echo -ne "\035" | telnet 127.0.0.1 $port >/dev/null 2>&1;
    if [ $? -eq 1 ]; then
      echo $port
      exit
    fi
  done
  echo "ERROR: Can't find unused port in range $1-65000"
  exit 1
}

function install_deps() {
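  # Verify prerequisites, then download the profiler agent, helper scripts and InfluxDB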
  for cmd in python2.7 perl pip; do
    if ! which $cmd >/dev/null 2>&1; then
      echo "ERROR: $cmd is not installed!"
      exit 1
    fi
  done

  echo -e "[$(date +%FT%T)] Installing dependencies"
  mkdir -p $install_dir
  pushd $install_dir >/dev/null
  pip -q install --user influxdb blist

  wget -qc https://github.com/etsy/statsd-jvm-profiler/releases/download/2.1.0/statsd-jvm-profiler-2.1.0-jar-with-dependencies.jar
  ln -sf statsd-jvm-profiler-2.1.0-jar-with-dependencies.jar statsd-jvm-profiler.jar

  wget -qc https://raw.githubusercontent.com/aviemzur/statsd-jvm-profiler/master/visualization/influxdb_dump.py
  wget -qc https://raw.githubusercontent.com/brendangregg/FlameGraph/master/flamegraph.pl

  wget -qc https://dl.influxdata.com/influxdb/releases/influxdb-1.2.4_linux_amd64.tar.gz
  tar -xzf influxdb-1.2.4_linux_amd64.tar.gz
  ln -sf influxdb-1.2.4-1 influxdb
  popd >/dev/null
}

function run_influxdb() {
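  # Generate a minimal InfluxDB config, start influxd in the background,
  # wait until it responds, then create the profiler database and user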
  echo -e "[$(date +%FT%T)] Starting InfluxDB"
  cat << EOF >influxdb.conf
reporting-disabled = true
hostname = "${local_ip}"
bind-address = ":${influx_meta_port}"
[meta]
  dir = "$(pwd)/influxdb/meta"
[data]
  dir = "$(pwd)/influxdb/data"
  wal-dir = "$(pwd)/influxdb/wal"
[admin]
  enabled = false
[http]
  bind-address = ":${influx_http_port}"
EOF
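  # Remove data left over from a previous run so each profiling session starts fresh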
  rm -rf influxdb
  $install_dir/influxdb/usr/bin/influxd -config influxdb.conf >influxdb.log 2>&1 &

  wait_secs=5
  while [ $wait_secs -gt 0 ]; do
    if curl -sS -i $influx_uri/ping 2>/dev/null | grep X-Influxdb-Version >/dev/null; then
      break
    fi
    sleep 1
    wait_secs=$(($wait_secs-1))
  done

  if [ $wait_secs -eq 0 ]; then
    echo "ERROR: Couldn't start InfluxDB!"
    exit 1
  fi

  curl -sS -X POST $influx_uri/query --data-urlencode "q=CREATE DATABASE profiler" >/dev/null
  curl -sS -X POST $influx_uri/query --data-urlencode "q=CREATE USER profiler WITH PASSWORD 'profiler' WITH ALL PRIVILEGES" >/dev/null
}

function run_spark_submit() {
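  # Prepend the profiler agent jar to --jars and inject the InfluxDB reporter
  # configuration into the executor JVM options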
  spark_args=()
  jars=$install_dir/statsd-jvm-profiler.jar
  while [[ $# -gt 0 ]]; do
    case "$1" in
      --jars) jars="$jars,$2"
        shift
        ;;
      *) spark_args+=("$1")
        [[ "$1" == *.jar ]] && flamegraph_title="$1"
        ;;
    esac
    shift
  done

  spark_cmd=(spark-submit)
  spark_cmd+=(--jars)
  spark_cmd+=("$jars")
  spark_cmd+=(--conf)
  spark_cmd+=("spark.executor.extraJavaOptions=-javaagent:statsd-jvm-profiler.jar=server=${local_ip},port=${influx_http_port},reporter=InfluxDBReporter,database=profiler,username=profiler,password=profiler,prefix=sparkapp,tagMapping=spark")
  spark_cmd+=("${spark_args[@]}")

  echo -e "[$(date +%FT%T)] Executing: ${spark_cmd[@]}"
  "${spark_cmd[@]}"
}

function generate_flamegraph() {
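  # Dump the stack traces collected in InfluxDB and render them into an SVG flame graph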
  rm -rf stack_traces
  python2.7 $install_dir/influxdb_dump.py -o $local_ip -r $influx_http_port -u profiler -p profiler -d profiler -t spark -e sparkapp -x stack_traces
  perl $install_dir/flamegraph.pl --title "$flamegraph_title" stack_traces/all_*.txt > flamegraph.svg
  rm -rf stack_traces
  echo -e "[$(date +%FT%T)] Created flamegraph: $(pwd)/flamegraph.svg"
}

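# Main flow: detect the local IP, pick unused ports for InfluxDB, then run the pipeline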
local_ip=$(ip route get 8.8.8.8 | awk '{print $NF; exit}')
install_dir=$HOME/.spark-flamegraph
influx_meta_port=$(find_unused_port 48080)
influx_http_port=$(find_unused_port $(($influx_meta_port+1)))
influx_uri=http://${local_ip}:${influx_http_port}
flamegraph_title="Spark Application"

install_deps
run_influxdb
run_spark_submit "$@"
generate_flamegraph

The script is also available in this gist.


To be fair, it should be noted that the following utilities must be present on your system prior to running the script: perl, python2.7 and pip. Other than that, the script has been used in an Amazon EMR environment without any issues. Just use it instead of the usual spark-submit command, and it will profile your application and create a report:

[hadoop@ip-10-121-4-244 tmp]$ ./spark-submit-flamegraph --name 'etlite' --jars file://$(pwd)/probe-events-1.0.jar etlite_2.11-0.1.0.jar s3://mobility-artifacts/airflow/latest/config/etlite.conf
[2017-06-05T12:34:05] Installing dependencies
[2017-06-05T12:34:09] Starting InfluxDB
[2017-06-05T12:34:10] Executing: spark-submit --jars /home/hadoop/.spark-flamegraph/statsd-jvm-profiler.jar,file:///tmp/probe-events-1.0.jar --conf spark.executor.extraJavaOptions=-javaagent:statsd-jvm-profiler.jar=server=10.121.4.244,port=48081,reporter=InfluxDBReporter,database=profiler,username=profiler,password=profiler,prefix=sparkapp,tagMapping=spark --name etlite etlite_2.11-0.1.0.jar s3://mobility-artifacts/airflow/latest/config/etlite.conf
17/06/05 12:34:11 INFO Main$: Configuration file = 's3://mobility-artifacts/airflow/latest/config/etlite.conf'
17/06/05 12:34:14 INFO S3NativeFileSystem: Opening 's3://mobility-artifacts/airflow/latest/config/etlite.conf' for reading
17/06/05 12:34:15 INFO SparkContext: Running Spark version 2.1.0

... running Spark application ...

17/06/05 12:35:17 INFO SparkContext: Successfully stopped SparkContext
17/06/05 12:35:17 INFO ShutdownHookManager: Shutdown hook called
17/06/05 12:35:17 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-fa12133c-b605-4a73-814a-2dfd4ed6fdde

... generating .svg file ...

[2017-06-05T12:35:25] Created flamegraph: /tmp/flamegraph.svg


Integrating this script into an Airflow Spark operator is straightforward, especially if your Spark operator is derived from BashOperator. Just make sure the script is available on all Spark Airflow workers, then substitute it for the spark-submit command depending on whether profile=True is passed as an operator argument.
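
A minimal sketch of that substitution, assuming the wrapper is deployed as spark-submit-flamegraph on every worker, could look like this:

#!/bin/bash
# Hypothetical launcher used by the Airflow operator: it sets PROFILE=true
# when profile=True is passed, and the matching command is picked here
if [ "${PROFILE:-false}" = "true" ]; then
  exec spark-submit-flamegraph "$@"
else
  exec spark-submit "$@"
fi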

Post your weird flame graphs in the comments! :)