Sunday, June 25, 2017

Profiling Spark Applications: The Easy Way


Recently, I was looking for a one-click way to profile Spark applications, so it could be easily integrated into any work environment without the need to configure the system.


The modern way to report profiling statistics about an application (any application, not just a Spark or Java one) is to generate a single .SVG file called a "flame graph". Since SVG is a regular vector graphics format, the file can be opened in any browser. Moreover, you can navigate between different stack frames by clicking on them, and even search for a symbol name by clicking on the "Search" link.


This is what a sample flame graph looks like:
The y-axis shows the stack depth, while the x-axis shows the time spent in each stack frame.

In order to generate a flame graph, two steps are usually required (a rough sketch of both follows the list):
  • Capture stack traces from a running process, and dump them to disk. 
  • Parse these stack traces, and generate an .SVG file. 
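For example, for a generic JVM process these two steps could be done with jstack plus Brendan Gregg's FlameGraph scripts. This is only a rough sketch (the sampling loop, the target PID and the file names are illustrative), but it shows the shape of the pipeline:

# Step 1: capture stack traces (a crude jstack-based sampler)
for i in $(seq 1 100); do jstack $PID >> stacks.txt; sleep 0.1; done

# Step 2: fold the samples and render the .SVG
git clone https://github.com/brendangregg/FlameGraph
FlameGraph/stackcollapse-jstack.pl stacks.txt > stacks.folded
FlameGraph/flamegraph.pl stacks.folded > flamegraph.svg
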
For Java-based applications, stack traces can be gathered using commercial features of the Oracle JDK (the -XX:+FlightRecorder option). There's an article that explains how to profile Spark applications using this option.
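For instance, something along these lines should enable Flight Recorder on the executors (a hypothetical sketch for Oracle JDK 8; the recording duration and file name are illustrative, and the resulting .jfr files still need to be collected from the executor nodes):

spark-submit \
  --conf "spark.executor.extraJavaOptions=-XX:+UnlockCommercialFeatures -XX:+FlightRecorder -XX:StartFlightRecording=duration=120s,filename=recording.jfr" \
  your-application.jar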

In OpenJDK this feature is not available, but luckily there are other options. One of them is the statsd-jvm-profiler library from Etsy. This library integrates into the JVM as an agent, gathers statistics like CPU and memory usage, and sends them to a statsd server in real time. Apparently, this library supports reporting to InfluxDB as well.

Keeping the above in mind, the whole process will look like this:
  1. Have InfluxDB running on a random port.
  2. Start Spark with the statsd profiler Jar in its classpath and with the configuration that tells it to report statistics back to the InfluxDB instance.
  3. After running the Spark application, query all the reported metrics from the InfluxDB instance.
  4. Run a script that generates the target report.
  5. Stop the InfluxDB instance.
  6. Store generated .SVG file somewhere, or send it to someone.
The following script is a wrapper around the ‘spark-submit’ command, which does all that:
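Here is a minimal sketch of its core flow, assuming InfluxDB, the profiler Jar, influxdb_dump.py and flamegraph.pl have already been fetched into $HOME/.spark-flamegraph (the complete script, which also installs dependencies, waits for InfluxDB to come up and cleans up on exit, lives at https://github.com/spektom/spark-flamegraph):

#!/bin/bash
# Minimal sketch only; the port, paths and credentials are illustrative.
set -e

install_dir=$HOME/.spark-flamegraph
port=48081
local_ip=$(ip route get 8.8.8.8 | awk '{print $NF; exit}')

# 1. Start InfluxDB in the background
$install_dir/influxdb/usr/bin/influxd -config influxdb.conf >influxdb.log 2>&1 &
influxdb_pid=$!

# 2. Run the Spark application with the statsd-jvm-profiler agent
#    attached to the executors, reporting into our InfluxDB instance
spark-submit \
  --jars $install_dir/statsd-jvm-profiler.jar \
  --conf spark.executor.extraJavaOptions=-javaagent:statsd-jvm-profiler.jar=server=$local_ip,port=$port,reporter=InfluxDBReporter,database=profiler,username=profiler,password=profiler,prefix=sparkapp,tagMapping=spark \
  "$@"

# 3.+4. Dump the reported metrics as collapsed stacks and render the .SVG
#       (influxdb_dump.py ships with statsd-jvm-profiler; its exact
#       arguments here are illustrative)
python $install_dir/influxdb_dump.py -o "$local_ip" -u profiler -p profiler \
    -d profiler -t spark -e sparkapp > stacks.txt
perl $install_dir/flamegraph.pl stacks.txt > /tmp/flamegraph.svg

# 5. Stop the InfluxDB instance
kill $influxdb_pid
echo "Created flamegraph: /tmp/flamegraph.svg"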

To be fair, it should be noted that the following utilities must be present on your system prior to running the script: perl, python2.7 and pip. Beyond that, the script has been used in an Amazon EMR environment without any issues. Just use it instead of the usual spark-submit command, and it will profile your application and create a report:

[hadoop@ip-10-121-4-244 tmp]$ ./spark-submit-flamegraph --name 'etlite' --jars file://$(pwd)/probe-events-1.0.jar etlite_2.11-0.1.0.jar s3://mobility-artifacts/airflow/latest/config/etlite.conf
[2017-06-05T12:34:05] Installing dependencies
[2017-06-05T12:34:09] Starting InfluxDB
[2017-06-05T12:34:10] Executing: spark-submit --jars /home/hadoop/.spark-flamegraph/statsd-jvm-profiler.jar,file:///tmp/probe-events-1.0.jar --conf spark.executor.extraJavaOptions=-javaagent:statsd-jvm-profiler.jar=server=10.121.4.244,port=48081,reporter=InfluxDBReporter,database=profiler,username=profiler,password=profiler,prefix=sparkapp,tagMapping=spark --name etlite etlite_2.11-0.1.0.jar s3://mobility-artifacts/airflow/latest/config/etlite.conf
17/06/05 12:34:11 INFO Main$: Configuration file = 's3://mobility-artifacts/airflow/latest/config/etlite.conf'
17/06/05 12:34:14 INFO S3NativeFileSystem: Opening 's3://mobility-artifacts/airflow/latest/config/etlite.conf' for reading
17/06/05 12:34:15 INFO SparkContext: Running Spark version 2.1.0

... running Spark application ...

17/06/05 12:35:17 INFO SparkContext: Successfully stopped SparkContext
17/06/05 12:35:17 INFO ShutdownHookManager: Shutdown hook called
17/06/05 12:35:17 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-fa12133c-b605-4a73-814a-2dfd4ed6fdde

... generating .svg file ...

[2017-06-05T12:35:25] Created flamegraph: /tmp/flamegraph.svg


Integrating this script into an Airflow Spark operator is straightforward, especially if your Spark operator is derived from BashOperator. Just make sure the script is available on all Spark Airflow workers, then swap it in for the spark-submit command depending on whether profile=True is passed as an operator argument, as sketched below.
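A shell-level sketch of that substitution (hypothetical; in practice the branching happens in your operator's Python code, and the wrapper's install path is an assumption):

# Command rendered by the operator, depending on the profile argument:
if [ "$PROFILE" = "True" ]; then
    SPARK_SUBMIT=/usr/local/bin/spark-submit-flamegraph  # assumed location on the worker
else
    SPARK_SUBMIT=spark-submit
fi
$SPARK_SUBMIT --name etlite etlite_2.11-0.1.0.jar s3://mobility-artifacts/airflow/latest/config/etlite.conf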

Post your weird flame graphs in the comments! :)

15 comments:

  1. Hi, thanks for the post. I've been trying to get a flamegraph for the wordcount example [standalone mode on my laptop] in the Spark package. The Spark job runs properly but the database doesn't seem to be receiving any data, and I get the following error:

    running query: SHOW TAG VALUES FROM "heap.total.max" WITH KEY = "jvmName"
    Traceback (most recent call last):
    File "/home/srini/.spark-flamegraph/influxdb_dump.py", line 242, in
    dumper.run()
    File "/home/srini/.spark-flamegraph/influxdb_dump.py", line 102, in run
    jvms = self.get_jvms()
    File "/home/srini/.spark-flamegraph/influxdb_dump.py", line 48, in get_jvms
    return self.get_tag_values("jvmName")
    File "/home/srini/.spark-flamegraph/influxdb_dump.py", line 42, in get_tag_values
    items = self.client.query(query).raw['series'][0]['values']
    KeyError: 'series'

    What do you think is the actual problem? Why is the database not receiving the data? Thanks in advance.

  2. Does the job really run? Can you post the full output somewhere? Thanks.


  3. The job does run.

    I just gave an input file with 11 'hello' words. It counts correctly. The problem is most likely with InfluxDB.

    I tried this manually as well instead of the automated script [for installing the dependencies] and get the same error.

    The details are as follows:

    command :

    ./spark-submit-flamegraph.sh --class org.apache.spark.examples.JavaWordCount --master local[2] /opt/spark/examples/target/scala-2.10/spark-examples-1.6.0-hadoop2.2.0.jar /home/srini/Desktop/input.txt >> result.txt

    result.txt :

    [2017-09-26T23:38:46] Installing dependencies
    [2017-09-26T23:38:51] Starting InfluxDB
    [2017-09-26T23:38:53] Executing: /opt/spark/bin/spark-submit --jars /home/srini/.spark-flamegraph/statsd-jvm-profiler.jar --conf spark.executor.extraJavaOptions=-javaagent:statsd-jvm-profiler.jar=server=192.168.1.142,port=48081,reporter=InfluxDBReporter,database=profiler,username=profiler,password=profiler,prefix=sparkapp,tagMapping=spark --class org.apache.spark.examples.JavaWordCount --master local[2] /opt/spark/examples/target/scala-2.10/spark-examples-1.6.0-hadoop2.2.0.jar /home/srini/Desktop/input.txt
    hello: 11
    running query: SHOW TAG VALUES FROM "heap.total.max" WITH KEY = "jvmName"


    error on the terminal:

    Traceback (most recent call last):
    File "/home/srini/.spark-flamegraph/influxdb_dump.py", line 242, in
    dumper.run()
    File "/home/srini/.spark-flamegraph/influxdb_dump.py", line 102, in run
    jvms = self.get_jvms()
    File "/home/srini/.spark-flamegraph/influxdb_dump.py", line 48, in get_jvms
    return self.get_tag_values("jvmName")
    File "/home/srini/.spark-flamegraph/influxdb_dump.py", line 42, in get_tag_values
    items = self.client.query(query).raw['series'][0]['values']
    KeyError: 'series'


    Thanks.

  4. You can find the complete logs here:

    https://docs.google.com/document/d/1pcnS3DwJKnCqPsiELdwfd_pPjPmBu32e91s8zBWjLvQ/edit?usp=sharing

    There do seem to be a few problems with the context handler, but everything else looks fine.

  5. Hi,
    I couldn't run this script with the Spark terasort benchmark from https://github.com/ehiggs/spark-terasort

    Terminal output:
    tmp]# ./spark-submit-flamegraph --name 'spark-terasort' --jars file://$(pwd)/spark-terasort-1.0-SNAPSHOT-jar-with-dependencies.jar
    Starting InfluxDB
    Executing: spark-submit --jars /root/.spark-flamegraph/statsd-jvm-profiler.jar,file:///tmp/spark-terasort-1.0-SNAPSHOT-jar-with-dependencies.jar --conf spark.executor.extraJavaOptions=-javaagent:statsd-jvm-profiler.jar=server=localhost,port=48081,reporter=InfluxDBReporter,database=profiler,username=profiler,password=profiler,prefix=sparkapp,tagMapping=spark --name spark-terasort
    Error: Must specify a primary resource (JAR or Python or R file)
    Run with --help for usage help or --verbose for debug output

    Can you suggest what I am missing to run jobs properly?
    Thanks in advance.

  6. Hi,

    I see the reason now. When running locally, there are no separate Spark executors, therefore the Spark driver must be instrumented to load the statsd reporter as well. Please check the latest version here: https://github.com/spektom/spark-flamegraph.

    Regards,
    Michael

  7. Hi Michael, thanks for the change. It worked! I'm just trying to understand the internals of Spark and find it very difficult. It would be very nice of you if you could address the following doubts:
    1. Explain what these terms mean properly: master, driver, slave, worker, executor?
    2. I'm not able to understand exactly why you said that there are no executors in standalone mode. I thought that the executor, master and worker run in the same process.
    3. What exactly is the difference between standalone mode and local mode?

    Thanks a lot!

  8. Hi Srinivas,

    I think all these terms are better explained in this article:
    https://spark.apache.org/docs/latest/cluster-overview.html

    When running in local (non-cluster) mode, there's only one JVM process that serves both the driver and the executors. Therefore, all needed JVM settings must be passed via spark.driver.extraJavaOptions as well.

    Hope this helps.

    Michael

  9. Hi Michael

    I ran the spark-submit-flamegraph on an EMR cluster on AWS with 13 nodes. The Spark job completed successfully, but it fails with the error below:

    running query: SHOW TAG VALUES FROM "heap.total.max" WITH KEY = "jvmName"
    Traceback (most recent call last):
    File "/home/hadoop/.spark-flamegraph/influxdb_dump.py", line 242, in
    dumper.run()
    File "/home/hadoop/.spark-flamegraph/influxdb_dump.py", line 102, in run
    jvms = self.get_jvms()
    File "/home/hadoop/.spark-flamegraph/influxdb_dump.py", line 48, in get_jvms
    return self.get_tag_values("jvmName")
    File "/home/hadoop/.spark-flamegraph/influxdb_dump.py", line 42, in get_tag_values
    items = self.client.query(query).raw['series'][0]['values']
    KeyError: 'series'

    Could you tell me how to debug this error from InfluxDB?

  10. Thanks! I made some minor changes to the script to get it to run on Mac OS (10.13.2):

    $ diff spark-submit-flamegraph.sh spark-submit-flamegraph-osx.sh
    8c8,9
    < echo -ne "\035" | telnet 127.0.0.1 $port >/dev/null 2>&1;
    ---
    > nc -nz 127.0.0.1 $port >/dev/null 2>&1;
    > #echo -ne "\035" | telnet 127.0.0.1 $port >/dev/null 2>&1;
    29c30
    < pip -q install --user influxdb blist
    ---
    > pip -q install --user influxdb blist pytz
    37,40c38,43
    < wget -qc https://dl.influxdata.com/influxdb/releases/influxdb-1.2.4_linux_amd64.tar.gz
    < tar -xzf influxdb-1.2.4_linux_amd64.tar.gz
    < rm -f influxdb
    < ln -s influxdb-1.2.4-1 influxdb
    ---
    > brew list influxdb &> /dev/null || brew install influxdb
    > #wget -qc https://dl.influxdata.com/influxdb/releases/influxdb-1.2.4_linux_amd64.tar.gz
    > #tar -xzf influxdb-1.2.4_linux_amd64.tar.gz
    > #rm -f influxdb
    > #ln -s influxdb-1.2.4-1 influxdb
    >
    61c64,65
    < $install_dir/influxdb/usr/bin/influxd -config influxdb.conf >influxdb.log 2>&1 &
    ---
    > influxd -config influxdb.conf >influxdb.log 2>&1 &
    > #$install_dir/influxdb/usr/bin/influxd -config influxdb.conf >influxdb.log 2>&1 &
    140c144,145
    < local_ip=$(ip route get 8.8.8.8 | awk '{print $NF; exit}')
    ---
    > local_ip=$(awk '/inet / && $2 != "127.0.0.1"{print $2}' <(ifconfig))
    > #local_ip=$(ip route get 8.8.8.8 | awk '{print $NF; exit}')

    Replies
    1. Thanks!
      Don't hesitate to send a pull request to the Github repository:
      https://github.com/spektom/spark-flamegraph

  11. Hi,

    Thanks for this wonderful article. I have a 7-node EC2 cluster; do I need to install all the prerequisites on all 7 nodes, or is installing them on the Spark driver node sufficient?

    Please help.

    Regards
    Prasun

  12. @Prasun, no, installing dependencies on the Spark driver machine should be sufficient.

  13. In EMR with a multi-node cluster, the script works out of the box. However, on a multi-node EC2 cluster, the container is not getting launched when using the script. The Spark application fails with the following errors (it works fine without the script):
    1) Exit status: 1. Diagnostics: Exception from container-launch.
    Container id: container_1520567695003_0014_02_000002
    Exit code: 1
    Stack trace: ExitCodeException exitCode=1:
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:601)

    2) Uncaught exception in thread Yarn application state monitor
    java.io.IOException: Connection reset by peer

    Replies
    1. Are you using the latest version from the GitHub repository?
