Sunday, June 25, 2017

Profiling Spark Applications: The Easy Way


Recently, I was looking for a one-click way to profile Spark applications, so that it could be easily integrated into any work environment without extra system configuration.


The modern way to report profiling statistics about an application (any application, not just a Spark or Java one) is to generate a single .SVG file called a "flame graph". Since SVG is a regular vector graphics format, the file can be opened in any browser. Moreover, you can navigate between stack frames by clicking on them, and even search for a symbol name by clicking the "Search" link.


This is what a sample flame graph looks like:
The y-axis shows the stack depth, while the x-axis shows the time spent in each stack frame.

In order to generate flame graphs, two steps are usually required:
  • Capture stack traces from a running process, and dump them to disk.
  • Parse these stack traces, and generate the .SVG file.
For Java-based applications, stack traces can be gathered using the commercial features of the Oracle JDK (the -XX:+FlightRecorder option). There's an article that explains how to profile Spark applications using this option.

In OpenJDK this feature is not available, but luckily there are other options. One of them is the statsd-jvm-profiler library from Etsy. This library attaches to the JVM as an agent, gathers statistics like CPU or memory usage, and sends them to a statsd server in real time. Apparently, this library supports reporting to InfluxDB as well.
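
The profiler is attached with a standard -javaagent JVM option; the option string below mirrors the one visible in the spark-submit log later in this post (the server address, port and credentials are environment-specific):

-javaagent:statsd-jvm-profiler.jar=server=<influxdb-host>,port=<influxdb-port>,reporter=InfluxDBReporter,database=profiler,username=profiler,password=profiler,prefix=sparkapp,tagMapping=spark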

Keeping the above in mind, the whole process will look like this:
  1. Start InfluxDB on a random port.
  2. Start Spark with the statsd profiler Jar in its classpath and with a configuration that tells it to report statistics back to the InfluxDB instance.
  3. After the Spark application finishes, query all the reported metrics from the InfluxDB instance.
  4. Run a script that generates the target report.
  5. Stop the InfluxDB instance.
  6. Store the generated .SVG file somewhere, or send it to someone.
The following script is a wrapper around the spark-submit command that does all of that:
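
(The complete, up-to-date script lives at https://github.com/spektom/spark-flamegraph. The condensed sketch below only illustrates its flow: file locations, the InfluxDB port and the influxdb_dump.py arguments are illustrative assumptions, not exact values.)

#!/bin/bash
# Condensed sketch of the spark-submit-flamegraph wrapper; values are illustrative.
set -e

WORKDIR=$HOME/.spark-flamegraph      # profiler jar, dump script etc. are kept here
INFLUXDB_PORT=48081                  # port the profiler agent will report to
SVG_OUT=/tmp/flamegraph.svg

# 1. Start a private InfluxDB instance in the background (the real script also
#    installs the dependencies and writes a config that listens on $INFLUXDB_PORT).
influxd -config "$WORKDIR/influxdb.conf" &
INFLUXDB_PID=$!

# 2. Run the Spark application with the statsd-jvm-profiler agent attached to
#    the executors, reporting into the InfluxDB instance started above.
spark-submit \
    --jars "$WORKDIR/statsd-jvm-profiler.jar" \
    --conf "spark.executor.extraJavaOptions=-javaagent:statsd-jvm-profiler.jar=server=$(hostname -I | awk '{print $1}'),port=$INFLUXDB_PORT,reporter=InfluxDBReporter,database=profiler,username=profiler,password=profiler,prefix=sparkapp,tagMapping=spark" \
    "$@"

# 3+4. Dump the collected stack traces from InfluxDB and turn them into an SVG.
#      influxdb_dump.py ships with statsd-jvm-profiler, flamegraph.pl comes from the
#      FlameGraph project; the exact arguments below are assumptions, check their docs.
python2.7 "$WORKDIR/influxdb_dump.py" -o localhost -u profiler -p profiler \
    -d profiler -t spark -e sparkapp -x "$WORKDIR/stacks"
perl "$WORKDIR/flamegraph.pl" "$WORKDIR"/stacks/* > "$SVG_OUT"

# 5+6. Stop InfluxDB and report where the flame graph ended up.
kill $INFLUXDB_PID
echo "Created flamegraph: $SVG_OUT"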

To be fair, it should be noted that the following utilities must be present on your system prior to running the script: perl, python2.7 and pip. Other than that, the script has been used in an Amazon EMR environment without any issues. Just use the script instead of the usual spark-submit command, and it will profile your application and create a report:

[hadoop@ip-10-121-4-244 tmp]$ ./spark-submit-flamegraph --name 'etlite' --jars file://$(pwd)/probe-events-1.0.jar etlite_2.11-0.1.0.jar s3://mobility-artifacts/airflow/latest/config/etlite.conf
[2017-06-05T12:34:05] Installing dependencies
[2017-06-05T12:34:09] Starting InfluxDB
[2017-06-05T12:34:10] Executing: spark-submit --jars /home/hadoop/.spark-flamegraph/statsd-jvm-profiler.jar,file:///tmp/probe-events-1.0.jar --conf spark.executor.extraJavaOptions=-javaagent:statsd-jvm-profiler.jar=server=10.121.4.244,port=48081,reporter=InfluxDBReporter,database=profiler,username=profiler,password=profiler,prefix=sparkapp,tagMapping=spark --name etlite etlite_2.11-0.1.0.jar s3://mobility-artifacts/airflow/latest/config/etlite.conf
17/06/05 12:34:11 INFO Main$: Configuration file = 's3://mobility-artifacts/airflow/latest/config/etlite.conf'
17/06/05 12:34:14 INFO S3NativeFileSystem: Opening 's3://mobility-artifacts/airflow/latest/config/etlite.conf' for reading
17/06/05 12:34:15 INFO SparkContext: Running Spark version 2.1.0

... running Spark application ...

17/06/05 12:35:17 INFO SparkContext: Successfully stopped SparkContext
17/06/05 12:35:17 INFO ShutdownHookManager: Shutdown hook called
17/06/05 12:35:17 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-fa12133c-b605-4a73-814a-2dfd4ed6fdde

... generating .svg file ...

[2017-06-05T12:35:25] Created flamegraph: /tmp/flamegraph.svg


Integrating this script into an Airflow Spark operator is straightforward, especially if your Spark operator is derived from BashOperator. Just make sure the script is available on all Airflow workers, then swap the wrapper in for the spark-submit command depending on whether profile=True is passed as an operator argument, as sketched below.
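
If the operator ends up running a plain bash command, one way to do the switch is a tiny shim like this sketch (the wrapper path and the PROFILE variable are assumptions for illustration only):

#!/bin/bash
# Illustrative shim: run the profiling wrapper instead of plain spark-submit
# when profiling was requested; pass all spark-submit arguments through as-is.
if [ "$PROFILE" = "True" ]; then
    SPARK_SUBMIT=/usr/local/bin/spark-submit-flamegraph   # must be present on every worker
else
    SPARK_SUBMIT=spark-submit
fi
exec "$SPARK_SUBMIT" "$@"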

Post your weird flame graphs in comments! :)

9 comments:

Srinivas Kumar said...

Hi, thanks for the post. I've been trying to get a flame graph for the wordcount example [standalone mode on my laptop] in the Spark package. The Spark job runs properly, but the database doesn't seem to be receiving any data and I get the following error:

running query: SHOW TAG VALUES FROM "heap.total.max" WITH KEY = "jvmName"
Traceback (most recent call last):
File "/home/srini/.spark-flamegraph/influxdb_dump.py", line 242, in
dumper.run()
File "/home/srini/.spark-flamegraph/influxdb_dump.py", line 102, in run
jvms = self.get_jvms()
File "/home/srini/.spark-flamegraph/influxdb_dump.py", line 48, in get_jvms
return self.get_tag_values("jvmName")
File "/home/srini/.spark-flamegraph/influxdb_dump.py", line 42, in get_tag_values
items = self.client.query(query).raw['series'][0]['values']
KeyError: 'series'

What do you think is the actual problem? Why is the database not receiving the data? Thanks in advance.

Michael said...

Does the job really run? Can you post the full output somewhere? Thanks.

Srinivas Kumar said...


The job does run.

I just gave it an input file with 11 'hello' words. It counts correctly. The problem is most likely with InfluxDB.

I tried this manually as well, instead of the automated script [for installing the dependencies], and got the same error.

The details are as follows:

command :

./spark-submit-flamegraph.sh --class org.apache.spark.examples.JavaWordCount --master local[2] /opt/spark/examples/target/scala-2.10/spark-examples-1.6.0-hadoop2.2.0.jar /home/srini/Desktop/input.txt >> result.txt

result.txt :

[2017-09-26T23:38:46] Installing dependencies
[2017-09-26T23:38:51] Starting InfluxDB
[2017-09-26T23:38:53] Executing: /opt/spark/bin/spark-submit --jars /home/srini/.spark-flamegraph/statsd-jvm-profiler.jar --conf spark.executor.extraJavaOptions=-javaagent:statsd-jvm-profiler.jar=server=192.168.1.142,port=48081,reporter=InfluxDBReporter,database=profiler,username=profiler,password=profiler,prefix=sparkapp,tagMapping=spark --class org.apache.spark.examples.JavaWordCount --master local[2] /opt/spark/examples/target/scala-2.10/spark-examples-1.6.0-hadoop2.2.0.jar /home/srini/Desktop/input.txt
hello: 11
running query: SHOW TAG VALUES FROM "heap.total.max" WITH KEY = "jvmName"


error on the terminal:

Traceback (most recent call last):
File "/home/srini/.spark-flamegraph/influxdb_dump.py", line 242, in
dumper.run()
File "/home/srini/.spark-flamegraph/influxdb_dump.py", line 102, in run
jvms = self.get_jvms()
File "/home/srini/.spark-flamegraph/influxdb_dump.py", line 48, in get_jvms
return self.get_tag_values("jvmName")
File "/home/srini/.spark-flamegraph/influxdb_dump.py", line 42, in get_tag_values
items = self.client.query(query).raw['series'][0]['values']
KeyError: 'series'


Thanks.

Srinivas Kumar said...

You can find the complete logs here :

https://docs.google.com/document/d/1pcnS3DwJKnCqPsiELdwfd_pPjPmBu32e91s8zBWjLvQ/edit?usp=sharing

There do seem to be a few problems with the context handler, but everything else looks fine.

karthik kumar said...

Hi,
I couldn't run this script with spark terasort benchmark from https://github.com/ehiggs/spark-terasort

Terminal output:
tmp]# ./spark-submit-flamegraph --name 'spark-terasort' --jars file://$(pwd)/spark-terasort-1.0-SNAPSHOT-jar-with-dependencies.jar
Starting InfluxDB
Executing: spark-submit --jars /root/.spark-flamegraph/statsd-jvm-profiler.jar,file:///tmp/spark-terasort-1.0-SNAPSHOT-jar-with-dependencies.jar --conf spark.executor.extraJavaOptions=-javaagent:statsd-jvm-profiler.jar=server=localhost,port=48081,reporter=InfluxDBReporter,database=profiler,username=profiler,password=profiler,prefix=sparkapp,tagMapping=spark --name spark-terasort
Error: Must specify a primary resource (JAR or Python or R file)
Run with --help for usage help or --verbose for debug output

Can you suggest what I am missing to run jobs properly?
Thanks in advance.

Michael said...

Hi,

I see now what the reason is. When running locally, there are no Spark executors, therefore the Spark driver must be instrumented to load the statsd reporter as well. Please check the latest version here: https://github.com/spektom/spark-flamegraph.

Regards,
Michael

Srinivas Kumar said...

Hi Michael, thanks for the change. It worked! I'm just trying to understand the internals of Spark and find it very difficult. It would be very nice of you if you could address the following doubts:
1. What exactly do the terms master, driver, slave, worker and executor mean?
2. I'm not able to understand exactly why you said that there are no executors in standalone mode. I thought that the executor, master and worker run in the same process.
3. What exactly is the difference between standalone mode and local mode?

Thanks a lot!

Michael said...

Hi Srinivas,

I think all these terms are better explained in this article:
https://spark.apache.org/docs/latest/cluster-overview.html

When running in local (non-cluster) mode there's only one JVM process that serves both the driver and the executors. Therefore, all needed JVM settings must be passed to spark.driver.extraJavaOptions as well.
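
For example, alongside the spark.executor.extraJavaOptions setting shown in the log in the post, an equivalent driver-side option is needed (values elided here):

--conf spark.driver.extraJavaOptions=-javaagent:statsd-jvm-profiler.jar=server=...,port=...,reporter=InfluxDBReporter,...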

Hope this helps.

Michael