Airflow + PySpark over Livy. Sessions and batches.


Recently at work we've been trying to find a way to run Spark jobs from Airflow that best suits our needs. Turns out, there's more than one way to skin a cat.

To sum up our findings, you can choose between the following options:

1. Invoking EMR Steps API (EMR only)

 As you can probably tell, this list is going to go in order of my personal preference, starting with the least preferred.

EMR is Amazon's take on a big data platform, running on vanilla EC2 instances and including all the major tools you would expect to see in a big data cruncher: Hadoop, Hive, and Spark among others.
If you're dead set on using Amazon infra for life and never expect to migrate to other ways of running a Spark cluster, then this option is for you.

How to run jobs:
The 'Providers' section of Airflow has been sporting a collection of EMR Steps operators and sensors for quite a while now; the sketch below should get you started.
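A minimal sketch of the idea, assuming an already-running cluster. The cluster id, jar path and class name are placeholders, and the exact import paths depend on your apache-airflow-providers-amazon version:

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import EmrAddStepsOperator
from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor

# A Spark job packaged as an EMR step, launched through command-runner.jar.
SPARK_STEPS = [
    {
        "Name": "my_spark_step",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--class", "com.example.MyJob",
                "s3://my-bucket/jobs/my-job.jar",
            ],
        },
    }
]

with DAG("emr_steps_example", start_date=datetime(2024, 1, 1),
         schedule=None, catchup=False) as dag:
    add_step = EmrAddStepsOperator(
        task_id="add_step",
        job_flow_id="j-XXXXXXXXXXXXX",  # id of an existing EMR cluster
        steps=SPARK_STEPS,
        aws_conn_id="aws_default",
    )

    # The operator pushes the list of step ids to XCom; wait for the first one.
    watch_step = EmrStepSensor(
        task_id="watch_step",
        job_flow_id="j-XXXXXXXXXXXXX",
        step_id="{{ task_instance.xcom_pull(task_ids='add_step')[0] }}",
        aws_conn_id="aws_default",
    )

    add_step >> watch_step
```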

Pros:
  • You can not only execute jobs, but even manipulate EMR clusters. Ability to bring up and terminate clusters on-demand is a great trick to have up your sleeve.
Cons (things I consider deal-breakers are in italic):
  • Ties you to a specific cloud-based solution that can be discontinued any time. I don't think it's likely, but it's always better to have options.
  • As far as I know, you can only submit a JAR file to be executed as a step. Of course, if Java is your cup of coffee (😜 I'll show myself out) then it's not an issue at all.
  • It's currently not possible to run Steps concurrently.

2. Setting up Spark Master context on the same node as Airflow Scheduler

 This mode of operation expects you to have a spark-submit binary and YARN client config set up on the same machine where Airflow runs. It cooks up a correct spark-submit command line with all required arguments and executes it. Since the job is submitted in client mode, you can stream stdout and stderr logs directly to Airflow.

 In a nutshell, if your Spark cluster runs on EMR, this requires a heap of failure-prone operations: copying a folder with configuration files over from the Spark master node to the Airflow node, installing Spark on the Airflow node, then whitelisting certain ports so that the Airflow and Spark master nodes can communicate.

 I did not investigate this option in detail as it did not seem to offer much advantage over the others. One note: when I was trying this method out, I noticed that restarting the EMR cluster invalidates the context and you have to copy it over again, because the context depends on the IP addresses of the Spark nodes.

How to run jobs:
There are two options to choose from:
  1. SparkSubmitOperator in the Airflow providers section (a sketch follows after this list). Additionally, you can read some guidelines on using it from this guy on Medium.
  2. This custom plugin - it doesn't appear to be actively maintained though: at the time of writing the last commit in that repo took place 2 years ago.
Also, here's an article if EMR is where you're planning to set this up.
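If you go the SparkSubmitOperator route, a minimal sketch might look like this. The connection id, file path and arguments are placeholders; the "spark_default" connection has to point at your YARN master:

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

with DAG("spark_submit_example", start_date=datetime(2024, 1, 1),
         schedule=None, catchup=False) as dag:
    submit_job = SparkSubmitOperator(
        task_id="submit_job",
        conn_id="spark_default",                  # Spark connection set up in Airflow
        application="/path/to/my_job.py",         # lives on the Airflow node
        application_args=["--date", "{{ ds }}"],
        name="my_job_{{ ds }}",
        verbose=True,
    )
```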

Pros:
  • Streams spark-submit logs directly to Airflow.
  • Full choice of every language Spark has to offer: Scala, Java, Python, R.
Cons (things I consider deal-breakers are in italic):
  • Each IP address change (e.g. redeployment of the EMR cluster) necessitates regenerating the context on the Airflow node that executes spark-submit. This is cumbersome and failure-prone.
  • The generated spark-submit command is a really long string and is therefore hard to read.

3. SSHOperator

 With this option, we're connecting to the Spark master node via SSH, then invoking spark-submit on the remote server to run a pre-compiled fat jar/Python file/R file (not sure about that last one) from HDFS, S3 or the local filesystem.

How to run jobs:
I only managed to find source code for SSHOperator in the 'Providers' section of Airflow. It should be relatively simple to use, since it's just a buffed-up BashOperator; a sketch follows below.
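A sketch of how that might look. The SSH connection id, jar path and class name are placeholders:

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.ssh.operators.ssh import SSHOperator

with DAG("ssh_spark_submit_example", start_date=datetime(2024, 1, 1),
         schedule=None, catchup=False) as dag:
    submit_job = SSHOperator(
        task_id="submit_job",
        ssh_conn_id="spark_master_ssh",  # SSH connection to the Spark master node
        command=(
            "spark-submit --master yarn --deploy-mode client "
            "--class com.example.MyJob "
            "s3://my-bucket/jobs/my-job.jar --date {{ ds }}"
        ),
    )
```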

Pros:
  • Streams Spark job logs directly to Airflow console.
  • Airflow and Spark don't have to coexist on the same node.
  • You don't need to copy any files over from Spark master node as was required in the previous option.
  • Write code in all Spark-native languages: Scala, Java, Python, and probably R.
Cons (things I consider deal-breakers are in italic):
  • The generated spark-submit command is a really long string and is therefore hard to read.
  • It's not clear what happens if the SSH connection is lost mid-run. Technically, that should abort whatever was running in that session (which, in our case, means the job).

4. Apache Livy

 Livy is a REST service for Spark clusters. This is a perfect choice if you want to decouple your code from deployment configuration: the Livy server runs on the master node, listens for incoming REST calls and manages job execution.

Docs: Livy REST API

 Apache Livy actually gives you not one but two distinct options, as it provides two modes of submitting jobs to Spark: sessions and batches.

Since submitting jobs over Livy is the option I've explored the most, I've come up with two solutions to ease and speed up discovery and development: a Docker Compose Spark cluster for local experiments and a set of Airflow operators, both of which are referenced below.

a) Sessions

You create a session via the REST API, then submit lines of code (aka statements), also through the REST API. One statement is one or more lines of code, and a single session can execute any number of statements.

It's important to keep in mind that a Livy session ≠ a Spark session: for instance, it's not possible to share DataFrames through global temporary views in Livy sessions, even though it's called a session!

How to run jobs: REST client
You can use my Docker Compose Spark cluster to quickly run these light jobs and get a feel for how it all works, without having to bring up a full-fledged cluster that costs money 😉.

Creating a session (specifying "kind": "spark|pyspark|sparkR|sql" is no longer mandatory, see the REST API docs) is a single POST request; the response contains the session id.
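A sketch with the requests library; the Livy URL is a placeholder (Livy listens on port 8998 by default):

```python
import requests

LIVY_URL = "http://localhost:8998"

# Create a PySpark session.
session = requests.post(f"{LIVY_URL}/sessions", json={"kind": "pyspark"}).json()
print(session["id"], session["state"])  # e.g. 0 starting
```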

Now we can submit code through statements, in any of the available languages! And finally, we can check the state of all statements.
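Continuing the sketch above (session_id comes from the previous response, and the submitted code is just an example):

```python
session_id = session["id"]

# Submit a statement: a snippet of PySpark code in this case.
statement = requests.post(
    f"{LIVY_URL}/sessions/{session_id}/statements",
    json={"code": "spark.range(100).count()"},
).json()
print(statement["id"], statement["state"])

# Check the state of all statements in the session.
statements = requests.get(f"{LIVY_URL}/sessions/{session_id}/statements").json()
for s in statements["statements"]:
    print(s["id"], s["state"], s.get("output"))
```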


How to run jobs: Airflow
To apply this to Airflow, we'll make the DAG perform the same kinds of POST/GET requests using HttpOperator and HttpSensor. You'll find more sample DAGs in the git repository.
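Here's a rough sketch of such a DAG. It assumes a reasonably recent apache-airflow-providers-http package and an Airflow HTTP connection named "livy" that points at the Livy server; these are not the exact DAGs from the repository:

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.http.sensors.http import HttpSensor

with DAG("livy_session_example", start_date=datetime(2024, 1, 1),
         schedule=None, catchup=False) as dag:
    create_session = SimpleHttpOperator(
        task_id="create_session",
        http_conn_id="livy",
        endpoint="sessions",
        method="POST",
        data='{"kind": "pyspark"}',
        headers={"Content-Type": "application/json"},
        # Push the new session id to XCom for the downstream tasks.
        response_filter=lambda response: response.json()["id"],
    )

    wait_for_session = HttpSensor(
        task_id="wait_for_session",
        http_conn_id="livy",
        endpoint="sessions/{{ ti.xcom_pull(task_ids='create_session') }}/state",
        response_check=lambda response: response.json()["state"] == "idle",
        poke_interval=10,
    )

    submit_statement = SimpleHttpOperator(
        task_id="submit_statement",
        http_conn_id="livy",
        endpoint="sessions/{{ ti.xcom_pull(task_ids='create_session') }}/statements",
        method="POST",
        data='{"code": "spark.range(100).count()"}',
        headers={"Content-Type": "application/json"},
    )

    # A real check would also inspect each statement's output status.
    wait_for_statement = HttpSensor(
        task_id="wait_for_statement",
        http_conn_id="livy",
        endpoint="sessions/{{ ti.xcom_pull(task_ids='create_session') }}/statements",
        response_check=lambda response: all(
            s["state"] == "available" for s in response.json()["statements"]
        ),
        poke_interval=10,
    )

    create_session >> wait_for_session >> submit_statement >> wait_for_statement
```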
Pros:
  • Job languages: Scala, Python, R and SQL. This is a rich choice compared to options where only a JAR (i.e. Java) can be submitted.
  • Very detailed information about each statement, including exceptions, of course: a statement shows as failed if an exception was thrown.
  • Easy to recreate the exact job submitted by Airflow - all you need is any REST client.
  • Rendered statement (code) templates are visible in the web UI.

Cons (things I consider deal-breakers are in italic):
  • Have to escape everything twice.
  • Have to be mindful of quotes in the session file (e.g. passing a " character into a string literal enclosed in double quotes will look like """ in the rendered template, whereas passing it into a literal enclosed in single quotes renders fine).
  • Can’t run session code separately to debug it
  • The params field is not Jinja-templatable, so all of the "magic" is pushed down into the Spark job code. This looks ok for small jobs and downright ugly once you get past 50 lines of code.
  • Directly follows from the previous point: your Spark jobs cannot be executed outside Airflow because they include Jinja templates:
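A hypothetical, simplified illustration (the bucket and variable names are made up; the real thing also picks up a second layer of escaping because the code is shipped to Livy inside a JSON payload):

```python
# Jinja expressions like {{ ds }} are rendered by Airflow before the code is
# shipped to Livy, so this file is not runnable on its own.
# "spark" is predefined inside a Livy session.
df = spark.read.json("s3://{{ var.value.events_bucket }}/raw/{{ ds }}")
df.createOrReplaceTempView("events")
result = spark.sql(
    "SELECT country, count(*) AS cnt "
    "FROM events WHERE event_date = \"{{ ds }}\" "
    "GROUP BY country"
)
result.write.mode("overwrite").parquet("s3://{{ var.value.events_bucket }}/agg/{{ ds }}")
```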

    Yes, this is valid Jinja-templated code, with double-escaping and whatnot.

  • Livy Sessions are NOT Spark sessions, even though the name implies it.

b) Batches

A batch is a way to execute long-running Spark code. With batches, you submit a path to a file with Spark code via the REST API; that file is picked up by Spark and executed. One batch = one Spark job.

How to run jobs: REST client

http://gethue.com/how-to-use-the-livy-spark-rest-job-server-api-for-submitting-batch-jar-python-and-streaming-spark-jobs/
https://stackoverflow.com/questions/51566029/airflow-http-callback-sensor

You build your job as a fat jar (Java or Scala, no need to build anything for Python or R), save it somewhere your cluster can access it (HDFS, S3, local filesystem) and submit it like this:
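For example, with the requests library (the file path and arguments are made up; LIVY_URL is the same placeholder as before):

```python
import requests

LIVY_URL = "http://localhost:8998"

batch = requests.post(
    f"{LIVY_URL}/batches",
    json={
        "file": "s3://my-bucket/jobs/my_job.py",  # or a fat jar plus a "className"
        "args": ["--date", "2020-03-01"],
    },
).json()
print(batch)  # note the "id" field: that's the batch id
```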
The command above gives you the ID of the newly submitted batch.

You can poll its state through another endpoint, or pull the full logs rather than just the status.
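Continuing the sketch above, both calls might look like this:

```python
batch_id = batch["id"]

# Just the state, e.g. starting / running / success / dead.
print(requests.get(f"{LIVY_URL}/batches/{batch_id}/state").json())

# The log lines collected so far, not just the status.
print(requests.get(f"{LIVY_URL}/batches/{batch_id}/log").json())
```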




Internally, a Livy batch essentially boils down to Livy invoking spark-submit on the cluster with your file and arguments.

How to run jobs: Airflow
Same approach as we applied with sessions: codify the REST API calls, as if they were being manually executed.
Pros:
  • Same choice of languages as above + Java.
Cons (things I consider deal-breakers are in italic):
  • When Spark jobs are submitted in cluster mode, they are always marked as finished even if an exception gets thrown during execution. I noticed this on Amazon EMR 5.26.0. Alas, I could not find a way to propagate exceptions to Livy: the only way to understand that something went wrong is to parse the logs for the word "Exception", and that is unreliable.

c) Batches + Spark/YARN REST API

We were not satisfied with the two approaches above: Livy Batches (when executed in Spark's cluster mode) always show up as "complete" even if they actually failed, and Livy Sessions result in heavily modified Spark jobs that you can't execute without feeding them to the Jinja engine first.

Then I learned about a cool feature in Spark that made this perfect hybrid approach possible: the Spark REST API. It was buried in the middle of a page with the nondescript name "Monitoring and instrumentation", so it took me a while to find it. That gave me an idea, and eventually I found one more thing: the YARN Resource Manager also runs a REST API.

It was only a matter of trying those out, seeing that both APIs always show the actual status of a Spark job, and adding this to the Batch Operator.

How to run jobs: REST client
Follow the same steps as for the batches.
Take note of the appId field when doing GET /batches/${BATCH_ID}
It may look something like this: "application_1583256958670_0001"
Now, access the app status via Spark REST API:
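For example (the host and port are assumptions: 4040 serves the live application UI, 18080 the Spark history server):

```python
import requests

app_id = "application_1583256958670_0001"
jobs = requests.get(
    f"http://spark-history-server:18080/api/v1/applications/{app_id}/jobs"
).json()
print(all(job["status"] == "SUCCEEDED" for job in jobs))
```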
This endpoint returns a list of constituent jobs for our app. If all of them have "status": "SUCCEEDED" in them, that means the job completed successfully.


Same for the YARN REST API (we're looking at the $.app.finalStatus field: anything but "SUCCEEDED" is bad):
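For example, continuing from the snippet above (the ResourceManager host is a placeholder; it usually listens on port 8088):

```python
app = requests.get(
    f"http://yarn-resource-manager:8088/ws/v1/cluster/apps/{app_id}"
).json()
print(app["app"]["finalStatus"])  # anything but "SUCCEEDED" means trouble
```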



How to run jobs: Airflow
Same as LivyBatchOperator above, but you pass an additional parameter:

```python
verify_in="spark"
# or
verify_in="yarn"
```

Pros:
  • Combines the best of both worlds: the arguments you pass to the Spark code are templatable, which means you get jobs that can be executed outside of Airflow, and you can see exactly what went wrong if the job failed.
Cons:
  • None that I know of!
