1 内存模式
Spark将内存分为堆内和堆外内存,称为内存模式-MemoryMode,枚举类型Memory定义了具体使用 到的内存模型
@Private
public enum MemoryMode {
ON_HEAP,
OFF_HEAP
}
并且这里的堆内存并不能直接和Java的堆内存划等号,因为涉及到一块固定300MB的用户内存,所以只是堆内存的一部分,而堆外内存使用sun.misc.Unsafe的Api直接在工作节点的系统内存开辟空间。Spark中,无论哪种内存类型,都使用了内存池对其进行管理。
2 内存池-MemoryPool
接下来介绍的部分都位于spark-core模块的org.apache.spark.memory包内,MemoryPool抽象类定义了内存池的基本属性和方法。
在spark中,对内存池的操作都使用到了锁,所以构造方法就是传入一个锁对象。
private[memory] abstract class MemoryPool(lock: Object) {
@GuardedBy("lock") //表示内存池大小
private[this] var _poolSize: Long = 0
final def poolSize: Long = lock.synchronized { _poolSize }
//表示内存池已经使用的大小
def memoryUsed: Long
//获取内存池的空闲大小
final def memoryFree: Long = lock.synchronized { _poolSize - memoryUsed }
//为当前内存池扩充指定大小
final def incrementPoolSize(delta: Long): Unit = lock.synchronized {
require(delta >= 0)
_poolSize += delta
}
//缩小内存池,已经被使用的内存不能被移除
final def decrementPoolSize(delta: Long): Unit = lock.synchronized {
require(delta >= 0)
require(delta <= _poolSize)
require(_poolSize - delta >= memoryUsed)
_poolSize -= delta
} }
执行内存池和存储内存池都继承了该类。
3 执行内存池-ExecutionMemoryPool
执行内存池属于计算引擎的一部分,它的内存只会分配给Task进行使用,使用了一个HashMap来记录内存的使用情况。
#taskAttemptId -> memory
private val memoryForTask = new mutable.HashMap[Long, Long]()
对于执行内存总体使用情况,或者是单个Task的内存使用情况,都可以直接访问该Map得到。
3.1 acquireMemory 申请内存
用于给指定的TaskAttemptId进行指定大小numBytes的内存申请,需要传入两个匿名函数,maybeGrowPool表示如果执行内存不足,该如何处理;computeMaxPoolSize表示最大可以申请的内存。

如果该task之前没有进行过内存申请,则将其加入memoryForTask,内存大小为0,并且唤醒所有等待申请内存的线程。
之后需要不断循环以下操作,直至申请到内存:
- 获取当前task的数量(numActiveTasks),以及当前待申请内存Task的已有内存(curMem);
- 执行maybeGrowPool,如果执行内存不足,在动态内存规划中,进行存储内存驱逐,执行内存侵占存储内存;
- 调用computeMaxPollSize,计算释放存储内存后,执行内存池可用的最大大小(maxPoolSize);
- 计算每个Task可以申请的最大内存:maxPoolSize / numActiveTasks,表示在侵占存储内存后总内存的1/n;
- 计算每个Task可以申请的最小内存:poolSize / (2 * numActiveTasks),表示不侵占存储内存的执行内存总大小的1/2n。
- 计算当前任务可以申请到最大的内存大小(maxToGrant):math.min(numBytes, math.max(0, maxMemoryPerTask - curMem)),表示不超过numBytes,不超过每个tAsk可申请最大内存。保持在0 <= X <= 1 / numActiveTasks之间;
- 计算当前任务真正可以申请到的内存大小(toGrant):math.min(maxToGrant, memoryFree);
- 如果申请的内存小于待申请内存numBytes,且当前总的Task内存小于Task可以申请的最小内存(第5步),说明连Task执行的最基本内存要求都无法满足,则执行lock.wait进行线程等待,等待内存有释放再唤醒;否则,更新memoryForTask的当前Task内存大小为toGrant,并返回toGrant,退出循环。
上述计算中,如果分配Executor内存为10G,不考虑300MB用于预留内存,那么Storage+Execution=10G0.6=6G,分别为3G。如果没有任何block进行缓存,那么Storage为空,假如有4个并行Task,那么每个Task最大申请内存为:6G/4=1.5G,最小为3G/(24)=375MB。但实际上Storage内存不可能一点都不使用,因为unroll申请的内存也会从这里拿。所以每个Task最大1G是比较准确的。
3.2 releaseMemroy 释放内存
用于给指定Task释放指定大小numBytes的内存:

如果需要释放的内存比持有的多,那么报错;否则更新memoryForTask对该Task对应的内存大小,如果剩余内存为0,则将该Task移除。最后唤醒等待申请内存的其他线程,进行acquireMemory中的申请尝试。
4 存储内存池-StorageMemoryPool
存储内存池用于物理内存的逻辑抽象,通过逻辑管理,可以提高内存使用效率。
存储内存池使用了一个变量来记录当前Executor使用的存储内存大小。
@GuardedBy("lock")
private[this] var _memoryUsed: Long = 0L
4.1 acquireMemory 申请内存
用于给指定的BlockId对应的Block获取指定大小的内存,申请内存有两个重载方法:

如果存储内存池内存不足,那么需要释放掉其他Block占用的内存;否则不需要释放内存,直接申请即可。

如果numBytesToFree>0,即需要先释放内存,则调用memoryStore.evictBlocksToFreeSpace进行内存驱逐。然后判断驱逐内存后的free内存是否比待申请内存大,如果满足条件,则进行内存分配,否则不进行内存分配,返回false,告知失败。
4.2 releaseAllMemory 释放内存
执行内存池的释放非常简单,直接将_memoryUsed=0l即可。如果需要释放指定大小内存,那么只要减少一部分即可。
4.3 freeSpaceToShrinkPool 缩小内存池

如果需要缩小的部分大于free内存,那么不能缩小的部分,需要调用memoryStore.evictBlocksToFreeSpace进行内存驱逐,之后返回总的可以释放的内存。
网友评论