Sunday, June 25, 2017

Profiling Spark Applications: The Easy Way


Recently, I was looking for a one-click way to profile Spark applications, something that could be easily integrated into any work environment without the need to configure the system.


The modern way to report profiling statistics about an application (any application, not just a Spark or Java one) is to generate a single .SVG file called a "flame graph". Since this is a regular vector graphics format, the file can be opened in any browser. Moreover, you can navigate between different stack frames by clicking on them, and even search for a symbol name by clicking on the "Search" link.


This is what a sample flame graph looks like:
The y-axis shows the stack depth while the x-axis shows time spent in a stack frame.

In order to generate a flame graph, there are usually two mandatory steps:
  • Capture stack traces from a running process, and dump them to disk. 
  • Parse these stack traces, and generate .SVG file. 
For Java-based applications, stack traces can be gathered using the commercial features of the Oracle JDK (the -XX:+FlightRecorder option). There's an article that explains how to profile Spark applications using this option.

In OpenJDK this feature is not available, but luckily there are other options. One of them is the statsd JVM profiler library from Etsy. This library integrates into the JVM as an agent, gathers statistics like CPU and memory usage, and sends them to a statsd server in real time. Apparently, this library supports reporting to InfluxDB as well.

Keeping the above in mind, the whole process will look like this:
  1. Have InfluxDB running on a random port.
  2. Start Spark with the statsd profiler Jar in its classpath and with the configuration that tells it to report statistics back to the InfluxDB instance.
  3. After running Spark application, query all the reported metrics from the InfluxDB instance.
  4. Run a script that generates the target report.
  5. Stop the InfluxDB instance.
  6. Store generated .SVG file somewhere, or send it to someone.
The following script is a wrapper around the ‘spark-submit’ command, which does all of that:
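(The full wrapper is published in the spark-flamegraph repository on GitHub linked later in this post; below is an abridged, illustrative sketch of its main flow. The port, file locations and the influxdb_dump.py / flamegraph.pl arguments are assumptions, not the exact values used by the real script.)

#!/bin/bash
# Illustrative sketch only: start InfluxDB, run spark-submit with the
# statsd-jvm-profiler agent, dump the collected stacks and render the SVG.
set -e

install_dir=$HOME/.spark-flamegraph
local_ip=$(ip route get 8.8.8.8 | awk '{print $NF; exit}')
statsd_port=48081    # port the profiler agent reports to (assumed)

# 1. Start a private InfluxDB instance in the background
$install_dir/influxdb/usr/bin/influxd -config influxdb.conf >influxdb.log 2>&1 &
influxdb_pid=$!

# 2. Run spark-submit with the profiler agent attached to the executors
spark-submit \
  --jars $install_dir/statsd-jvm-profiler.jar \
  --conf spark.executor.extraJavaOptions=-javaagent:statsd-jvm-profiler.jar=server=$local_ip,port=$statsd_port,reporter=InfluxDBReporter,database=profiler,username=profiler,password=profiler,prefix=sparkapp,tagMapping=spark \
  "$@"

# 3. Dump the reported metrics from InfluxDB and render the flame graph
#    (influxdb_dump.py ships with statsd-jvm-profiler; the flags shown are assumed)
python $install_dir/influxdb_dump.py -o "$local_ip" -u profiler -p profiler \
  -d profiler -e sparkapp -x stack_traces
perl $install_dir/flamegraph.pl stack_traces/* > /tmp/flamegraph.svg

# 4. Stop the InfluxDB instance
kill $influxdb_pid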

To be fair, it should be noted that the following utilities must be present on your system prior to running the script: perl, python2.7 and pip. Other than that, the script has been used in an Amazon EMR environment without any issues. Just use the script instead of the usual spark-submit command, and it will profile your application and create a report:

[hadoop@ip-10-121-4-244 tmp]$ ./spark-submit-flamegraph --name 'etlite' --jars file://$(pwd)/probe-events-1.0.jar etlite_2.11-0.1.0.jar s3://mobility-artifacts/airflow/latest/config/etlite.conf
[2017-06-05T12:34:05] Installing dependencies
[2017-06-05T12:34:09] Starting InfluxDB
[2017-06-05T12:34:10] Executing: spark-submit --jars /home/hadoop/.spark-flamegraph/statsd-jvm-profiler.jar,file:///tmp/probe-events-1.0.jar --conf spark.executor.extraJavaOptions=-javaagent:statsd-jvm-profiler.jar=server=10.121.4.244,port=48081,reporter=InfluxDBReporter,database=profiler,username=profiler,password=profiler,prefix=sparkapp,tagMapping=spark --name etlite etlite_2.11-0.1.0.jar s3://mobility-artifacts/airflow/latest/config/etlite.conf
17/06/05 12:34:11 INFO Main$: Configuration file = 's3://mobility-artifacts/airflow/latest/config/etlite.conf'
17/06/05 12:34:14 INFO S3NativeFileSystem: Opening 's3://mobility-artifacts/airflow/latest/config/etlite.conf' for reading
17/06/05 12:34:15 INFO SparkContext: Running Spark version 2.1.0

... running Spark application ...

17/06/05 12:35:17 INFO SparkContext: Successfully stopped SparkContext
17/06/05 12:35:17 INFO ShutdownHookManager: Shutdown hook called
17/06/05 12:35:17 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-fa12133c-b605-4a73-814a-2dfd4ed6fdde

... generating .svg file ...

[2017-06-05T12:35:25] Created flamegraph: /tmp/flamegraph.svg


Integrating this script into an Airflow Spark operator is straightforward, especially if your Spark operator is derived from BashOperator. Just make sure the script is available on all Airflow workers that run Spark, then swap the wrapper in for the spark-submit command depending on whether profile=True is passed as an operator argument, as in the sketch below.
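For illustration, here is a minimal, hypothetical sketch of such an operator; the class name, constructor arguments and command layout are assumptions rather than code from an actual deployment:

# Hypothetical sketch: a BashOperator subclass that switches to the profiling
# wrapper when profile=True is passed. Names and paths are illustrative only.
from airflow.operators.bash_operator import BashOperator


class SparkSubmitOperator(BashOperator):
    def __init__(self, application, app_args='', profile=False, **kwargs):
        # Use the flame graph wrapper instead of plain spark-submit when profiling
        submit = 'spark-submit-flamegraph' if profile else 'spark-submit'
        super(SparkSubmitOperator, self).__init__(
            bash_command='{} {} {}'.format(submit, application, app_args),
            **kwargs)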

Post your weird flame graphs in comments! :)

Comments:

Unknown said...

Hi, thanks for the post. I've been trying to get a flame graph for the wordcount example [standalone mode on my laptop] in the Spark package. The Spark job runs properly, but the database doesn't seem to be receiving any data and I get the following error:

running query: SHOW TAG VALUES FROM "heap.total.max" WITH KEY = "jvmName"
Traceback (most recent call last):
File "/home/srini/.spark-flamegraph/influxdb_dump.py", line 242, in
dumper.run()
File "/home/srini/.spark-flamegraph/influxdb_dump.py", line 102, in run
jvms = self.get_jvms()
File "/home/srini/.spark-flamegraph/influxdb_dump.py", line 48, in get_jvms
return self.get_tag_values("jvmName")
File "/home/srini/.spark-flamegraph/influxdb_dump.py", line 42, in get_tag_values
items = self.client.query(query).raw['series'][0]['values']
KeyError: 'series'

What do you think is the actual problem? Why is the database not receiving the data? Thanks in advance.

Michael said...

Does the job really run? Can you post the full output somewhere? Thanks.

Unknown said...


The job does run.

I just gave it an input file with 11 'hello' words. It counts correctly. The problem is most likely with InfluxDB.

I tried this manually as well, instead of the automated script [for installing the dependencies], and got the same error.

The details are as follows:

command :

./spark-submit-flamegraph.sh --class org.apache.spark.examples.JavaWordCount --master local[2] /opt/spark/examples/target/scala-2.10/spark-examples-1.6.0-hadoop2.2.0.jar /home/srini/Desktop/input.txt >> result.txt

result.txt :

[2017-09-26T23:38:46] Installing dependencies
[2017-09-26T23:38:51] Starting InfluxDB
[2017-09-26T23:38:53] Executing: /opt/spark/bin/spark-submit --jars /home/srini/.spark-flamegraph/statsd-jvm-profiler.jar --conf spark.executor.extraJavaOptions=-javaagent:statsd-jvm-profiler.jar=server=192.168.1.142,port=48081,reporter=InfluxDBReporter,database=profiler,username=profiler,password=profiler,prefix=sparkapp,tagMapping=spark --class org.apache.spark.examples.JavaWordCount --master local[2] /opt/spark/examples/target/scala-2.10/spark-examples-1.6.0-hadoop2.2.0.jar /home/srini/Desktop/input.txt
hello: 11
running query: SHOW TAG VALUES FROM "heap.total.max" WITH KEY = "jvmName"


error on the terminal:

Traceback (most recent call last):
File "/home/srini/.spark-flamegraph/influxdb_dump.py", line 242, in
dumper.run()
File "/home/srini/.spark-flamegraph/influxdb_dump.py", line 102, in run
jvms = self.get_jvms()
File "/home/srini/.spark-flamegraph/influxdb_dump.py", line 48, in get_jvms
return self.get_tag_values("jvmName")
File "/home/srini/.spark-flamegraph/influxdb_dump.py", line 42, in get_tag_values
items = self.client.query(query).raw['series'][0]['values']
KeyError: 'series'


Thanks.

Unknown said...

You can find the complete logs here :

https://docs.google.com/document/d/1pcnS3DwJKnCqPsiELdwfd_pPjPmBu32e91s8zBWjLvQ/edit?usp=sharing

There do seem to be a few problems with the context handler. But everything else looks fine.

Unknown said...

Hi,
I couldn't run this script with the spark-terasort benchmark from https://github.com/ehiggs/spark-terasort

Terminal output:
tmp]# ./spark-submit-flamegraph --name 'spark-terasort' --jars file://$(pwd)/spark-terasort-1.0-SNAPSHOT-jar-with-dependencies.jar
Starting InfluxDB
Executing: spark-submit --jars /root/.spark-flamegraph/statsd-jvm-profiler.jar,file:///tmp/spark-terasort-1.0-SNAPSHOT-jar-with-dependencies.jar --conf spark.executor.extraJavaOptions=-javaagent:statsd-jvm-profiler.jar=server=localhost,port=48081,reporter=InfluxDBReporter,database=profiler,username=profiler,password=profiler,prefix=sparkapp,tagMapping=spark --name spark-terasort
Error: Must specify a primary resource (JAR or Python or R file)
Run with --help for usage help or --verbose for debug output

Can you suggest what I am missing to run jobs properly?
Thanks in advance.

Michael said...

Hi,

Now I see the reason. When running locally, there are no separate Spark executors, therefore the Spark driver must be instrumented to load the statsd reporter as well. Please check the latest version here: https://github.com/spektom/spark-flamegraph.

Regards,
Michael

Unknown said...

Hi Michael, thanks for the change. It worked! I'm just trying to understand the internals of Spark and find it very difficult. It would be very nice of you if you could address the following doubts:
1. What exactly do these terms mean: master, driver, slave, worker, executor?
2. I'm not able to understand exactly why you said that there are no executors in standalone mode? I thought that the executor, master and worker run in the same process.
3. What exactly is the difference between standalone mode and local mode?

Thanks a lot!

Michael said...

Hi Srinivas,

I think all these terms are better explained in this article:
https://spark.apache.org/docs/latest/cluster-overview.html

When running in local (non-cluster) mode there's only one JVM process that serves as both driver and executor. Therefore, all the needed JVM settings must be passed via spark.driver.extraJavaOptions as well.
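For example (an illustrative invocation only; the -javaagent string is the same one the wrapper passes to the executors, duplicated here for the driver, and the application jar name is a placeholder):

spark-submit \
  --jars ~/.spark-flamegraph/statsd-jvm-profiler.jar \
  --conf spark.driver.extraJavaOptions=-javaagent:statsd-jvm-profiler.jar=server=127.0.0.1,port=48081,reporter=InfluxDBReporter,database=profiler,username=profiler,password=profiler,prefix=sparkapp,tagMapping=spark \
  --conf spark.executor.extraJavaOptions=-javaagent:statsd-jvm-profiler.jar=server=127.0.0.1,port=48081,reporter=InfluxDBReporter,database=profiler,username=profiler,password=profiler,prefix=sparkapp,tagMapping=spark \
  --class org.apache.spark.examples.JavaWordCount --master local[2] \
  your-application.jar input.txt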

Hope this helps.

Michael

Narayanan K said...

Hi Michael

I ran spark-submit-flamegraph on an EMR cluster on AWS with 13 nodes. The Spark job completed successfully, but the script fails with the error below:

running query: SHOW TAG VALUES FROM "heap.total.max" WITH KEY = "jvmName"
Traceback (most recent call last):
File "/home/hadoop/.spark-flamegraph/influxdb_dump.py", line 242, in
dumper.run()
File "/home/hadoop/.spark-flamegraph/influxdb_dump.py", line 102, in run
jvms = self.get_jvms()
File "/home/hadoop/.spark-flamegraph/influxdb_dump.py", line 48, in get_jvms
return self.get_tag_values("jvmName")
File "/home/hadoop/.spark-flamegraph/influxdb_dump.py", line 42, in get_tag_values
items = self.client.query(query).raw['series'][0]['values']
KeyError: 'series'

Could you tell me how to debug this error from InfluxDB?

Matt said...

Thanks! I made some minor changes to the script to get it to run on Mac OS (10.13.2):

$ diff spark-submit-flamegraph.sh spark-submit-flamegraph-osx.sh
8c8,9
< echo -ne "\035" | telnet 127.0.0.1 $port >/dev/null 2>&1;
---
> nc -nz 127.0.0.1 $port >/dev/null 2>&1;
> #echo -ne "\035" | telnet 127.0.0.1 $port >/dev/null 2>&1;
29c30
< pip -q install --user influxdb blist
---
> pip -q install --user influxdb blist pytz
37,40c38,43
< wget -qc https://dl.influxdata.com/influxdb/releases/influxdb-1.2.4_linux_amd64.tar.gz
< tar -xzf influxdb-1.2.4_linux_amd64.tar.gz
< rm -f influxdb
< ln -s influxdb-1.2.4-1 influxdb
---
> brew list influxdb &> /dev/null || brew install influxdb
> #wget -qc https://dl.influxdata.com/influxdb/releases/influxdb-1.2.4_linux_amd64.tar.gz
> #tar -xzf influxdb-1.2.4_linux_amd64.tar.gz
> #rm -f influxdb
> #ln -s influxdb-1.2.4-1 influxdb
>
61c64,65
< $install_dir/influxdb/usr/bin/influxd -config influxdb.conf >influxdb.log 2>&1 &
---
> influxd -config influxdb.conf >influxdb.log 2>&1 &
> #$install_dir/influxdb/usr/bin/influxd -config influxdb.conf >influxdb.log 2>&1 &
140c144,145
< local_ip=$(ip route get 8.8.8.8 | awk '{print $NF; exit}')
---
> local_ip=$(awk '/inet / && $2 != "127.0.0.1"{print $2}' <(ifconfig))
> #local_ip=$(ip route get 8.8.8.8 | awk '{print $NF; exit}')

Michael said...

Thanks!
Don't hesitate to send a pull request to the Github repository:
https://github.com/spektom/spark-flamegraph

Prasun said...

Hi,

Thanks for this wonderful article, I have 7 node Ec2 cluster, do I need to install all the pre-requisite in all the 7 nodes or installing the pre-requisites in spark driver node is sufficient?

Please help.

Regards
Prasun

Michael said...

@Prasun, no, installing the dependencies on the Spark driver machine should be sufficient.

Prasun said...

In EMR with a multi-node cluster, the script works out-of-the-box. However, in an EC2 multi-node cluster, the container is not getting launched while using the script. The Spark application fails with the errors below (the application works fine without the script):
1) Exit status: 1. Diagnostics: Exception from container-launch.
Container id: container_1520567695003_0014_02_000002
Exit code: 1
Stack trace: ExitCodeException exitCode=1:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:601)

2) Uncaught exception in thread Yarn application state monitor
java.io.IOException: Connection reset by peer

Michael said...

Are you using the latest version from the GitHub repository?