memoryOverhead issue in Spark

When using Spark and Hadoop for Big Data applications, you may find yourself asking how to deal with this error, which usually ends up killing your job: Container killed by YARN for exceeding memory limits. 16.9 GB of 16 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

When I was trying to extract deep-learning features from 15T images, I ran into memory limitations that got executors killed by YARN; even though the job would run for a day, it would eventually fail. The dataset had 200k partitions and our cluster ran Spark 1.6.2. In general, I had this figure in mind:

Fig 1: Container memory layout

The first thing to do is to boost ‘spark.yarn.executor.memoryOverhead’, which I set to 4096. To find the maximum value, I kept increasing it to the next power of 2 until the cluster refused to accept the job. However, this didn’t resolve the issue.
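To make the numbers in the error message concrete, here is a minimal sketch of how the container request is sized. It assumes the Spark 1.6 default for the overhead (10% of the executor memory, with a 384 MB floor) when none is set; the function name is made up for illustration, and YARN may round the request up further, so treat the result as approximate.

```python
def container_request_mb(executor_memory_mb, memory_overhead_mb=None):
    """Approximate memory requested from YARN per executor container, in MB."""
    if memory_overhead_mb is None:
        # Assumed Spark 1.6 default: 10% of executor memory, at least 384 MB.
        memory_overhead_mb = max(384, int(0.10 * executor_memory_mb))
    return executor_memory_mb + memory_overhead_mb


# A 12 GB heap plus a 4096 MB overhead asks for a 16 GB container.
print(container_request_mb(12 * 1024, 4096))
# Without an explicit overhead, the assumed default adds roughly 1.2 GB instead.
print(container_request_mb(12 * 1024))
```

The container limit quoted by YARN is the sum of both parts, so boosting the overhead raises that limit without touching the heap.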

The second thing to take into account is whether your data is balanced across the partitions. The RDD is distributed across your cluster, and every slice of it is called a partition. So, for a given dataset, the more partitions you have, the smaller each one is. If I have 200k images and 4 partitions, then ideally each partition holds 50k (= 200k/4) images. With 8 partitions, I would want 25k images per partition.

You usually want your data to be balanced for performance reasons: as with every distributed/parallel computing job, you want all your nodes/threads to have the same amount of work. If, for example, you had 4 partitions, with the first 3 holding 20k images each and the 4th holding 180k images, then the first three will (likely) finish much earlier than the 4th, which has to process 9 times as many images. The whole job has to wait for that 4th chunk of data, so it will be much slower than if the data were balanced across the partitions.
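The example above can be turned into a quick back-of-the-envelope calculation. This is only a sketch under simplifying assumptions that are mine: every image costs the same time to process, and each partition gets its own worker, so the job lasts as long as its largest partition.

```python
def wall_time(partition_sizes, seconds_per_item=1.0):
    """Time until the slowest partition finishes, with one worker per partition."""
    return max(partition_sizes) * seconds_per_item


skewed = [20000, 20000, 20000, 180000]
# The same images spread evenly: 60000 per partition.
balanced = [sum(skewed) // len(skewed)] * len(skewed)

slowdown = wall_time(skewed) / wall_time(balanced)
print(slowdown)  # 3.0
```

Under these assumptions the skewed layout takes three times as long as the balanced one, even though the total amount of work is identical.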

However, while balance matters most for performance, imbalance can also result in an error. More data means more memory, which may produce spikes that go out of the memory bounds and trigger YARN to kill the container. Think about it like this (taken from slides):

Fig 2: Memory profiling and memoryOverhead

The solution to this is to use repartition(), which promises to balance the data across partitions. In practice, though, things are not that simple, especially with Python, as discussed in the Stack Overflow question "How to balance my data across the partitions?", where both Spark 1.6.2 and Spark 2.0.0 fail to balance the data. However, Scala seems to do the trick.
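Since repartition() may not deliver what it promises, it helps to measure the result. Below is a minimal sketch of a per-partition counter that could be passed to mapPartitions(); the helper names and the `rdd` and `n` variables mentioned in the comment are assumptions for illustration, and plain Python lists stand in for partitions so the snippet runs without a cluster.

```python
def count_partition(iterator):
    """Yield the number of records in one partition (usable with mapPartitions)."""
    yield sum(1 for _ in iterator)


def summarize(counts):
    """Return (smallest, largest, mean) number of records per partition."""
    return min(counts), max(counts), sum(counts) / float(len(counts))


# With a live RDD (called `rdd` here, an assumption), the counts would come from:
#   counts = rdd.repartition(n).mapPartitions(count_partition).collect()
# Plain lists stand in for the partitions below.
fake_partitions = [[1, 2, 3], [4], [5, 6]]
counts = [next(count_partition(iter(p))) for p in fake_partitions]
print(summarize(counts))  # (1, 3, 2.0)
```

A largest value far above the mean is the sign that one task will need much more memory than the rest.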

As also discussed in the SO question above, upgrading to Spark 2.0.0 might resolve errors like this:

16/08/08 20:56:00 ERROR YarnClusterScheduler: Lost executor 68 on node.of.cluster.com: Container killed by YARN for exceeding memory limits. 12.4 GB of 12 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

Another important factor is the number of cores; decreasing it means fewer tasks are held in memory at one time than with the maximum number of cores. So fewer concurrent tasks, less overhead space. Notice that here we sacrifice performance and CPU efficiency for reliability, which makes a lot of sense when your job keeps failing!

Another approach would be to schedule the garbage collector to kick in more frequently than the default, which costs an estimated ~15% slowdown but frees unused memory more often. Moreover, you can try having Spark exploit some kind of structure in your data, by passing the flag `--class sortByKeyDF`.

In addition, the number of partitions is critical for your application. As mentioned before, the more partitions there are, the less data each one holds. But what’s the trade-off? Apart from the fact that your partitions might become too tiny (if there are too many for your dataset), a large number of partitions means a large number of output files (yes, the number of partitions equals the number of part-xxxxx files you get in the output directory). Small output files are OK in themselves, but the metadata HDFS has to keep for each of them puts pressure on HDFS and degrades its performance. So finding a sweet spot for the number of partitions is important, usually something related to the number of executors and the number of cores, such as 3 times their product, like this:

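# Total cores available to the job: number of executors times cores per executor.
total_cores = int(sc._conf.get('spark.executor.instances')) * int(sc._conf.get('spark.executor.cores'))
# Ask for at least 3 partitions per core when reading the input.
d = sc.textFile('/projects/flickr/clayton/yfcc100m_8k_codes', 3 * total_cores)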

Going back to Figure 1, decreasing the value of ‘spark.executor.memory’ will help if you are using Python. ‘spark.executor.memory’ covers the JVM heap only, while the Python processes live entirely off-heap and do not use the RAM reserved for the heap. So, by decreasing this value, you reserve less space for the heap and leave more for off-heap operations, which is where Python runs. If you set ‘spark.executor.memory’ to its maximum, you probably asked for far more heap than you needed, when more of the physical RAM should have gone to off-heap.
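The trade-off can be sketched numerically. The snippet below assumes the cluster caps the size of a single container (on YARN this cap usually comes from the scheduler's maximum allocation setting); the 16 GB ceiling and the function name are hypothetical values chosen for illustration.

```python
def off_heap_room_mb(container_ceiling_mb, executor_memory_mb):
    """Memory left for off-heap use (Python workers included) under a fixed ceiling."""
    if executor_memory_mb >= container_ceiling_mb:
        raise ValueError("the heap alone already fills the container")
    return container_ceiling_mb - executor_memory_mb


ceiling = 16 * 1024  # hypothetical largest container the cluster will grant
for heap_gb in (14, 12, 8):
    # Prints the heap size in GB and the off-heap room in MB.
    print(heap_gb, off_heap_room_mb(ceiling, heap_gb * 1024))
```

Every gigabyte taken from the heap becomes a gigabyte that the memoryOverhead setting can claim for the Python side.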

Eventually, what worked for me was:

  1. Set ‘spark.yarn.executor.memoryOverhead’ to its maximum (4096 in my case)
  2. Repartition the RDD to its initial number of partitions. (200k in my case)
  3. Set ‘spark.executor.cores’ to 4, from 8.
  4. Set ‘spark.executor.memory’ to 12G, from 8G.
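As a rough illustration of how steps 1, 3 and 4 could be passed on the command line, here is a small sketch that assembles a spark-submit invocation as a list of arguments. The script name `my_job.py` and the helper function are hypothetical, and the YARN cluster deploy mode is an assumption; step 2 is not a flag, it happens inside the job itself through a call to repartition().

```python
def spark_submit_args(script, overhead_mb, executor_cores, executor_memory):
    """Build a spark-submit command line as a list of arguments."""
    return [
        "spark-submit",
        "--master", "yarn",
        "--deploy-mode", "cluster",  # assumed deploy mode
        "--conf", "spark.yarn.executor.memoryOverhead={}".format(overhead_mb),
        "--executor-cores", str(executor_cores),
        "--executor-memory", executor_memory,
        script,
    ]


# `my_job.py` is a hypothetical script name; the values are the ones listed above.
args = spark_submit_args("my_job.py", 4096, 4, "12G")
print(" ".join(args))
```

Keeping the settings in one place like this makes it easier to change a single value between runs and see which one made the difference.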

The reason adjusting the heap helped is that you are running PySpark, which starts both a Python process and a Java process. The Java process uses heap memory; the Python process uses off-heap memory. Normally you can look at the data in the Spark UI to get an approximation of what your tasks are using for execution memory on the JVM. There isn’t a good way to see Python memory. Depending on what you are doing, one or the other may use more memory. Basically, you took memory away from the Java process to give to the Python process, and that seems to have worked for you.

The number of cores you configure (4 vs 8) affects the number of concurrent tasks you can run. With 4 cores you can run 4 tasks in parallel, which affects the amount of execution memory being used, because the Spark executor memory is shared between these tasks. So with a 12 GB heap running 8 tasks, each task gets about 1.5 GB; with a 12 GB heap running 4 tasks, each gets 3 GB. This is obviously just a rough approximation.
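The rough per-task figures above follow from a simple division, sketched here with a made-up helper name. It assumes the heap is split evenly between concurrent tasks, which real executors do not guarantee.

```python
def heap_per_task_gb(executor_memory_gb, executor_cores):
    """Even share of the executor heap per concurrent task, in GB."""
    return executor_memory_gb / float(executor_cores)


print(heap_per_task_gb(12, 8))  # 1.5
print(heap_per_task_gb(12, 4))  # 3.0
```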

Balancing the data across partitions is always a good thing to do, both for performance and for avoiding spikes in the memory trace, which, once they exceed the memoryOverhead, get your container killed by YARN.

Have questions? Comments? Did you find a bug? Let me know!😀
Page created by G. (George) Samaras (DIT)