Spark作为一个基于内存的分布式计算引擎,其内存管理模块在整个系统中扮演着非常重要的角色。
在执行Spark的应用程序时,Spark集群会启动Driver和Executor两种JVM进程:
- Driver进程:为主控进程,负责创建Spark上下文对象,提交Spark作业(Job),并将作业转化为计算任务(Task),在各个Executor进程之间协调任务的调度。
- Executor进程:负责在工作节点上执行具体的计算任务,并将结果返回给Driver,同时为需要持久化的RDD提供存储功能。
一、内存模型概述
1、Spark管理的内存
Spark管理的内存主要划分为4个区域:
-
系统区:Spark运行自身的代码需要一定的内存空间。
-
用户区:用户自己写的一些UDF之类的代码也需要一定的空间来运行。
-
存储区:Spark的任务就是操作数据,Spark将数据尽可能的存储在内存,而这些数据也需要占用内存空间。
-
执行区:Spark操作数据的单元是Partition,Spark在执行一些Shuffle、Join、Sort、Aggregation之类的操作,需要把Partition加载到内存进行运算,这也会运用到部分内存。
2、Spark内存模型
image-
第一层:整个Executor所用到的内存;
-
第二层:分为JVM中的内存和JVM外的内存,这里的JVM内存在Yarn的时候就是指申请的Container的内存;
-
第三层:对于Spark来说内存分为三部分:堆内内存、Memory Overhead、堆外内存
-
On-heap:堆内内存。
-
Memory Overhead:对应的参数为:spark.yarn.executor.memoryOverhead。这块的内存主要用于虚拟机的开销,内部的字符串,还有一些本地开销(比如python需要用到的内存)等。其实就是额外的内存,spark并不会对这块内存进行管理。
-
Off-Heap:对应的参数为:spark.memory.offHeap.size。广义上是指所有的堆外内存,这部分内存的申请和释放是直接进行的,不通过JVM管控,所以没有GC,被Spark分为 Storage 和 Execution两部分,进行统一管理。
-
第四层:JVM堆内内存分为三个部分:
-
Reserved Memory:预留内存300M,用于保障Spark正常运行
-
Other Memory:用于Spark内部的一些元数据、用户的数据结构、防止出现对内存估计不足导致OOM时的内存缓存、占用空间比较大的记录做缓存
-
Memory Faction:spark主要控制的内存,由参数 spark.memory.faction 配置。
-
第五层:分为Storage 和 Execution。由参数 spark.memory.storageFraction配置它们两大小。
-
Execution:执行内存。用于Spark的计算,shuffle、sort、aggregation等计算时会用到的内存,如果计算时内存不足则会向Storage部分借用内存,如果还是不够就会spill到磁盘
-
Storage:主要用于RDD的缓存,如果Execution 来借内存,可能会牺牲自己丢弃缓存来借给Execution,Storage也可以向Execution借内存,但是Execution不会牺牲自己。
二、内存模型详解
1、堆内内存和堆外内存详解
Executor作为一个JVM进程,它的内存管理建立在JVM的内存管理之上,Spark对JVM的堆内(On-heap)空间进行了更为详细的分配,以充分利用内存。同时,Spark引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,进一步优化了内存的使用。
image1)堆内内存
堆内内存的大小,由 Spark 应用程序启动时的 executor-memory 或 spark.executor.memory 参数配置。Executor 内运行的并发任务共享 JVM 堆内内存,这些任务在缓存 RDD 数据和广播(Broadcast)数据时占用的内存被规划为存储(Storage)内存,而这些任务在执行 Shuffle 时占用的内存被规划为执行(Execution)内存,剩余的部分不做特殊规划,那些 Spark 内部的对象实例,或者用户定义的 Spark 应用程序中的对象实例,均占用剩余的空间。不同的管理模式下,这三部分占用的空间大小各不相同。
Spark 对堆内内存的管理是一种逻辑上的"规划式"的管理,因为对象实例占用内存的申请和释放都由 JVM 完成,Spark 只能在申请后和释放前记录这些内存,我们来看其具体流程:
-
申请内存:
-
Spark在代码中new一个对象实例
-
JVM从堆内内存分配空间;
-
Spark 保存该对象的引用,记录该对象占用的内存。
-
释放内存
-
Spark记录该对象释放的内存,删除该对象的引用;
-
等待JVM的垃圾回收机制释放该对象占用的堆内内存。
2、堆外内存
为了进一步优化内存的使用以及提高 Shuffle 时排序的效率,Spark 引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,存储经过序列化的二进制数据。利用 JDK Unsafe API(从 Spark 2.0 开始),在管理堆外的存储内存时不再基于 Tachyon,而是与堆外的执行内存一样,基于 JDK Unsafe API 实现,Spark 可以直接操作系统堆外内存,减少了不必要的内存开销,以及频繁的 GC 扫描和回收,提升了处理性能。堆外内存可以被精确地申请和释放,而且序列化的数据占用的空间可以被精确计算,所以相比堆内内存来说降低了管理的难度,也降低了误差。
在默认情况下堆外内存并不启用,可通过配置 spark.memory.offHeap.enabled 参数启用,并由 spark.memory.offHeap.size 参数设定堆外空间的大小。除了没有 other 空间,堆外内存与堆内内存的划分方式相同,所有运行中的并发任务共享存储内存和执行内存。
三、内存管理模型
Spark 1.6 之后默认为统一管理(UnifiedMemoryManager)方式,1.6 之前采用的静态管理(StaticMemoryManager)方式仍被保留,可通过配置 spark.memory.useLegacyMode=true 参数启用静态内存管理方式。下面我们介绍下两种内存管理模型的进化。
1、静态内存管理
在 Spark 最初采用的静态内存管理机制下,存储内存、执行内存和其他内存的大小在 Spark 应用程序运行期间均为固定的,但用户可以应用程序启动前进行配置,堆内内存的分配如下所示:
2、统一内存管理
Spark 1.6 之后引入的统一内存管理机制,与静态内存管理的区别在于存储内存和执行内存共享同一块空间,可以动态占用对方的空闲区域。如下图所示:
image其中最重要的优化在于动态占用机制,其规则如下:
-
设定基本的存储内存和执行内存区域(spark.storage.storageFraction 参数),该设定确定了双方各自拥有的空间的范围。
-
双方的空间都不足时,则存储到硬盘;若己方空间不足而对方空余时,可借用对方的空间;(存储空间不足是指不足以放下一个完整的 Block)。
-
Executio内存的空间被Storage内存占用后,可让对方将占用的部分转存到硬盘,然后"归还"借用的空间。
-
Storage内存的空间被Execution内存占用后,无法让对方"归还",因为需要考虑 Shuffle 过程中的很多因素,实现起来较为复杂。
新的版本引入了新的配置项:
-
spark.memory.fraction(默认值为0.75):用于execution和storage的堆内存比例。该值越低,越容易发生spill和缓存数据回收。该配置实际上也限定了OTHER内存大小,以及偶发超大record的内存消耗。
-
spark.memory.storageFraction(默认值为0.5):显然,这是存储内存所占spark.memory.fraction设置比例内存的大小。当整体的存储容量超过该比例对应的容量时,缓存的数据会被回收。
-
Reserved Memory:默认都是300MB,这个数字一般都是固定不变的,在系统运行的时候 Java Heap 的大小至少为 Heap Reserved Memory x 1.5. e.g. 300MB x 1.5 = 450MB 的 JVM配置。
-
spark.memory.useLegacyMode(默认值为false):若设置为true,则使用1.6版本前的内存管理机制。此时,如下五项配置均生效:
3、动态占用机制
image凭借统一内存管理机制,Spark 在一定程度上提高了堆内和堆外内存资源的利用率,降低了开发者维护 Spark 内存的难度,但并不意味着开发者可以高枕无忧。譬如,所以如果存储内存的空间太大或者说缓存的数据过多,反而会导致频繁的全量垃圾回收,降低任务执行时的性能,因为缓存的 RDD 数据通常都是长期驻留内存的。所以要想充分发挥 Spark 的性能,需要开发者进一步了解存储内存和执行内存各自的管理方式和实现原理。
网友评论