Best Practices Blog 10 Min Read

Why Your Spark Apps are Slow or Failing:
Part I Memory Management

By: Rishitesh Mishra

Spark applications are easy to write and easy to understand when everything goes according to plan. However, things become very difficult when Spark applications start to slow down or fail. Sometimes a well-tuned application fails because the data or the data layout has changed. Sometimes an application that has been running well starts behaving badly due to resource starvation. The list goes on and on.

It’s not only important to understand a Spark application, but also its underlying runtime components like disk usage, network usage, contention, etc., so that we can make an informed decision when things go bad.

In this series of articles, I aim to capture some of the most common reasons why a Spark application fails or slows down. The first and most common is memory management.

If we were to get all Spark developers to vote, out of memory (OOM) conditions would surely be the number one problem everyone has faced.  This comes as no big surprise as Spark’s architecture is memory-centric. Some of the most common causes of OOM are:

  • Incorrect usage of Spark
  • High concurrency
  • Inefficient queries
  • Incorrect configuration

To avoid these problems, we need to have a basic understanding of Spark and our data. There are certain things that can be done that will either prevent OOM or rectify an application which failed due to OOM.  Spark’s default configuration may or may not be sufficient or accurate for your applications. Sometimes even a well-tuned application may fail due to OOM as the underlying data has changed.

Out of memory issues can be observed for the driver node, executor nodes, and sometimes even for the node manager. Let’s take a look at each case.

Out of memory at the driver level

A driver in Spark is the JVM where the application’s main control flow runs. More often than not, the driver fails with an OutOfMemory error due to incorrect usage of Spark. Spark is an engine for distributing workload among worker machines, and the driver should be treated only as an orchestrator. In typical deployments, a driver is provisioned with less memory than the executors. Hence we should be careful about what we do on the driver.

Common causes which result in driver OOM are:

  1. rdd.collect()
  2. sparkContext.broadcast
  3. Driver memory configured too low for the application’s requirements
  4. Misconfiguration of spark.sql.autoBroadcastJoinThreshold. Spark uses this limit to decide whether to broadcast a relation to all the nodes in a join operation. On first use, the whole relation is materialized at the driver node. Sometimes multiple tables are also broadcast as part of the query execution.

Try to write your application in such a way that you avoid all explicit result collection at the driver. You can usually delegate this work to the executors. For example, if you want to save the results to a particular file, you can either collect them at the driver and write them from there, or have the executors write them for you. The second option keeps the data off the driver.
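As a minimal sketch of that idea, the function below writes a DataFrame straight from the executors. It assumes `df` is an existing PySpark DataFrame; the function name and `output_path` are hypothetical and not from the original application.

```python
def save_without_collect(df, output_path):
    """Write a DataFrame from the executors instead of collecting it first.

    `df` is assumed to be a pyspark.sql.DataFrame; `output_path` is a
    hypothetical destination such as an HDFS directory.
    """
    # Anti-pattern, kept only as a comment: rows = df.collect()
    # pulls every row into the driver JVM and is a common cause of driver OOM.

    # Each executor writes its own partitions directly to storage,
    # so the result set never passes through the driver.
    df.write.mode("overwrite").parquet(output_path)
    return output_path
```

If a small sample is genuinely needed on the driver, a bounded call such as `df.take(20)` is a safer choice than `collect()`.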


If you are using Spark SQL and the driver runs out of memory while broadcasting relations, you can either increase the driver memory, if possible, or reduce the “spark.sql.autoBroadcastJoinThreshold” value so that your join operations use the more memory-friendly sort merge join.
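The two options can be sketched as spark-submit settings. The helper and the values below are hypothetical and only illustrate the shape of the configuration. The property names are real Spark properties, and the threshold is given in bytes (Spark’s default is 10 MB, and -1 disables automatic broadcast joins).

```python
def to_submit_args(conf):
    """Render a dict of Spark properties as spark-submit --conf arguments."""
    args = []
    for key, value in sorted(conf.items()):
        args += ["--conf", f"{key}={value}"]
    return args


# Hypothetical values for illustration only; size them for your own workload.
driver_safe_conf = {
    # Option 1: give the driver more headroom.
    "spark.driver.memory": "4g",
    # Option 2: lower the broadcast limit to 5 MB so larger relations
    # fall back to sort merge join instead of being built on the driver.
    "spark.sql.autoBroadcastJoinThreshold": 5 * 1024 * 1024,
}

print(" ".join(to_submit_args(driver_safe_conf)))
# --conf spark.driver.memory=4g --conf spark.sql.autoBroadcastJoinThreshold=5242880
```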

Out of memory at the executor level

This is a very common issue with Spark applications which may be due to various reasons. Some of the most common reasons are high concurrency, inefficient queries, and incorrect configuration. Let’s look at each in turn.

High concurrency

Before understanding why high concurrency might be a cause of OOM, let’s try to understand how Spark executes a query or job and what are the components that contribute to memory consumption.

Spark jobs or queries are broken down into multiple stages, and each stage is further divided into tasks. The number of tasks depends on various factors like which stage is getting executed, which data source is getting read, etc. If it’s a map stage (Scan phase in SQL), typically the underlying data source partitions are honored.

For example, if a Hive ORC table has 2000 partitions, then 2000 tasks get created for the map stage that reads the table, assuming partition pruning did not come into play. If it’s a reduce stage (Shuffle stage), then Spark uses either the “spark.default.parallelism” setting for RDDs or “spark.sql.shuffle.partitions” for Datasets to determine the number of tasks. How many tasks execute in parallel on each executor depends on the “spark.executor.cores” property. If this property is set to a high value without due consideration for memory, executors may fail with OOM. Now let’s see what happens under the hood while a task is executing, and some probable causes of OOM.
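To make the relationship between task count and concurrency concrete, here is a rough sketch. The function name and the cluster size (50 executors with 4 cores each) are assumptions for illustration, and it assumes each task uses one core, which is Spark’s default.

```python
import math


def task_waves(num_tasks, num_executors, executor_cores):
    """Return (parallel task slots, number of waves) for one stage.

    Assumes one core per task, i.e. spark.task.cpus is left at its default of 1.
    """
    slots = num_executors * executor_cores   # tasks that can run at once
    waves = math.ceil(num_tasks / slots)     # rounds needed to finish the stage
    return slots, waves


# 2000 map tasks (one per partition) on a hypothetical 50 x 4-core cluster.
slots, waves = task_waves(num_tasks=2000, num_executors=50, executor_cores=4)
print(slots, waves)  # 200 10
```

Raising spark.executor.cores reduces the number of waves, but every extra concurrent task shares the same executor heap.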

Let’s say we are executing a map task or the scanning phase of SQL from an HDFS file or a Parquet/ORC table. For HDFS files, each Spark task will read a 128 MB block of data. So if 10 tasks are running in parallel, the memory requirement is at least 128 MB × 10 = 1,280 MB just for holding the partition data. This ignores compression: compressed data may expand significantly once decompressed, depending on the compression algorithm.
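The arithmetic above can be written as a small estimator. This is a back-of-the-envelope sketch, not a model of Spark’s actual allocation. The `expansion` factor is a hypothetical stand-in for how much the data grows after decompression.

```python
def scan_memory_mb(concurrent_tasks, block_mb=128, expansion=1.0):
    """Lower bound on memory needed to hold the blocks being scanned.

    `expansion` is a hypothetical decompression ratio (1.0 = no growth).
    """
    return concurrent_tasks * block_mb * expansion


print(scan_memory_mb(10))                  # 1280.0
print(scan_memory_mb(10, expansion=3.0))   # 3840.0
```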

Spark reads Parquet in a vectorized format. To put it simply, each task of Spark reads data from the Parquet file batch by batch. As Parquet is columnar, these batches are constructed for each of the columns.  It accumulates a certain amount of column data in memory before executing any operation on that column. This means Spark needs some data structures and bookkeeping to store that much data. Also, encoding techniques like dictionary encoding have some state saved in memory. All of them require memory.

 


Figure: Spark task and memory components while scanning a table

So with more concurrency, the overhead increases. Also, if there is a broadcast join involved, then the broadcast variables will also take some memory. The above diagram shows a simple case where each executor is executing two tasks in parallel.

Inefficient queries

While Spark’s Catalyst engine tries to optimize a query as much as possible, it can’t help if the query itself is badly written, for example one that selects all the columns of a Parquet/ORC table. As seen in the previous section, each column needs some in-memory column batch state. The more columns are selected, the greater the overhead.

Try to read as few columns as possible. Try to use filters wherever possible, so that less data is fetched to executors. Some of the data sources support partition pruning. If your query can be converted to use partition column(s), then it will reduce data movement to a large extent.
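A short PySpark-style sketch of this advice follows. It assumes `spark` is an active SparkSession; the function name, path, column names, and predicate are all hypothetical.

```python
def read_pruned(spark, path, columns, predicate):
    """Read only the rows and columns a query actually needs.

    `spark` is assumed to be a pyspark.sql.SparkSession. Filtering and
    selecting right at the scan lets Spark push both down to the reader.
    """
    return (
        spark.read.parquet(path)
        .filter(predicate)      # fewer rows fetched to the executors
        .select(*columns)       # fewer column batches held in memory
    )


# Hypothetical usage; event_date is assumed to be the partition column:
# df = read_pruned(spark, "/data/events", ["user_id", "amount"],
#                  "event_date = '2020-01-01'")
```

When the predicate is on a partition column, as assumed in the usage comment, partition pruning can skip entire directories rather than filtering rows after they are read.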

Incorrect configuration

Incorrect configuration of memory and caching can also cause failures and slowdowns in Spark applications. Let’s look at some examples.

Executor & Driver memory

Each application’s memory requirement is different. Depending on the requirement, each app has to be configured differently. You should ensure correct spark.executor.memory or spark.driver.memory values depending on the workload.  As obvious as it may seem, this is one of the hardest things to get right. We need the help of tools to monitor the actual memory usage of the application. Unravel does this pretty well.

Memory Overhead

Sometimes it’s not the executor memory but the YARN container memory overhead that causes OOM, or causes the container to be killed by YARN. “YARN kill” messages typically look like this:

[Image: example YARN container kill message]

YARN runs each Spark component, such as executors and drivers, inside containers. Overhead memory is the off-heap memory used for JVM overheads, interned strings, and other JVM metadata. In this case, you need to set spark.yarn.executor.memoryOverhead (named spark.executor.memoryOverhead from Spark 2.3 onward) to a proper value. Typically 10% of total executor memory should be allocated for overhead.
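As a rough sketch, the container size that YARN enforces can be estimated as below. The 10% figure is the rule of thumb from the text; the 384 MB floor is Spark’s documented minimum for the default overhead. The function name and the sample executor sizes are assumptions for illustration.

```python
def yarn_container_mb(executor_memory_mb, overhead_fraction=0.10, min_overhead_mb=384):
    """Return (overhead, total container size) in MB for one executor.

    Uses the 10% rule of thumb with Spark's documented 384 MB minimum.
    """
    overhead = max(min_overhead_mb, int(executor_memory_mb * overhead_fraction))
    return overhead, executor_memory_mb + overhead


print(yarn_container_mb(8192))  # (819, 9011)
print(yarn_container_mb(2048))  # (384, 2432)
```

YARN kills the container when its total usage exceeds the second number, even if the JVM heap itself still has room.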

Caching Memory

If your application uses Spark caching to store some datasets, then it’s worthwhile to consider Spark’s memory manager settings. Spark’s memory manager is written in a very generic fashion to cater to all workloads. Hence, there are several knobs to set it correctly for a particular workload.

Spark has defined memory requirements as two types: execution and storage. Storage memory is used for caching purposes and execution memory is acquired for temporary structures like hash tables for aggregation, joins etc.

Both execution and storage memory are obtained from a configurable fraction of (total heap memory – 300 MB). That setting is “spark.memory.fraction”, and its default is 60%. Of that amount, 50% by default (configurable through “spark.memory.storageFraction”) is assigned to storage and the rest is assigned to execution.
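The split described above works out as follows for a given heap size. This is a sketch of the arithmetic only; the function name and the 4096 MB heap are illustrative assumptions, and results are rounded to one decimal place.

```python
RESERVED_MB = 300  # fixed amount set aside before the fraction is applied


def unified_memory_mb(heap_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Return (unified pool, storage share, execution share) in MB.

    memory_fraction  -> spark.memory.fraction (default 0.6)
    storage_fraction -> spark.memory.storageFraction (default 0.5)
    """
    usable = (heap_mb - RESERVED_MB) * memory_fraction
    storage = usable * storage_fraction
    execution = usable - storage
    return round(usable, 1), round(storage, 1), round(execution, 1)


# A hypothetical executor with a 4096 MB heap and default settings.
print(unified_memory_mb(4096))  # (2277.6, 1138.8, 1138.8)
```

With these defaults, a 4 GB heap leaves only about 1.1 GB each for storage and execution before any borrowing between the two pools.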

There are situations where each of the above pools of memory, namely execution and storage, may borrow from each other if the other pool is free. Also, storage memory can be evicted to a limit if it has borrowed memory from execution. However, without going into those complexities, we can configure our program such that our cached data which fits in storage memory should not cause a problem for execution.

If we don’t want all our cached data to sit in memory, then we can configure “spark.memory.storageFraction” to a lower value so that extra data would get evicted and execution would not face memory pressure.

Out of memory at Node Manager

Spark applications that shuffle data as part of group-by or join-like operations incur significant overhead. Normally, the data shuffling process is handled by the executor process. If the executor is busy or under heavy GC load, it can’t serve the shuffle requests. This problem is alleviated to some extent by using an external shuffle service.

The external shuffle service runs on each worker node and handles shuffle requests from executors. Executors can read shuffle files from this service rather than from each other. This lets requesting executors read shuffle files even if the producing executors are killed or slow. Also, when dynamic allocation is enabled, it’s mandatory to enable the external shuffle service.

When the Spark external shuffle service is configured with YARN, NodeManager starts an auxiliary service which acts as the external shuffle service provider. By default, NodeManager memory is around 1 GB. However, applications which do heavy data shuffling might fail due to NodeManager going out of memory. It’s imperative to properly configure your NodeManager if your applications fall into the above category.

End of Part I – Thanks for the Memory

Spark’s in-memory processing is a key part of its power. Therefore, effective memory management is a critical factor in getting the best performance, scalability, and stability from your Spark applications and data pipelines. However, Spark’s default settings are often insufficient. Depending on the application and environment, certain key configuration parameters must be set correctly to meet your performance goals. Having a basic idea of what they are and how they affect the overall application helps.

I have provided some insights into what to look for when considering Spark memory management. This is an area that the Unravel platform understands and optimizes very well, with little, if any, human intervention needed. I recommend that you schedule a demo to see Unravel in action. The performance speedups we are seeing for Spark apps are pretty significant.

In subsequent blogs, I will be discussing other key issues that impact Spark performance including data skew, parallelism and partitions, common misconfigurations, and more.