Sunday, June 25, 2017

Profiling Spark Applications: The Easy Way

Recently, I was looking for a one-click way to profile Spark applications, one that could be easily integrated into any work environment without having to configure the system.

The modern way to report profiling statistics about an application (any application, not just a Spark or Java one) is to generate a single .SVG file called a "flame graph". Since this is a regular vector graphics format, the file can be opened in any browser. Moreover, you can navigate between stack frames by clicking on them, and even search for a symbol name by clicking on the "Search" link.

This is what a sample flame graph looks like:
The y-axis shows the stack depth while the x-axis shows time spent in a stack frame.

In order to generate flame graphs, two steps are usually required:
  • Capture stack traces from a running process, and dump them to disk.
  • Parse these stack traces, and generate an .SVG file.
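As a rough illustration of the second step: flame graph generators such as flamegraph.pl read "folded" stacks, one line per unique stack with frames joined by semicolons and followed by a sample count. The sketch below collapses raw samples into that form. The function name and the sample data are made up for illustration; this is not the code used by the profiler discussed in this post.

```python
from collections import Counter


def fold_stacks(samples):
    """Collapse sampled stack traces into 'frame;frame;frame count' lines.

    Each sample is a list of frame names ordered from the root (e.g. main)
    down to the frame that was executing when the sample was taken.
    """
    counts = Counter(";".join(frames) for frames in samples)
    # Sort for stable, reproducible output.
    return ["%s %d" % (stack, n) for stack, n in sorted(counts.items())]


# Hypothetical samples, invented for this example only.
samples = [
    ["main", "run", "parse"],
    ["main", "run", "parse"],
    ["main", "run", "write"],
]
folded = fold_stacks(samples)
print("\n".join(folded))
```

The wider a frame appears in the final graph, the more samples contained it, which is why identical stacks are counted rather than kept as separate lines.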
For Java-based applications, stack traces can be gathered using commercial features of Oracle JDK (using the -XX:+FlightRecorder option). There's an article that explains how to profile Spark applications using this option.

In OpenJDK this feature is not available, but luckily there are other options. One of them is the statsd JVM profiler library from Etsy. This library plugs into the JVM as an agent, gathers statistics such as CPU or memory usage, and sends them to a statsd server in real time. As it turns out, this library supports reporting to InfluxDB as well.

Keeping the above in mind, the whole process will look like this:
  1. Have InfluxDB running on a random port.
  2. Start Spark with the statsd profiler JAR in its classpath and with the configuration that tells it to report statistics back to the InfluxDB instance.
  3. After running the Spark application, query all the reported metrics from the InfluxDB instance.
  4. Run a script that generates the target report.
  5. Stop the InfluxDB instance.
  6. Store the generated .SVG file somewhere, or send it to someone.
The following script is a wrapper around the 'spark-submit' command, which does all that:

In fairness, note that the following utilities must be present on your system before running the script: perl, python2.7 and pip. Other than that, the script has been used in an Amazon EMR environment without any issues. Just use the script instead of the usual spark-submit command, and it will profile your application and create a report:

[hadoop@ip-10-121-4-244 tmp]$ ./spark-submit-flamegraph --name 'etlite' --jars file://$(pwd)/probe-events-1.0.jar etlite_2.11-0.1.0.jar s3://mobility-artifacts/airflow/latest/config/etlite.conf
[2017-06-05T12:34:05] Installing dependencies
[2017-06-05T12:34:09] Starting InfluxDB
[2017-06-05T12:34:10] Executing: spark-submit --jars /home/hadoop/.spark-flamegraph/statsd-jvm-profiler.jar,file:///tmp/probe-events-1.0.jar --conf spark.executor.extraJavaOptions=-javaagent:statsd-jvm-profiler.jar=server=,port=48081,reporter=InfluxDBReporter,database=profiler,username=profiler,password=profiler,prefix=sparkapp,tagMapping=spark --name etlite etlite_2.11-0.1.0.jar s3://mobility-artifacts/airflow/latest/config/etlite.conf
17/06/05 12:34:11 INFO Main$: Configuration file = 's3://mobility-artifacts/airflow/latest/config/etlite.conf'
17/06/05 12:34:14 INFO S3NativeFileSystem: Opening 's3://mobility-artifacts/airflow/latest/config/etlite.conf' for reading
17/06/05 12:34:15 INFO SparkContext: Running Spark version 2.1.0

... running Spark application ...

17/06/05 12:35:17 INFO SparkContext: Successfully stopped SparkContext
17/06/05 12:35:17 INFO ShutdownHookManager: Shutdown hook called
17/06/05 12:35:17 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-fa12133c-b605-4a73-814a-2dfd4ed6fdde

... generating .svg file ...

[2017-06-05T12:35:25] Created flamegraph: /tmp/flamegraph.svg
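The "Executing:" line in the log shows how the wrapper rewrites the command: it prepends the profiler JAR to --jars and passes the agent options through spark.executor.extraJavaOptions. Here is a minimal Python sketch of just that rewriting step, modeled on the log line. The function name is hypothetical, and the host value in the example is a placeholder, since the server value is empty in the log.

```python
def build_spark_submit(user_args, profiler_jar, influx_host, influx_port):
    """Return a spark-submit argv with the profiler agent wired in."""
    agent_opts = ",".join([
        "server=%s" % influx_host,
        "port=%d" % influx_port,
        "reporter=InfluxDBReporter",
        "database=profiler",
        "username=profiler",
        "password=profiler",
        "prefix=sparkapp",
        "tagMapping=spark",
    ])
    java_opts = "-javaagent:statsd-jvm-profiler.jar=" + agent_opts

    args = list(user_args)
    jars = [profiler_jar]
    # Merge a user-supplied "--jars value" pair with the profiler JAR.
    if "--jars" in args:
        i = args.index("--jars")
        jars.append(args[i + 1])
        del args[i:i + 2]

    return (["spark-submit", "--jars", ",".join(jars),
             "--conf", "spark.executor.extraJavaOptions=" + java_opts]
            + args)


cmd = build_spark_submit(
    ["--name", "etlite", "--jars", "file:///tmp/probe-events-1.0.jar",
     "etlite_2.11-0.1.0.jar"],
    "/home/hadoop/.spark-flamegraph/statsd-jvm-profiler.jar",
    "10.0.0.1",  # placeholder host, not taken from the log
    48081,
)
print(" ".join(cmd))
```

This sketch only handles the "--jars value" form and would clash with a user-supplied spark.executor.extraJavaOptions setting, so treat it as an outline of the idea rather than a replacement for the script.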

Integrating this script into an Airflow Spark operator is straightforward, especially if your Spark operator is derived from BashOperator. Just make sure the script is available on all Airflow workers that submit Spark jobs, then swap it in for the spark-submit command whenever profile=True is passed as an operator argument.
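The command switch could be sketched roughly as follows. This is plain Python with no Airflow imports, so it only shows how the bash command string might be assembled; the function names are hypothetical, and a real operator would pass the resulting string to BashOperator.

```python
import shlex


def spark_submit_executable(profile=False):
    """Pick the executable; `profile` mirrors the operator argument."""
    return "spark-submit-flamegraph" if profile else "spark-submit"


def render_bash_command(application, app_args=(), profile=False):
    """Build the command string for a BashOperator-style operator."""
    parts = [spark_submit_executable(profile), application] + list(app_args)
    # Quote every part so that spaces in arguments survive the shell.
    return " ".join(shlex.quote(p) for p in parts)


command = render_bash_command(
    "etlite_2.11-0.1.0.jar", ["s3://bucket/etlite.conf"], profile=True)
print(command)
```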

Post your weird flame graphs in comments! :)

Tuesday, August 09, 2016

Why we moved from Mesos to Yarn (and from Chronos to Airflow)

We run daily Spark jobs that generate various reports: retention, cohorts, user activity, lifetime value, etc. Recently we found ourselves in a situation where one person had to wear the operator's hat, watching this process and restarting tasks once they failed or (which is much worse) got stuck.
Why did this happen? There are many reasons. If you follow our tech blog, you probably know that we run scheduled Spark jobs in Mesos on data stored in S3 in either SequenceFile or Parquet format (here's a good write-up about our process). It's funny that Spark was created as a proof-of-concept framework for Mesos, yet it really seemed that these two don't play together well enough. We tried enabling dynamic allocation, but we ended up running important jobs without it, as they failed in 50% of cases because of some weird errors regarding lost shuffle blocks.
The most important issue was that a Spark job would sometimes get stuck in its last phase, and there was no way to revive it other than to SSH into the machine running the stuck executor and kill the process. This was part of the "operator's" job. We suspected that this was somehow related to Mesos, but there was no proof.
Another problematic part of our pipeline was Chronos. Here are some issues that we encountered on a daily basis:
  • Scheduled tasks were not executed. What??? Isn't it a scheduling framework? Right. But sometimes jobs depending on a parent task that had finished successfully didn't run. Not that they didn't run on time - they didn't run at all.
  • The same job was executed twice. This was rare, but when it happened to a task that wrote to the Redshift DB, it could lead to a really bad experience for users trying to query at the same time.
  • There is an option to specify multiple parent tasks, but this option simply didn't work.
And there were tens of other less important, but still annoying, issues. Frankly, it looks like Chronos is not in intensive development anymore, or it has reached its level of perfection and we're just not aware of this fact :-)
So, to kill two birds with one stone, we decided to move to Yarn (as a replacement for Mesos) and to Airflow (as a replacement for Chronos) at once. Configuring Yarn is not the simplest task, especially if your target is a cluster that runs both Spark and Hadoop jobs (Hadoop is still required for indexing data into Druid). But after reading numerous articles and looking at how Amazon EMR is configured, we arrived at a working configuration of a Yarn cluster with HA, which allows Spark and Hadoop to fully utilize resources using dynamic allocation. Needless to say, no jobs get stuck anymore.
As for Airflow, the most important advantages over Chronos for us are:
  • Tasks in Airflow are defined programmatically, so it's easier to generate dynamic workflows when we want to rebuild some data for a given time frame.
  • Airflow is written in Python, so it's really easy to hack and adapt it to your needs, or at least to understand why something doesn't work as expected.
  • Multiple parents are allowed.
  • It's more stable.
  • It's in active development.
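To illustrate the first point, here is a minimal sketch of generating one rebuild task per day for a given time frame. It is plain Python with no Airflow imports: in a real DAG each pair would become an operator plus an upstream dependency. The function name, the task naming scheme and the day-to-day chaining are all assumptions made for this example.

```python
from datetime import date, timedelta


def rebuild_tasks(report, start, end):
    """Return (task_id, upstream_task_id) pairs, one per day in [start, end]."""
    tasks = []
    previous = None
    day = start
    while day <= end:
        task_id = "%s_%s" % (report, day.isoformat())
        # Each day's task waits for the previous day's task (assumed policy).
        tasks.append((task_id, previous))
        previous = task_id
        day += timedelta(days=1)
    return tasks


plan = rebuild_tasks("retention", date(2016, 8, 1), date(2016, 8, 3))
for task_id, upstream in plan:
    print(task_id, "<-", upstream)
```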
We are still testing this combination of Yarn and Airflow, but so far it looks like it works much, much better.
Enjoy our production workflow screenshot as a complement to this post :)

Thursday, February 05, 2015

On-Premise Deployment with Docker

There was a request to make an on-premise installation package for one of the Web services I work on, so I started to think about the right format for it. There's no problem packaging everything as .rpm or as .deb, but what should the target Linux distribution be? There are too many variations among them, so it becomes impractical to support even a couple of the standard ones. The most crucial differences between distributions for my project were: Python version, dependency packages, and init scripts (sysvinit vs. upstart vs. systemd), not to mention the diversity of ways to configure one distribution or another. In short, like a good cool kid (how come no one has made a video yet about all the cool kids that use Docker?) I started to look at modern technologies.

Docker is a set of high-level tools around Linux containers, which is best explained on their FAQ page. What we need from Docker is the ability to run any Linux distribution in a container without any noticeable performance loss. So, the idea is to provide a package with a Dockerfile inside, which contains all the instructions needed to construct our own Linux image during deployment. Three scripts are provided for user convenience: install, start, and stop. The install script builds the Linux image according to what's written in the Dockerfile. The start and stop scripts start or stop, respectively, the Docker container running our service.

First, let's look at the Dockerfile:

# Take Ubuntu 14.04 as base image:
FROM ubuntu:14.04
MAINTAINER Michael Spector

# Install all the dependencies needed for the service to run:
RUN apt-key adv --keyserver --recv 7F0CEB10
RUN echo "deb dist 10gen" | tee -a /etc/apt/sources.list.d/mongodb.list

RUN apt-get -y update && apt-get install -y \
 python-pkg-resources \
 python-dev \
 python-setuptools \
 build-essential \
 libffi-dev \
 python-dateutil \
 python-lxml \
 python-crypto \
 python-ldap \
 libjpeg8-dev \
 rabbitmq-server \
 mongodb-org \
 supervisor

# Configure dependencies:
RUN printf "[\n\t{rabbit, [{tcp_listeners, [{\"\", 5672}]}]}\n]." > /etc/rabbitmq/rabbitmq.config

# Configure directory that will be mapped to the host running the container:
RUN mkdir /var/lib/myservice
RUN chown www-data:www-data /var/lib/myservice

# Copy all the needed files to the target image:
ADD myservice /var/www/myservice/
ADD supervisor.conf /etc/supervisor/conf.d/myservice.conf

# Install missing Python dependencies:
RUN pip install -e /var/www/myservice/

# Port of my service Web interface:
EXPOSE 8080

# Make sure that all the permissions of mapped volumes are correct prior to running the service.
# This is needed since by default mapped volumes inherit ownership of relevant host directories:
CMD chown -R mongodb:mongodb /var/lib/mongodb && \
 chown -R rabbitmq:rabbitmq /var/lib/rabbitmq && \
 chown -R www-data:www-data /var/lib/myservice && \
 /usr/bin/supervisord -c /etc/supervisor/supervisord.conf

Dockerfiles are pretty much self-descriptive, so there's nothing to add beyond the comments written inside. The last command executes Supervisor, which is one of the recommended ways to run multiple services in a single Docker container.

Here's what the install script looks like:


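#!/bin/bash

# Sanity checks:
if [ "$(id -u)" -ne 0 ]; then
 echo " * This script must be run as root!" >&2
 exit 1
fi
if selinuxenabled >/dev/null 2>&1; then
 echo " * SELinux must be disabled!" >&2
 exit 1
fi
if ! which docker >/dev/null 2>&1; then
 echo " * Docker is not installed!" >&2
 exit 1
fi
if ! docker info >/dev/null 2>&1; then
 echo " * Docker is not running!" >&2
 exit 1
fi

# Image and container names (the image name is an assumed placeholder;
# the container name matches the one used by the start and stop scripts):
image=myservice
container=myservice_service
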
cd .install || exit 1

# Build Docker image based on Dockerfile:
docker build -t $image . || exit 1

# Remove any stale images:
docker rmi -f $(docker images --filter "dangling=true" -q) 2>/dev/null

# Remove any old containers:
docker rm -f $container 2>/dev/null

# Create new container that will run our service.
docker create \
 -p 8080:8080 \
 -v /var/lib/myservice/workspace:/var/lib/myservice \
 -v /var/lib/myservice/mongodb:/var/lib/mongodb \
 -v /var/lib/myservice/rabbitmq:/var/lib/rabbitmq/mnesia \
 --name $container $image || exit 1

echo " ============================= "
echo "  MyService is now installed!  "
echo " ============================= "

Two important notes regarding the docker container creation operation:
  • All the directories containing the data to be persisted should be mounted to host directories, otherwise they will be gone once we re-create the container.
  • A forward from host's port to container's port 8080 must be set up, so the service will be accessible from the outside world.
Once the install script is invoked, Docker will pull the relevant Ubuntu image and configure it according to our needs.
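To make the two notes concrete, the sketch below assembles the same `docker create` arguments from explicit lists of port forwards and volume mappings, so it is easy to see that every persisted directory and the Web port are covered. The function name is hypothetical and the image name is a placeholder; the ports, paths and container name are the ones used in this post.

```python
def docker_create_args(image, container, ports, volumes):
    """Build the argv for `docker create` from port and volume mappings."""
    args = ["docker", "create"]
    for host_port, container_port in ports:
        # Forward a host port to a container port.
        args += ["-p", "%d:%d" % (host_port, container_port)]
    for host_dir, container_dir in volumes:
        # Mount a host directory so the data survives container re-creation.
        args += ["-v", "%s:%s" % (host_dir, container_dir)]
    return args + ["--name", container, image]


argv = docker_create_args(
    image="myservice",  # placeholder image name
    container="myservice_service",
    ports=[(8080, 8080)],
    volumes=[
        ("/var/lib/myservice/workspace", "/var/lib/myservice"),
        ("/var/lib/myservice/mongodb", "/var/lib/mongodb"),
        ("/var/lib/myservice/rabbitmq", "/var/lib/rabbitmq/mnesia"),
    ],
)
print(" ".join(argv))
```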

The start script looks much simpler:


if [ "$(id -u)" -ne 0 ]; then
 echo " * This script must be run as root!" >&2
 exit 1
fi
if [ ! -d /var/lib/myservice ] || ! docker ps -a | grep myservice_service >/dev/null; then
 echo " * MyService is not installed! Please run the install script first." >&2
 exit 1
fi

# Start our docker container:
docker start myservice_service >/dev/null || exit 1

echo " ============================================= "
echo "  Started listening on http://localhost:8080/  "
echo " ============================================= "

The stop script is the shortest one:


if [ "$(id -u)" -ne 0 ]; then
 echo " * This script must be run as root!" >&2
 exit 1
fi

# Stop our docker container:
docker stop myservice_service >/dev/null 2>&1

echo " ====================== "
echo "  MyService is stopped  "
echo " ====================== "

And, finally here's the script that gathers everything into a tarball:

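#!/bin/bash -x

# Output directory names (assumed values, inferred from the tarball layout
# and from the "cd .install" step of the install script):
TARGET=myservice-docker
TARGET_INSTALL=$TARGET/.install

rm -rf $TARGET && mkdir -p $TARGET_INSTALL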

# Copy the project files:
cp -aL ../project/* $TARGET_INSTALL/ || exit 1
cp Dockerfile supervisor.conf $TARGET_INSTALL/ || exit 1

# Copy installation instruction and scripts:
cp $TARGET/ || exit 1

rm -f $TARGET.tgz && tar -zcf $TARGET.tgz $TARGET/ || exit 1
rm -rf $TARGET/

So, what's the customer experience after he opens the tarball? He sees the following:

root@localhost:~/Downloads/myservice-docker$ ls

The README file contains very simple instructions: run the install script to install or upgrade, run the start script to start the service, and run the stop script to stop it. The prerequisites are simple as well: disable SELinux and install Docker.

Why does this work? Because once Docker is installed, we can be sure that the Dockerfile instructions will succeed, and as a result there will be an image containing exactly what we need. Thanks to Docker's caching, all subsequent runs of the install script, for instance when upgrading, will be much faster.

Do you see any caveats with this scheme? Something to improve?

Thanks for your attention!

Monday, March 24, 2014

REST service in 5 minutes (using Java)

I remember the days when building something like this in Java required many more moves, and maybe that was the reason why some start-ups chose other, weakly typed languages with all their fancy Web frameworks for rapid bootstrapping. This isn't the case anymore. See how easy it is to create a REST service that supports all CRUD operations in Java:

1. Define your task model:

import java.util.Date;

import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;

/**
 * Task model
 */
@Entity
public class Task {
  @Id
  @GeneratedValue(strategy = GenerationType.AUTO)
  private long id;
  private String text;
  private Date created = new Date();
  private Date completed;

  public String getText() {
    return text;
  }

  public void setText(String text) {
    this.text = text;
  }

  public Date getCreated() {
    return created;
  }

  public void setCreated(Date created) {
    this.created = created;
  }

  public Date getCompleted() {
    return completed;
  }

  public void setCompleted(Date completed) {
    this.completed = completed;
  }
}

2. Tell what operations on tasks you're going to support:

/**
 * This class defines DB operations on Task entity
 */
public interface TaskRepository extends PagingAndSortingRepository<Task, Long> {
  // Magic method name automatically generates needed query
  public List<Task> findByCompletedIsNull();
}

3. Configure your application:

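/**
 * This class is responsible for:
 *  - Setting up DB connection and ORM
 *  - Initializing REST service for all found entities
 *  - Starting Spring application (main entry point)
 */
@Configuration
@EnableJpaRepositories
@EnableTransactionManagement
@EnableAutoConfiguration
public class Application extends RepositoryRestMvcConfiguration {

  @Bean
  public DataSource dataSource() throws PropertyVetoException {
    MySQLDataSource dataSource = new MySQLDataSource();
    // (connection settings are not shown here)
    return dataSource;
  }

  @Bean
  public LocalContainerEntityManagerFactoryBean entityManagerFactory(DataSource dataSource) {
    HibernateJpaVendorAdapter jpaVendorAdapter = new HibernateJpaVendorAdapter();
    // Database tables will be created/updated automatically due to this:
    jpaVendorAdapter.setGenerateDdl(true);

    LocalContainerEntityManagerFactoryBean entityManagerFactoryBean = new LocalContainerEntityManagerFactoryBean();
    entityManagerFactoryBean.setDataSource(dataSource);
    entityManagerFactoryBean.setJpaVendorAdapter(jpaVendorAdapter);
    entityManagerFactoryBean.setPackagesToScan(Application.class.getPackage().getName());
    return entityManagerFactoryBean;
  }

  @Bean
  public PlatformTransactionManager transactionManager() {
    return new JpaTransactionManager();
  }

  public static void main(String[] args) {
    SpringApplication.run(Application.class, args);
  }
}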

That's all! After starting this application, you'll get a complete task REST service for free. Let's test it:

Create a new task:

~$ curl -X POST -H "Content-Type: application/json" -d '{"text":"Implement simplest REST Java application"}' http://localhost:8080/tasks

See the task contents:

~$ curl  http://localhost:8080/tasks/1
{
  "text" : "Implement simplest REST Java application",
  "created" : 1395665199000,
  "completed" : null,
  "_links" : {
    "self" : {
      "href" : "http://localhost:8080/tasks/1"
    }
  }
}

Create another task:

~$ curl -X POST -H "Content-Type: application/json" -d '{"text":"Go home"}' http://localhost:8080/tasks

Find all tasks:

~$ curl  http://localhost:8080/tasks
{
  "_links" : {
    "self" : {
      "href" : "http://localhost:8080/tasks{?page,size,sort}",
      "templated" : true
    },
    "search" : {
      "href" : "http://localhost:8080/tasks/search"
    }
  },
  "_embedded" : {
    "tasks" : [ {
      "text" : "Implement simplest REST Java application",
      "created" : 1395665199000,
      "completed" : null,
      "_links" : {
        "self" : {
          "href" : "http://localhost:8080/tasks/1"
        }
      }
    }, {
      "text" : "Go home",
      "created" : 1395665359000,
      "completed" : null,
      "_links" : {
        "self" : {
          "href" : "http://localhost:8080/tasks/2"
        }
      }
    } ]
  },
  "page" : {
    "size" : 20,
    "totalElements" : 2,
    "totalPages" : 1,
    "number" : 0
  }
}
(notice how easy it is to implement pagination with this REST service!)

Mark the first task as complete:

~$ curl -X PATCH -H "Content-Type: application/json" -d "{\"completed\":$(($(date +%s)*1000))}" http://localhost:8080/tasks/1
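
The same request can be prepared from Python. The sketch below only builds the PATCH request with a millisecond timestamp, mirroring the shell arithmetic in the curl call, and does not send it, so it runs without the service. The function name is made up for this example.

```python
import json
import time
import urllib.request


def completion_request(task_url, now=None):
    """Build (but do not send) a PATCH request that marks a task completed."""
    # Seconds since the epoch, converted to milliseconds as in the curl call.
    millis = int(now if now is not None else time.time()) * 1000
    body = json.dumps({"completed": millis}).encode("utf-8")
    return urllib.request.Request(
        task_url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="PATCH",
    )


req = completion_request("http://localhost:8080/tasks/1", now=1395665400)
print(req.get_method(), req.data.decode("utf-8"))
# With the service running, urllib.request.urlopen(req) would send it.
```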

Find incomplete tasks:

~$ curl  http://localhost:8080/tasks/search/findByCompletedIsNull
{
  "_embedded" : {
    "tasks" : [ {
      "text" : "Go home",
      "created" : 1395665359000,
      "completed" : null,
      "_links" : {
        "self" : {
          "href" : "http://localhost:8080/tasks/2"
        }
      }
    } ]
  }
}

Pretty easy and yet powerful, huh?

For more information, and for instructions on how to compile and run this application, see the source code on GitHub.

Tuesday, February 04, 2014

Parallelization monster framework for Pentaho Kettle

Our team always ends up ROFL when trying to find names for strange-looking ETL process diagrams. This monster has no name yet:

This is a parallelization framework for Pentaho Kettle 4.x. As you probably know, the upcoming version of Kettle (5.0) has a native ability to launch job entries in parallel, but we haven't got there yet.

In order to run a job in parallel, you have to call this abstract monster job, and provide it with 3 parameters:

  • Path to your job (which is supposed to run in parallel).
  • Number of threads (concurrency level).
  • Optional flag that says whether to wait for completion of all jobs or not.
Regarding the number of threads: as you can see, the framework supports up to 8 threads, but it can be easily extended.

How does this stuff work? The "Thread #N" transformations are executed in parallel, each on a copy of all the rows. Each transformation then filters the rows according to the given number of threads, so only the relevant portion of rows is passed to the corresponding job (Job - Thread #N). For example, if the original row set was:

           ["Apple", "Banana", "Orange", "Lemon", "Cucumber"]

and the concurrency level was 2, then the first job (Job - Thread #1) will get the ["Apple", "Banana", "Orange"] and the second job will get the rest: ["Lemon", "Cucumber"]. All the other jobs will get an empty row set.
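
The split in this example can be sketched in a few lines of Python. This assumes contiguous chunks of ceil(rows / threads) rows, which is consistent with the example above; the actual filter inside the Kettle transformations is not shown in this post, so the function below is an illustration and not a port of it.

```python
def partition_rows(rows, threads, max_threads=8):
    """Split rows into contiguous chunks, one per job slot.

    Slots beyond `threads` (up to `max_threads`) get an empty row set.
    """
    if not 1 <= threads <= max_threads:
        raise ValueError("threads must be between 1 and %d" % max_threads)
    chunk = -(-len(rows) // threads)  # ceiling division
    return [rows[i * chunk:(i + 1) * chunk] if i < threads else []
            for i in range(max_threads)]


rows = ["Apple", "Banana", "Orange", "Lemon", "Cucumber"]
parts = partition_rows(rows, threads=2)
for number, part in enumerate(parts, start=1):
    print("Job - Thread #%d:" % number, part)
```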

Finally, there's a flag which tells whether we should wait until all jobs are completed.

I hope someone will find the attached transformations useful. And if not, at least help me find a name for the ETL diagram. Fish, maybe? :)

Sunday, January 26, 2014

"Be careful when using or accessing WiFi connection"

Last week my Skype account was hacked during my weekend holiday in Budapest. I don't know how it happened. I only know that I was logged into Skype from my iPhone, and that I used a lot of the free public WiFi hotspots that are abundant in Budapest. On the last day of my trip I tried to make a call from Skype, and the call ended too quickly, which should not have happened, since I remembered there was a deposit of about 30 bucks on my account. I checked my account and found a lot of calls to Belarus, which I didn't make, of course:

There were tens more entries like this.

The next thing I did was log out from the Skype iPhone app and change my password. Then I contacted Skype support and got a Web chat with a support engineer. I must say, their support reacted immediately to my request, which looked really professional on their side. I chatted for about half an hour from my mobile phone's browser, and in the end I got a refund for all the calls I never made.

The incident is over now (actually, it was over an hour after I realized that my account had been hijacked), but it raises the question: "How is it possible that my account was hacked? Is there some insecure part in the Skype connection from the iPhone app, like sending credentials over a non-encrypted channel?". Unfortunately, I got no answer from the support engineer, except for some funny comments and advice (after the fact, I read the Skype security evaluation, but I didn't find anything that explains this incident either). Below are selected parts from the chat transcript:

Donald M: Michael, we understand that you would like to have your Skype account secured while using the application. 
Donald M: We’d be more than happy to assist you and provide you the best practice to keep you secured.
Donald M: To help you stay secure, we would like to share with you some useful tips and information about online security:
You: Ok, what can I do to keep the account secure?
Donald M: Please visit this link:

I've read everything on that page, but I didn't find anything useful except for choosing a strong enough password (which mine was).

Donald M: We strongly advise that every customer installs sufficient security software, such as an antivirus and a firewall on all their devices that use Skype and to keep them enabled and up to date.
You: Antivirus on iPhone?
Donald M: Skype does its best to keep your communication and personal information secure. 
Donald M: Yes!
Donald M: However, please be aware that Skype users should also take precautions against security threats by not sharing their private data and should install adequate security software on all their devices that use Skype.
You: There's no antivirus software on for iPhone mobile phone

Previously, I explained to the support engineer that I use Skype solely from my mobile phone.

Donald M: Yes and be careful when using or accessing Wifi connection. 

This last sentence simply killed me. What can I do when using public WiFi? Maybe wrap my iPhone into a condom?

Thursday, January 16, 2014

Python code indentation nightmare

After much hesitation, and overcoming my intuitive distaste for Python as a programming language, I finally decided to learn it. This is not just to get familiar with the core coolness of Python and join the Python mainstream, but more to avoid finding myself becoming a Java-mastodon (a-la COBOL-mastodon or FORTRAN-mastodon) in the next 10 years. So, I created a little project, and started to push lines of code into it.

My first experience with Python has not changed anything about my bad attitude towards Python syntax:
  • Code structures simply don't look right without opening and closing curly braces; everything looks like a big unstructured mess.
  • Doc-strings written as a comment in the body(!), where I can write any crap I want, simply don't feel like API documentation for a class/method/function.
  • """, which can be used either as a multi-line comment separator or ... as a multi-line string separator when assigned to a variable. Isn't it weird? One of my university professors used to say: "If you want to confuse a man, either call two different things by the same name, or call two equal things by different names", and I totally agree with him.
  • Two forms: "import thing" and "from thing import another_thing". Why do I need both of them? Why can't I just use: "import thing.another_thing" or "import thing [another_thing]" like in Perl?
  • Indentation...

Indentation is worth another post. I spent about an hour trying to understand why a unit test that I had added to a forked project's test suite didn't run:

class TestSchemes(TestCase):
    def test_find(self):
        work = Scheme.find('wlan0', 'work')
        assert work.options['wpa-ssid'] == 'workwifi'

 # Added my test:
 def test_mytest(self):
  work = Scheme.find('wlan0', 'work')
  work = Scheme.find('wlan0', 'work')
  assert work == None

I tried to run it: my test didn't run, and there wasn't a single error. I thought maybe test methods are registered somewhere (unlikely, but who knows...), but this wasn't the case. Maybe test_mytest is some reserved name in Python? I tried other method names, but with no luck. Finally, I tried one more thing: I copied and pasted one of the existing method declarations using Vim's 'yy' and 'p' shortcuts, then renamed the pasted method, and voila! That worked! Hm... What's the difference between:

    def test_mytest(self):
        work = Scheme.find('wlan0', 'work')
        work = Scheme.find('wlan0', 'work')
        assert work == None

and:


 def test_mytest(self):
  work = Scheme.find('wlan0', 'work')
  work = Scheme.find('wlan0', 'work')
  assert work == None

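Right... this is spaces versus tabs. I always indent my code using tabs, so the method I added was also indented using tabs. As it turns out, the original code was indented using spaces. Python 2 treats a tab as advancing to the next multiple of eight columns, so my tab-indented method lined up with the eight-space body of test_find and silently became a function nested inside it. That's why the new method wasn't recognized as a test until I replaced all tabs with spaces. It's a really, really weird situation when indentation characters have an effect on execution flow. Don't you agree?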
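
A small check like the one below would have saved me that hour. It is only a sketch (the function name is made up): it classifies the leading whitespace of every indented line, so lines whose style differs from the rest of the file stand out.

```python
def indentation_kinds(source):
    """Map each indented line number (1-based) to 'tabs', 'spaces' or 'mixed'."""
    kinds = {}
    for number, line in enumerate(source.splitlines(), start=1):
        indent = line[:len(line) - len(line.lstrip(" \t"))]
        # Skip unindented lines and lines that contain only whitespace.
        if not indent or not line.strip():
            continue
        if set(indent) == {"\t"}:
            kinds[number] = "tabs"
        elif set(indent) == {" "}:
            kinds[number] = "spaces"
        else:
            kinds[number] = "mixed"
    return kinds


source = (
    "class TestSchemes(TestCase):\n"
    "    def test_find(self):\n"
    "        pass\n"
    "\tdef test_mytest(self):\n"
    "\t\tpass\n"
)
kinds = indentation_kinds(source)
tab_lines = sorted(n for n, kind in kinds.items() if kind == "tabs")
print("Lines indented with tabs:", tab_lines)
```

Python 3 is stricter here: it rejects a file that mixes tabs and spaces inconsistently with a TabError instead of silently guessing.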

To wrap up, I'd like to say a few warm words about Python. There are plenty of frameworks and libraries written in Python. GitHub is teeming with lots and lots of interesting and fun projects that you can learn from. If you want to build a quick prototype, it's awesome. Not as awesome as Java + Maven, but still :-) I'm a mastodon...

May the source be with you.