1. 内存的分配
如果我们选择了这种UnifiedMemoryManager来管理一个1G内存的Executor, 那么实际上每个部分的内存是多少呢? 下面这个图是1.5的, 1.6对比例进行了微调
已经看过的内存管理图
- 系统会预留300MB内存
- 在剩余的700MB内存中, 有75%会被分配到storage区域, 用于存储和计算. 这一部分是525MB, 这个比例可以使用
spark.memory.fraction
来调整. 另外一种static memory manager的参数和这边不同, 要注意内存管理模型和参数是配套的 - 剩下的部分就是Other区域, 用于进行spark自身, 以及broadcast的一些数据的使用
2. 存储和计算内存的动态分配
内存这张图概括了统一内存管理模型的基本约束
- Storage部分抢占到Executor的空间随时可能被释放
- Executor抢占到的Storage部分不会被主动释放, 只能等待使用结束
- 当有剩余空间时, 大家相安无事, 都没有空间了, 只好把数据向磁盘写入, 而不是去占用预留空间. 预留空间主要为了应对spark自身内存不足的场景
3. 内部对象和方法
申请执行用内存
/**
为当前的Task申请numBytes数量的内存, 返回0代表分配失败
这个请求是可能阻塞的, 直到找到足够的内存页
每个人物最多可以从内存池中分配到 1/2N , N是当前活跃的任务数
如果之前有任务已经获得了大量的内存, 那么就只好把数据刷到磁盘
*/
override private[memory] def acquireExecutionMemory(
numBytes: Long,
taskAttemptId: Long,
memoryMode: MemoryMode): Long = synchronized {
assertInvariant()
assert(numBytes >= 0)
memoryMode match {
case MemoryMode.ON_HEAP =>
/**
通过释放掉storage抢占到的用于cache的内存, 将Executor内存池容量扩大
当为一个任务申请内存时, 这里需要做大量的尝试性分配
因为每次尝试申请, 都可能在释放掉内存后,
接着被storage用来缓存一个大对象
*/
def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
if (extraMemoryNeeded > 0) {
// 当前内存池空了, 尝试去抢占storage部分, free的部分可以直接抢占
// 如果当前状态时storage抢占了executor的内存, 则立即释放它抢占的部分
val memoryReclaimableFromStorage =
math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize - storageRegionSize)
if (memoryReclaimableFromStorage > 0) {
// 申请满足需求的内存
val spaceToReclaim = storageMemoryPool.freeSpaceToShrinkPool(
math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
storageMemoryPool.decrementPoolSize(spaceToReclaim)
onHeapExecutionMemoryPool.incrementPoolSize(spaceToReclaim)
}
}
}
/**
计算当前能用到的最大内存, 很容易理解
如果storage没有满, 那么就最大内存是尽可能多的抢占
如果storage已经满了, 按照默认配置, executor可以拿到50%的内存空间
*/
def computeMaxExecutionPoolSize(): Long = {
maxMemory - math.min(storageMemoryUsed, storageRegionSize)
}
onHeapExecutionMemoryPool.acquireMemory(
numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize)
case MemoryMode.OFF_HEAP =>
// off-heap是由tachyon管理的, 逻辑在独立的项目里
offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
}
}
申请存储用内存
override def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
assertInvariant()
assert(numBytes >= 0)
if (numBytes > maxStorageMemory) {
// 这里会报一个错误, fast-fail机制
logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
s"memory limit ($maxStorageMemory bytes)")
return false
}
if (numBytes > storageMemoryPool.memoryFree) {
// 当内存不足时, 尝试抢占executor的空闲内存, 并把对应的blocck保存到evctedBlocks里去, 为Executor可能的索取做准备
val memoryBorrowedFromExecution = Math.min(onHeapExecutionMemoryPool.memoryFree, numBytes)
onHeapExecutionMemoryPool.decrementPoolSize(memoryBorrowedFromExecution)
storageMemoryPool.incrementPoolSize(memoryBorrowedFromExecution)
}
storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks)
网友评论