美文网首页
Spark源码[3]-MemoryManager模型

Spark源码[3]-MemoryManager模型

作者: 蠟筆小噺没有烦恼 | 来源:发表于2019-10-26 16:06 被阅读0次

每个Executor都会有一个MemoryManager进程,用于管理该Executor中的执行内存和存储内存分配,其代码位于spark-core模块下的
org.apache.spark.memory.MemroyManager,是一个抽象类。

1 MemoryManger种的属性

MemroyManager中会维护四种我们之前介绍过的线程池:


分别是堆内存储/执行内存池和堆外存储/执行内存池,并传入当前类作为lock,并为其配置存储/执行的大小。而堆外内存需要读取一个配置参数spark.memory.offHeap.size,根据其大小进行执行/存储内存的配置。

2 MemoryManager中的待实现方法

2.1 acquireExecutionMemory

  def acquireExecutionMemory(numBytes: Long,taskAttemptId: Long,memoryMode: MemoryMode): Long

在尝试任务执行时候,从堆内或者堆外进行Task执行内存的申请,所需大小为numBytes

2.2 acquireStorageMemory

acquireStorageMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean

在尝试任务执行时候,为存储指定Block,从堆内或者堆外进行存储内存的申请,所需大小为numBytes

2.3 acquireUnrollMemory

def acquireUnrollMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean

在尝试任务执行时候,从堆内或者堆外进行Task展平内存的申请,所需大小为numBytes。
RDD 在缓存到存储内存之前,Partition 中的数据一般以迭代器(Iterator)的数据结构来访问。通过 Iterator 可以获取分区中每一条序列化或者非序列化的数据(Record),这些 Record 的对象实例在逻辑上占用了 JVM 堆内内存的 other 部分的空间,同一 Partition 的不同 Record 的空间并不连续。

当调用RDD的persist相关方法中包含内存的存储方法时,RDD 在缓存到[存储内存]之后,Partition 被转换成 Block,Record 在堆内或堆外存储内存中占用一块连续的空间。将Partition从other区域由不连续的存储空间转换为Storage区域连续存储空间的过程,Spark称之为"展开"(Unroll)。
Block 有序列化和非序列化两种存储格式,具体以哪种方式取决于该 RDD 的存储级别。非序列化的 Block 以一种 DeserializedMemoryEntry 的数据结构定义,用一个数组存储所有的对象实例;序列化的 Block 则以 SerializedMemoryEntry的数据结构定义,用字节缓冲区(ByteBuffer)来存储二进制数据。每个 Executor 的 Storage 模块用一个链式 Map 结构(LinkedHashMap)来管理堆内和堆外存储内存中所有的 Block 对象的实例,对这个 LinkedHashMap 新增和删除间接记录了内存的申请和释放。

在展开过程中不能保证存储空间可以一次容纳 Iterator 中的所有数据,在 Unroll 时要向 MemoryManager 申请足够的 Unroll 空间来临时占位,如果申请的空间不足存放整个Iterator,则 Unroll 失败;空间足够时可以继续进行展开。对于序列化的 Partition,其所需的 Unroll 空间可以直接累加计算,一次申请。而非序列化的 Partition 则要在遍历 Record 的过程中依次申请,即每读取一条 Record,采样估算其所需的 Unroll 空间并进行申请,空间不足时可以中断,释放其他Block已占用的不需要的 Unroll 空间。如果最终 Unroll 成功,当前 Partition 所占用的 Unroll 空间被转换为正常的缓存 RDD 的存储空间,如下图所示。


在静态内存管理时,Spark 在存储内存中专门划分了一块 Unroll 空间,其大小是固定的;而统一内存管理时则没有对 Unroll 空间进行特别区分,使用的是Storage存储空间。当存储空间不足时会根据动态占用机制进行处理。

2.4 releaseExecutionMemory

def releaseExecutionMemory(
    numBytes: Long,
    taskAttemptId: Long,
    memoryMode: MemoryMode): Unit = synchronized {
  memoryMode match {
    case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
    case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
  }
}

释放Executor的内存。

2.5 releaseAllExecutionMemoryForTask

private[memory] def releaseAllExecutionMemoryForTask(taskAttemptId: Long): Long = synchronized {
  onHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId) +
    offHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId)
}

释放目标Task的所有内存。

3 MemoryManger的具体实现->UnifiedMemoryManager

截止到Spark2.4,已经将静态内存管理完全移除,上述的MemoryManager只有一个实现类,那就是UnifiedMemoryManager,动态内存管理。存储内存和执行内存之间有软边界,可以彼此借内存使用。

3.1 UnifiedMemoryManager包含的属性

maxHeapMemory:最大的堆内存为(ExecutorMemory-300MB)*0.6,该比例通过spark.memory.fraction进行设置
onHeapStorageRegionSize:存储内存的堆内存大小
numCores:CPU内核数

3.2 UnifiedMemoryManager实现的方法

3.2.1 maxOnHeapStorageMemory

返回用于存储的最大堆内存,由于存储内存和执行内存之间的软边界,所以在某一时间点可用的存储内存是堆内存-已使用的执行内存,也就是说相当于存储内存+执行内存未使用的。而最大执行内存计算方式也相同。


3.2.2 maxOffHeapStorageMemory

返回用于存储的最大堆外内存:


3.2.3 acquireExecutionMemory

申请执行内存最终会调用执行内存池种的方法:


先确认使用哪种内存模式,然后分别定义maybeGrowExecutionPool和computeMaxExecutionPoolSize两个方法:
maybeGrowExecutionPool表示如果传入的参数(当前申请不足的内存大小)大于0,则尝试进行对存储内存占用执行内存的部分进行驱逐,如果还不够,那么需要进一步驱逐存储内存的Block。
computeMaxExecutionPoolSize用于计算当前情况下,执行内存最多可以使用的内存。如果存储内存占用不足其配置大小,则将未占用部分也给执行内存使用。而如果占用的超过了配置大小,则将多余占用的部分会强制回收,也就是将执行内存配置大小作为可用大小。最终会调用执行内存池进行内存申请。

3.2.4 acquireStorageMemory

先确认使用哪种内存模式,如果内存不足,那么先尝试对执行内存的借用,然后调用存储内存池的方法进行内存申请,注意返回的是Boolean,是否申请成功,如果不够,那么直接返回false。

3.2.5 acquireUnrollMemory

调用的是acquireStorageMemory,也就明确了,展开Block操作其实占用的是存储内存。存储内存的用途就变成了:存储Block,进行Block展开。

相关文章

网友评论

      本文标题:Spark源码[3]-MemoryManager模型

      本文链接:https://www.haomeiwen.com/subject/xjfuvctx.html