美文网首页Spark_Flink_Hadoopspark
漫谈Spark内存管理 (一)

漫谈Spark内存管理 (一)

作者: 旺旺鸽不鸽 | 来源:发表于2019-07-14 10:19 被阅读45次

    谈到Spark内存管理,估计大家都会想到:static memory manager,unified memory manager,execution memory,storage memory,tungsten, task memory manager等一系列模块。网络上介绍这些模块的文章已经非常多,笔者不想一个个地系统介绍,只想"漫不经心"地谈谈平时思考过的关于spark内存管理的一些问题,比如:

    1. Spark的内存管理与JVM的内存分配回收机制有什么区别和联系?哪些事是spark内存管理做的,哪些事是JVM做的?

    2. Spark中用到内存的地方有哪些?on heap内存在哪些地方用到?off heap内存在哪些地方用到?

    3. Spark程序出现OOM的可能原因有哪些?除了用户代码外,Spark自身框架有哪些环节可能出现OOM?

    4. Tungsten在内存优化方面都做了些什么?优化了spark的哪些环节?

    在“漫谈”的过程中,笔者会结合源码,针对笔者认为有必要说明的一些问题做细节分析。

    1 Spark内存管理都做了些啥?

    我们知道JVM有自己的内存模型和内存分配回收机制,它会负责与操作系统交互进行内存的申请和释放等。那么,Spark内存管理又做了什么呢?笔者觉得它主要做了三件事:

    1. 在JVM之上搭建了一套逻辑上的内存管理机制,在spark的存储和执行框架使用JVM堆内存之前确保有足够内存空间。当内存空间不足时,spark memory manager的各个调用模块会采取相应的措施,比如ExternalSorter会在内存中不足时将数据spill到disk上。

    2. Tungsten构建了一套类似操作系统内存页管理的机,用MemoryBlock表示一个内存页,用自己的page table进行管理,实现了类似操作系统中的虚拟内存逻辑地址,对(pageNumber, offsetInPage)进行编码生成逻辑地址,统一了on heap和off heap内存的访问方式。

    3. Tungsten在off heap模式下会绕过JVM使用sun.misc.Unsafe的API直接与操作系统交互,进行内存的申请和释放,从而免除了创建JVM对象带来的额外内存开销以及GC对性能的影响。

    1.1 Memory Manager

    上面#1中的事情主要由MemoryManager (StaticMemoryManager和UnifiedMemoryManager)负责,它会利用不同的MemoryPool将内存按功能和性质区分开来,包括堆内存储内存池,堆外存储内存池,堆内执行内存池,堆外执行内存池:

    4 memory pools in MemoryManager

    memoryPool记录了内存使用状态的各项metrics,比如最大内存,可用内存,已用内存等。

    MemoryManager提供了几个方法供调用者使用以申请和释放指定类型的内存空间:

    methods for acquiring and releasing memory in MemoryManager

    unroll memory是什么?

    这里重点讲一下unroll memory的概念,在《Spark SQL内核剖析》上看到对"unroll"的定义:“将partition由不连续的存储空间转换为连续的存储空间的过程”。

    为了说明这个问题,我们先来看看acquireUnrollMemory方法的一个调用全过程:

    ShuffleMapTask/ResultTask.runTask -> RDD.iterator -> RDD.getOrCompute -> BlockManager.getOrElseUpdate -> BlockManager.doPutIterator -> MemoryStore.putIteratorAsBytes -> MemoryStore.putIterator -> MemoryStore.reserveUnrollMemoryForThisTask -> MemoryManager.acquireUnrollMemory

    可以看到,task(shuffle map task和result task)执行时调用RDD.iterator获取指定partition的数据迭代器,这个过程中的MemoryStore.putIterator会遍历指定partition的所有records,获取每个value并将其存放在连续内存中:

    MemoryStore.putIterator

    因为是用迭代器一条一条record获取的,事先并不知道是否有足够内存存放下partition的所有数据,所以这里的步骤是这样的:

    1. 先向memoryManager申请一份unroll内存(初始大小由参数spark.storage.unrollMemoryThreshold控制,默认为1mb);

    2. 然后每读一条record都会评估一下当前所需内存是否超过已分配内存,如果超过,则向memoryManager申请额外需要的内存。如果申请成功,则继续读取下一个record,否则就停止unroll,即存储partition到内存失败。

    loop of reading record in MemoryStore.putIterator

    3. 重复步骤#2,直到partition所有数据都成功unroll,或因内存不足而停止unroll.

    4. 如果partition所有数据都成功unroll,则将unroll memory转化成storage memory : 

    transfer unroll memory to storage memory in MemoryStore.putIterator

    可以看到,最终会release unroll memory并申请storage memory. 我们看一下UnifiedMemoryManager中acquireUnrollMemory和MemoryManager中releaseUnrollMemory的实现:

    UnifiedMemoryManager.acquireUnrollMemory MemoryManager.releaseUnrollMemory

    可以看到,其实unroll memory和storage memory的申请及释放调用的是同样的方法。

    笔者对unroll memory的理解是:unroll memory和storage memory本质上是同一份内存,只是在任务执行的不同阶段的不同逻辑表述形式。在partition数据的读取存储过程中,这份内存叫做unroll memory,而当成功读取存储了所有reocrd到内存中后,这份内存就改了个名字叫storage memory了。

    注意,unroll memory的概念只存在于spark的存储模块中,在执行模块中是不存在unroll memory的。

    不知不觉已经写了不少字,今天先谈到这,未完待续。

    说明

    1. 本文内容及源码均基于spark 2.4.0之前版本

    2. 水平有限,有误之处望读者指出

    相关文章

      网友评论

        本文标题:漫谈Spark内存管理 (一)

        本文链接:https://www.haomeiwen.com/subject/cdgvkctx.html