This post is the first in a series on caching, and it covers basic concepts for caching data in Spark applications. Later posts will cover further how-to's, such as caching DataFrames, the internals of Spark's caching implementation, and automatic recommendations for what to cache based on our work with many production Spark applications. For a more general overview of the causes of Spark performance issues, as well as an orientation to our learning to date, refer to our page on Spark Performance Management.
Caching RDDs in Spark
Caching is one mechanism for speeding up applications that access the same RDD multiple times. An RDD that is neither cached nor checkpointed is re-evaluated each time an action is invoked on it. There are two function calls for caching an RDD: cache() and persist(level: StorageLevel). The difference between them is that cache() caches the RDD in memory, whereas persist(level) can cache in memory, on disk, or in off-heap memory, according to the caching strategy specified by level. persist() without an argument is equivalent to cache(). We discuss caching strategies later in this post. Space is freed from storage memory by calling unpersist().
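As a minimal sketch of the three calls (assuming an existing SparkContext named sc and a hypothetical input file):

import org.apache.spark.storage.StorageLevel

val lines = sc.textFile("data.txt")      // hypothetical input path

lines.persist(StorageLevel.MEMORY_ONLY)  // same effect as lines.cache()
lines.count()                            // the first action materializes the cached blocks
lines.count()                            // subsequent actions read the cached blocks
lines.unpersist()                        // frees the blocks from storage memory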
When to use caching
As suggested in this post, caching is recommended in the following situations:
- RDD re-use in iterative machine learning applications
- RDD re-use in standalone Spark applications
- When RDD computation is expensive, caching can help reduce the cost of recovery if an executor fails
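The listing below builds a skewed pair RDD, caches it, and then re-uses it in two actions; the workload parameter values at the top are illustrative.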
import scala.util.Random
import org.apache.spark.{SparkConf, SparkContext}

// Workload parameters (illustrative values)
val numMappers = 100
val numPartitions = 100
val numKVPairs = 10000
val valSize = 1000
val numReducers = 10

val sparkConf = new SparkConf().setAppName("CachingExample")
val sc = new SparkContext(sparkConf)
val ranGen = new Random(2016)

val pairs1 = sc.parallelize(0 until numMappers, numPartitions).flatMap { p =>
  // map output sizes increase linearly from the first partition to the last
  val numKVPairsLocal = (1.0 * (p + 1) / numMappers * numKVPairs).toInt

  val arr1 = new Array[(Int, Array[Byte])](numKVPairsLocal)
  for (i <- 0 until numKVPairsLocal) {
    val byteArr = new Array[Byte](valSize)
    ranGen.nextBytes(byteArr)

    // skew the key distribution: roughly half of all pairs share key 0
    val key = if (ranGen.nextInt(100) > 50) 0 else ranGen.nextInt(numKVPairsLocal)

    arr1(i) = (key, byteArr)
  }
  arr1
}.cache()

pairs1.count()

println("Number of pairs: " + pairs1.count())
println("Count result " + pairs1.groupByKey(numReducers).count())

sc.stop()
Caching example
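The first situation listed above, iterative re-use, is worth a short sketch of its own (hypothetical input and parsing, assuming sc; the point is that the parsed RDD is evaluated once and then re-used in every iteration):

// Without cache(), points.txt would be re-read and re-parsed in every iteration.
val points = sc.textFile("points.txt")
  .map(_.split(",").map(_.toDouble))
  .cache()

for (i <- 1 to 10) {
  // each iteration re-uses the cached blocks instead of re-evaluating the lineage
  val norm = points.map(v => v.map(x => x * x).sum).reduce(_ + _)
  println(s"iteration $i: $norm")
}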
Block eviction
Now let us consider the situation in which some of the cached partitions are so large that they quickly fill up the storage memory used for caching. The partitions generated in the example above are skewed: more key-value pairs are allocated to partitions with higher IDs. These highly skewed partitions are the first candidates to not fit in the cache storage.
When the storage memory becomes full, an eviction policy (Least Recently Used) is applied to free up space for new blocks. This situation is not ideal, as cached partitions may be evicted before actually being re-used. Depending on the caching strategy adopted, evicted blocks may be written to disk. A better approach is to cache only RDDs that are expensive to re-evaluate and modest enough in size to fit in memory entirely. Making such decisions before application execution may be challenging, as it is unclear which RDDs will fit in the cache and which caching strategy (in memory, on disk, off-heap, or a combination of the above) achieves the best performance. Generally, a caching strategy that caches blocks both in memory and on disk is preferred; in this case, cached blocks evicted from memory are written to disk, and reading the data back from disk is relatively fast compared with re-evaluating the RDD [1].
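In code, choosing that strategy is a one-line change; a sketch, assuming an RDD named rdd that has not yet been assigned a storage level:

import org.apache.spark.storage.StorageLevel

// Blocks that do not fit in storage memory are spilled to local disk
// instead of being dropped and re-evaluated later.
rdd.persist(StorageLevel.MEMORY_AND_DISK)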
Under the covers
Internally, caching is performed at the block level: each RDD consists of multiple blocks, and each block is cached independently of the others, on the node that generated that particular block. Each Executor in Spark has an associated BlockManager that is used to cache RDD blocks. The memory available to the BlockManager for caching is controlled by spark.memory.storageFraction, which specifies the fraction of the unified memory pool (itself set by spark.memory.fraction) that is reserved for storage. A summary of memory management in Spark can be found here. The BlockManager manages cached partitions as well as intermediate shuffle outputs. Where blocks are actually stored is specified through the StorageLevel (e.g., persist(level: StorageLevel)). Once the storage level of an RDD has been assigned, it cannot be changed. An RDD block can be cached in memory, on disk, or off-heap, as specified by level.
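The assigned level can be inspected at runtime, and re-assigning it without first calling unpersist() fails; a short sketch (assuming sc):

import org.apache.spark.storage.StorageLevel

val rdd = sc.parallelize(1 to 1000).persist(StorageLevel.MEMORY_ONLY)
println(rdd.getStorageLevel)            // e.g. StorageLevel(memory, deserialized, 1 replicas)

// rdd.persist(StorageLevel.DISK_ONLY)  // throws UnsupportedOperationException here

rdd.unpersist()                         // clears the assigned level...
rdd.persist(StorageLevel.DISK_ONLY)     // ...after which a new one may be set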
Caching strategies (StorageLevel): RDD blocks can be cached in multiple stores (memory, disk, off-heap), in serialized or non-serialized format.
- MEMORY_ONLY: Data is cached in memory only in non-serialized format.
- MEMORY_AND_DISK: Data is cached in memory. If not enough memory is available, blocks evicted from memory are serialized to disk. This mode is recommended when re-evaluation is expensive and memory resources are scarce.
- DISK_ONLY: Data is cached on disk only in serialized format.
- OFF_HEAP: Blocks are cached off-heap, e.g., on Alluxio [2].
- The caching strategies above can also store data in serialized format. Serialization increases processing cost but reduces the memory footprint of large datasets. These variants append the "_SER" suffix to the above schemes, e.g., MEMORY_ONLY_SER, MEMORY_AND_DISK_SER. DISK_ONLY and OFF_HEAP always write data in serialized format.
- Data can also be replicated to another node by appending the "_2" suffix to the StorageLevel, e.g., MEMORY_ONLY_2, MEMORY_AND_DISK_SER_2. Replication is useful for speeding up recovery in case a node of the cluster (or an executor) fails. A short sketch of these variants follows this list.
- A full description of caching strategies can be found here.
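In code, the variants map directly onto StorageLevel constants; a sketch assuming an RDD named rdd (the levels shown are alternatives, since an RDD accepts only one):

import org.apache.spark.storage.StorageLevel

rdd.persist(StorageLevel.MEMORY_ONLY_SER)   // serialized in memory: smaller footprint, more CPU
// StorageLevel.MEMORY_AND_DISK_SER         // serialized in memory, spills to disk under pressure
// StorageLevel.MEMORY_ONLY_2               // each block additionally replicated on a second node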
Summary
- Caching is very useful for applications that re-use an RDD multiple times. Iterative machine learning applications include such RDDs that are re-used in each iteration.
- Caching all of the generated RDDs is not a good strategy as useful cached blocks may be evicted from the cache well before being re-used. For such cases, additional computation time is required to re-evaluate the RDD blocks evicted from the cache.
- Given a large list of RDDs that are used multiple times, deciding which ones to cache may be challenging. When memory is scarce, it is recommended to use the MEMORY_AND_DISK caching strategy, so that blocks evicted from memory are saved to disk. Reading the blocks back from disk is generally faster than re-evaluation. If the extra processing cost can be afforded, MEMORY_AND_DISK_SER can further reduce the memory footprint of the cached RDDs.
- If certain RDDs have a very large evaluation cost, it is recommended to replicate them to another node. This significantly boosts performance in the case of a node failure, since re-evaluation can be skipped.
Further Reading
Those interested in Spark might find these two technical blogs useful in understanding performance issues for the Spark Platform:
Why Your Spark Applications Are Slow or Failing, Part I: Memory Management
Why Your Spark Apps Are Slow or Failing, Part II: Data Skew and Garbage Collection
References:
[1] https://community.databricks.com/t5/data-engineering/should-i-always-cache-my-rdd-s-and-dataframes/td-p/30763
[2] Holden Karau, Andy Konwinski, Patrick Wendell, Matei Zaharia. "Learning Spark: Lightning-Fast Big Data Analysis". O'Reilly Media, 2015.