漫谈Spark内存管理(二)

作者: 旺旺鸽不鸽 | 来源:发表于2019-07-23 17:56 被阅读52次

    漫谈Spark内存管理(一)中,概述了Spark内存管理做的事情,并着重对unroll memory的概念做了解释及分析。本文继续讨论Spark Memory Manager的功能实现.

    Spark的MemoryManager提供了一套逻辑上的内存申请和释放机制。spark1.6之后,UnifiedMemoryManager成为默认内存管理器,所以笔者以UnifiedMemoryManger为例分析spark内存管理器的具体实现。

    1 存储内存管理

    1.1 申请存储内存

    Spark中的RDD Block,Broadcast Block都可能使用存储内存(也可能用磁盘)进行存储,存储之前必须先向MemoryManager申请所需要的内存空间。

    UnifiedMemoryManger.acquireStorageMemory方法用于为block申请指定memory mode(onHeap或offHeap),指定memory size的存储内存空间:

    UnifiedMemoryManger.acquireStorageMemory

    1. 首先,根据申请的memorymode获取对应的执行内存池,存储内存池和最大存储内存(maxMemory);

    2. 最大存储内存由UnifiedMemoryManager的两个方法提供:

    In UnifiedMemoryManager

    UnifiedMemoryManager的执行和存储内存是可以互借的,所以这里获取最大存储内存时,直接用最大内存减去已用执行内存。

    3. 看看maxHeapMemory和maxOffHeapMemory是怎么得到的:

        a. maxHeapMemory由UnifiedMemoryManager.getMaxMemory方法计算得到:

    UnifiedMemoryManager.getMaxMemory

        首先,获取系统内存大小,默认直接调用 java 的Runtime.getRuntime.maxMemory 方法获取当前 jvm 最大内存,这个最大内存的值可通过 jvm 参数-Xmx 配置,但是这个值并不等于-Xmx 指定的值,会稍小一些,不同 jvm 和操作系统可能不同。-Xmx 的值可由--driver-memory和--executor-memory 控制;

        然后, 预留一部分系统内存,默认的RESERVED_SYSTEM_MEMORY_BYTES 为300MB, 也就是说默认预留 450MB 系统内存。可用内存就是系统内存减去预留内存。

        最后,UnifiedMemoryManager 的 maxHeapMemory 就是可用内存乘以spark.memory.fraction.

        b. maxOffHeapMemory直接从配置中读取:

    maxOffHeapMemory in MemoryManager

    4. 当申请的内存比存储内存池的空闲内存大,则向执行内存池借内存,并调整执行内存池和存储内存池的_poolSize.

    5. 最后调用存储内存池的 acquireMemory 方法申请内存。

    6. 再看看 storagePool.acquireMemory 的实现:

    这里的参数numBytesToFree就是要申请的内存大小减去存储内存池的空闲内存大小:

    numBytesToFree

    如果存储内存池的当前空闲内存不够,则调用MemoryStore.evictBlocksToFreeSpace方法释放内存。该方法会遍历memoryStore中存储的所有blocks,选择出可驱逐的blocks,可驱逐block需要满足条件:

        a. 使用的memorymode与要申请的memorymode相同

        b. 与申请内存的block不属于同一个rdd

        c. 没有正在被读取(not locked for reading)

        只有当可释放的内存总量大于需要释放的内存量时才会调用blockEvictionHandler.dropFromMemory从内存中删除选中的blocks. 需注意,因为evictBlocksToFreeSpace方法会调用memoryManager.synchronized,所以,同一时刻最多只有一个task在删除memoryStore中的block.

    MemoryStore.evictBlocksToFreeSpace

        释放完内存后,如果空闲内存足够,则更新存储内存池的_memoryUsed变量。

    从上面的分析可以看出,acquireStorageMemory会判断当前空闲存储内存是否足够,如果不够则会从执行内存池借空闲内容,如果还不够,则会驱逐当前内存池中的某些blocks以释放内存。如果这两种措施都无法获取足够的空闲存储内存,则申请存储内存失败,acquireStorageMemory返回false. 如果申请成功,则更新存储内存池的_memoryUsed变量,表示这部分内存已被使用。

    1.2 释放存储内存

    UnifiedMemoryManger.releaseStorageMemory方法用于释放存储内存:

    UnifiedMemoryManger.releaseStorageMemory

    同样也是根据memory mode调用不同存储内存池的releaseMemory方法。onHeapStoreageMemoryPool和offHeapStorageMemoryPool都是StorageMemoryPool类的对象,下面看看StorageMemoryPool.releaseMemory方法的实现:

    StorageMemoryPool.releaseMemory

    很简单,就是更新存储内存池的_memoryUsed变量。

    2 执行内存管理

    2.1 申请执行内存

    Spark中的shuffle,aggregate,join等操作都会使用执行内存,每个task在执行时会通过taskMemoryManager调用MemoryManager的acquireExecutionMemory方法申请需要的执行内存。UnifiedMemoryManger.acquireExecutionMemory方法做了以下步骤:

        1. 根据memory mode获取对应的执行内存池,存储内存池, 用于存储的内存大小, 最大内存:

    UnifiedMemoryManger. acquireExecutionMemory

          其中maxHeapMemory,maxOffHeapMemory和上文介绍的是一样的,这点也体现了UnifiedMemoryManager的unified,哈哈。storageRegionSize是用于数据存储的内存大小,对于onHeap内存,storageRegionSize为:   

    onHeapStorageRegionSize

    其中的maxMemory就是上文中介绍的UnifiedMemoryManager.getMaxMemory方法返回的值。对于offHeap内存, storageRegionSize为:

    offHeapStorageRegionSize

    这里的maxOffHeapMemory和上文提到的也是同一个。

        2. 定义用于增长执行内存池的maybeGrowExecutionPool方法,以及用于计算最大执行内存池大小的computeMaxExecutionPoolSize方法:

    UnifiedMemoryManager.maybeGrowExecutionPool

        maybeGrowExecutionPool方法会做:

            a. 计算存储内存池的实际poolsize和用于数据存储的内存大小storageRegionSize之间的差值。因为在存储block(比如RDD block,broadcastblock)时,存储内存池可能从执行内存池借内存,所以它的实际poolsize可能大于配置的storageRegionSize.我们暂且称这个差值为delta.

            b. 比较存储内存池的空闲内存和delta,取更大的作为memoryReclaimableFromStorage.也就是说这里不仅会收回存储内存池从执行内存池借的内存,还会向存储内存池借空闲内存,即“回收+借”。

            c. 调用StorageMemoryPool.freeSpaceToShrinkPool方法,该方法会先从存储内存池的空闲内存中获取需要reclaim的内存,如果不够则会调用MemoryStore.evictBlocksToFreeSpace方法驱逐存储在内存中的某些block以释放内存。

            d. 最后,调整存储内存池和执行内存池的大小。

        computeMaxExecutionPoolSize方法用于计算调用maybeGrowPool之后,执行内存池的最大size,实现比较简单:

    UnifiedMemoryManager.computeMaxExecutionPoolSize

        注意,如果storagePool.memoryUsed小于storageRegionSize,则说明在maybeGrowPool中调用freeSpaceToShrinkPool方法时未能成功释放delta大小的内存。此时,computeMaxExecutionPoolSize方法返回的值会大于执行内存池的实际poolsize。

        3. 调用ExecutionMemoryPool.acquireMemory方法申请执行内存。

    再来看看ExecutionMemoryPool.acquireMemory的实现:

    ExecutionMemoryPool.acquireMemory

    ExecutionMemoryPool.acquireMemory用了一个while循环不断尝试分配内存,只有分配成功的情况下才会退出循环。每次尝试会做:

    1. 尝试从存储内存池回收内存,从而增长执行内存池的大小;

    2. 获取执行内存池的最大容量maxPoolSize,累计分配给单个task的内存大小范围为[maxPoolSize/2*numActiveTasks,maxPoolSize/(numActiveTasks)];

    3. 根据当前task已占用内存,申请的内存大小,可分配的内存大小范围,以及执行内存池的空闲内存大小,最终确定要分配的内存大小toGrant;

    4. 如果分配toGrant内存之后task所占内存仍小于maxPoolSize/2*numActiveTasks,则调用lock.wait()等待其他task释放内存;

    5. 如果toGrant在正常范围内,则更新指定task的已占用内存,并结束循环。

    2.2 释放执行内存

    ExecutionMemoryPool.releaseMemory

    可以看到,释放执行内存最终是通过更新memoryForTask这个map来实现的。最后通过执行内存池的lock通知所有在请求执行内存时由于内存不足调用lock.wait()等待的任务线程。执行内存池的已用内存大小是从memoryForTask计算得到的:

    ExecutionMemoryPool.memoryUsed

    3 分析&总结

    3.1 存储内存和执行内存申请过程的不同

    基于上文中的源码分析,比较申请存储内存和执行内存的过程会发现,在申请执行内存时,spark可能会驱逐存储内存中的block以满足执行内存的需要;而申请存储内存时,只会从执行内存池借空闲内存(而且借的有可能包括执行内存向存储内存借的,所以也应该是 “回收+借”),并不会释放执行内存以满足存储内存的需要。也就是说,在Spark中,执行内存的优先级是更高的。笔者认为,这是因为执行内存用于shuffle,aggregation,join等操作中的各种map等数据结构,强行释放这些内存可能会导致task运行错误或失败,而存储内存主要用于存放缓存的RDD Block,Broadcast Block等数据,spark可以将这些block从内存移到磁盘存储或直接删除,在需要访问时可以根据lineage重新计算。

    3.2 Spark内存管理器的功能

    通过上文对spark memory manager各个方法的源码分析,可以看到spark的内存管理器自建了一套控制内存使用的方案,但这是一套逻辑上的内存管理方案。从实现角度上讲,就是维护了一系列的变量来记录和控制spark各个模块对内存(包括onHeap和offHeap内存)的使用。而真正向操作系统申请和释放物理内存的工作由JVM或Tungsten完成,Tungsten内存管理的核心内容是在TaskMemoryManager类中实现的,后续文章我们会详细讨论。

    3.3 总结

    本文以UnifiedMemoryManager为例,从源码角度分析了spark内存管理器如何将内存划分为存储和执行内存,详述了存储和执行内存的申请和释放过程。分析了存储和执行内存申请过程中的不同之处,总结了spark内存管理器的功能。

    4 说明

        a. 本文spark源码版本为spark 2.4.0

        b. 水平有限,如有错误,望读者指出

    相关文章

      网友评论

        本文标题:漫谈Spark内存管理(二)

        本文链接:https://www.haomeiwen.com/subject/nrdalctx.html