参考资料:https://zhuanlan.zhihu.com/p/115888408 (真的很不错的文章,受益匪浅)
前言
Executor 内存管理方面讲解的很多,今天第一次细细来学习一下
内存管理相关我觉得理解可以让我们更对于广播变量以及缓存包括shuffle有些更好的控制和理解
spark在yarn模式下运行的时候,其申请的资源是以container的形式存在的。最大资源申请受yarn.scheduler.maximum-allocation-mb这个值决定。
内存组成和分配
内存分为堆内内存和堆外内存。内存分配的总览图如下图所示(借用的图)
image.png
堆内内存
堆内内存onheap由spark.executor.memory指定,堆内内存executorMemory是spark使用的主要部分,其大小通过-Xmx参数传给jvm,内部有300M的保留资源不被executor使用。
300M的源代码以及相关解释如下
// Set aside a fixed amount of memory for non-storage, non-execution purposes.
// This serves a function similar to `spark.memory.fraction`, but guarantees that we reserve
// sufficient memory for the system even for small heaps. E.g. if we have a 1GB JVM, then
// the memory used for execution and storage will be (1024 - 300) * 0.6 = 434MB by default.
private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024
此外的可用内存usableMemory被分为spark管理的内存和用户管理的内存两部分,spark管理的内存通过spark.memory.fraction进行控制,默认0.6。这块内存在spark中被称为unified region(代号M)或统一内存或可用内存,其进一步被分为执行内存ExecutionMemory和StorageMemory,见上图。其中storage memory(代号R)是M的一个subregion,其的大小占比受spark.memory.storageFraction控制,默认为0.5,即默认占usableMemory的 0.6*0.5=0.3。我们用onHeapStorageRegionSize来表示storage这部分的大小
源码见下面 UnifiedMemoryManager
def apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager = {
val maxMemory = getMaxMemory(conf)
new UnifiedMemoryManager(
conf,
maxHeapMemory = maxMemory,
onHeapStorageRegionSize =
(maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong,
numCores = numCores)
}
1、ExecutionMemory执行内存:主要存储Shuffle、Join、Sort、Aggregation等计算过程中的临时数据;
2、StorageMemory存储内存:主要存储spark的cache数据,如RDD.cache RDD.persist在调用时的数据存储,用户自定义变量及系统的广播变量等
这两块内存在当前默认的UnifiedMemoryManager(Spark1.6引入)下是可以互相动态侵占的,即Execution内存不足时可以占用Storage的内存,反之亦然,其详细规则如下:
1、Execution内存不足且onHeapStorageRegionSize有空闲时,可以向Storage Memory借用内存,但借用后storage不能将execution占用的部分驱逐evict出去,只能等着Execution自己释放。
2、Storage内存不足时可以借用Execution的内存,且当Execution又有内存资源需求时可以驱逐Storage占用的部分,但只能驱逐StorageMemory-onHeapStorageRegionSize的大小,原来划定的onHeapStorageRegionSize且在使用的不可被抢占。
这里我做了下一个小小的测试,执行了一个测试样例,提交参数如下所示
/opt/beh/core/spark/bin/spark-submit --master yarn --class com.example.sparklearn.test.Demo1 --num-executors 1 --executor-memory 8g --executor-cores 4 /home/hadoop/zgh/sparklearn-0.0.1-SNAPSHOT.jar
然后查看UI界面,发现单个executor的Storage Memory的内存只有4.4G,那么这4.4G怎么来的呢,我们一起来计算下,如下图所示
image.png
分配的executor-memory为8g,
依照java内存的划分,其堆内存分为eden、survivor*2和tenured部分,同时刻只有一个survivor可用,因而指定的堆内存实际可用的内存即Runtime.getRuntime.maxMemory查到的通常比Xmx指定的要小,一般为90-95%的样子,我们这里可以使用这种方式
scala> val demo = spark.sql("select * from ceshi.xunlian");
demo: org.apache.spark.sql.DataFrame = [name: string]
scala> demo.map(x=>{Runtime.getRuntime.maxMemory}).collect
20/06/24 11:39:57 WARN hdfs.DFSClient: Slow ReadProcessor read fields took 243924ms (threshold=30000ms); ack: seqno: 9 reply: SUCCESS downstreamAckTimeNanos: 0 flag: 0, targets: [DatanodeInfoWithStorage[192.168.1.113:50010,DS-1d2bffdf-ff9a-4f97-bbd0-734b97636128,DISK]]
res3: Array[Long] = Array(7635730432, 7635730432, 7635730432, 7635730432, 7635730432, 7635730432, 7635730432, 7635730432, 7635730432, 7635730432)
来查看具体的应用中实际内存情况
这么一看是7635730432B,相当于7282M。 7282M0.6=4369.2 。实际这里显示时不是将字节除以102410241024,而是直接除以10001000*1000得到的。
上面说了占可用内存spark.memory.fraction(0.6)的spark 统一内存,另外0.4的用户内存用于存储用户代码生成的对象及RDD依赖等,用户在处理partition中的记录时,其遍历到的记录可以看做存储在Other区,当需要将RDD缓存时,将会序列化或不序列化的方式以Block的形式存储到Storage内存中。
堆外内存
前面说了,堆外内存有的是参数spark.yarn.executor.memoryOverhead控制,有的是参数spark.memory.offHeap.size控制,这个都算offheap内存,不过前者主要用于JVM自身,字符串, NIO Buffer等开销,而后者主要是供统一内存管理用作Execution Memory及Storage Memory的用途。
spark.yarn.executor.memoryOverhead设置的内存默认为executor.memory的0.1倍,最低384M,这个始终存在的,在采用yarn时,这块内存是包含在申请的容器内的,即申请容器大小大于spark.executor.memory+spark.yarn.executor.memoryOverhead。
而通过spark.memory.offHeap.enable/size申请的内存不在JVM内,spark.memory.offHeap.enable 默认为false,spark.memory.offHeap.size 默认值为0。Spark利用TungSten技术直接操作管理JVM外的原生内存。主要是为了解决Java对象开销大和GC的问题。
网友评论