美文网首页Spark源码精读分析计划
Spark Core源码精读计划#23:与存储相关的内存池及内存

Spark Core源码精读计划#23:与存储相关的内存池及内存

作者: LittleMagic | 来源:发表于2019-07-04 22:25 被阅读15次

    目录

    前言

    我们用两篇文章的时间搞清楚了Spark存储中的“块”到底是怎么一回事,接下来我们就可以放心来看Spark Core存储子系统的细节了。前面已经提到过,Spark会同时利用内存和外存,尤其是积极地利用内存作为存储媒介。这点与传统分布式计算框架(如Hadoop MapReduce)的“内存仅用于计算,外存仅用于存储”的方式是非常不同的,同时也是Spark高效设计哲学的体现。接下来一段时间内,我们先研究Spark存储中的内存部分,再研究磁盘(外存)部分。

    虽然BlockManager是Spark存储子系统的司令官,但它并不会直接管理块,而会将对内存和外存的管理分别组织起来。与内存存储相关的组件包括内存池MemoryPool、内存管理器MemoryManager、内存存储器MemoryStore。本文先来探索内存池和内存管理器的大体实现。

    内存池MemoryPool

    MemoryPool抽象类从逻辑上非常松散地定义了Spark内存池的一些基本约定,其完整源码如下。

    代码#23.1 - o.a.s.memory.MemoryPool抽象类

    private[memory] abstract class MemoryPool(lock: Object) {
      @GuardedBy("lock")
      private[this] var _poolSize: Long = 0
    
      final def poolSize: Long = lock.synchronized {
        _poolSize
      }
    
      final def memoryFree: Long = lock.synchronized {
        _poolSize - memoryUsed
      }
    
      final def incrementPoolSize(delta: Long): Unit = lock.synchronized {
        require(delta >= 0)
        _poolSize += delta
      }
    
      final def decrementPoolSize(delta: Long): Unit = lock.synchronized {
        require(delta >= 0)
        require(delta <= _poolSize)
        require(_poolSize - delta >= memoryUsed)
        _poolSize -= delta
      }
    
      def memoryUsed: Long
    }
    

    在构造MemoryPool时,需要传入一个锁对象lock用于线程同步,该lock实际上就是后面会讲到的内存管理器MemoryManager。MemoryPool中定义了以下方法。

    • poolSize:获得内存池的大小,单位为字节。
    • memoryUsed:获得内存池中已占用内存的大小。该方法未提供具体实现,需要子类实现。
    • memoryFree:获得内存池中空闲内存的大小,就是上述poolSize减去memoryUsed。
    • incrementPoolSize():扩展内存池delta个字节的大小。该方法不能被覆写。
    • decrementPoolSize():压缩内存池delta个字节的大小。注意已占用的内存不能被压缩掉,并且该方法也不能被覆写。

    以上所有方法(以及其实现类的大部分方法)都由MemoryManager保证线程安全性,防止多线程同时操作内存池,造成分配混乱。

    MemoryPool有两个实现类:StorageMemoryPool与ExecutionMemoryPool。顾名思义,StorageMemoryPool用于存储,比如RDD数据、广播变量数据的缓存与分发;ExecutionMemoryPool用于执行,这包含Spark的计算(连接、聚合、排序等等)和Shuffle过程。ExecutionMemoryPool严格上来讲不属于存储子系统的组成部分,因此本文先来看StorageMemoryPool。

    存储内存池StorageMemoryPool

    构造与属性成员

    代码#23.2 - o.a.s.memory.StorageMemoryPool类的构造与属性成员

    private[memory] class StorageMemoryPool(
        lock: Object,
        memoryMode: MemoryMode
      ) extends MemoryPool(lock) with Logging {
    
      private[this] val poolName: String = memoryMode match {
        case MemoryMode.ON_HEAP => "on-heap storage"
        case MemoryMode.OFF_HEAP => "off-heap storage"
      }
    
      @GuardedBy("lock")
      private[this] var _memoryUsed: Long = 0L
      override def memoryUsed: Long = lock.synchronized {
        _memoryUsed
      }
    
      private var _memoryStore: MemoryStore = _
      def memoryStore: MemoryStore = {
        if (_memoryStore == null) {
          throw new IllegalStateException("memory store not initialized yet")
        }
        _memoryStore
      }
    
      // ......
    }
    

    StorageMemoryPool的构造方法参数除了锁对象之外,还有一个MemoryMode,它表示该内存池会使用哪部分的内存,其定义如下。

    代码#23.3 - o.a.s.memory.MemoryMode枚举

    @Private
    public enum MemoryMode {
      ON_HEAP,
      OFF_HEAP
    }
    

    其中,ON_HEAP表示使用堆内内存,即每个Executor JVM使用的那部分内存;OFF_HEAP表示使用堆外内存,即Worker节点上的本机内存(native memory),需要通过Unsafe API(讲解的文章见这里)来分配。由于

    Spark堆内内存和堆外内存的关系如下面的简图所示。

    图#23.1 - Spark堆内内存与堆外内存的关系

    根据MemoryMode的不同,使用堆内内存时池子的名称为on-heap storage,使用堆外内存时池子的名称为off-heap storage

    StorageMemoryPool使用私有变量_memoryUsed来记录使用了多少内存,并覆写memoryUsed这个Getter方法来返回之。另外,它还必须有与其关联的MemoryStore实例。MemoryStore真正地负责块在内存中的存取,下一篇文章就会讲解到它。下面来看StorageMemoryPool提供的方法。

    申请内存

    申请内存的逻辑由acquireMemory()方法来实现。

    代码#23.4 - o.a.s.memory.StorageMemoryPool.acquireMemory()方法

      def acquireMemory(blockId: BlockId, numBytes: Long): Boolean = lock.synchronized {
        val numBytesToFree = math.max(0, numBytes - memoryFree)
        acquireMemory(blockId, numBytes, numBytesToFree)
      }
    
      def acquireMemory(
          blockId: BlockId,
          numBytesToAcquire: Long,
          numBytesToFree: Long): Boolean = lock.synchronized {
        assert(numBytesToAcquire >= 0)
        assert(numBytesToFree >= 0)
        assert(memoryUsed <= poolSize)
        if (numBytesToFree > 0) {
          memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, memoryMode)
        }
    
        val enoughMemory = numBytesToAcquire <= memoryFree
        if (enoughMemory) {
          _memoryUsed += numBytesToAcquire
        }
        enoughMemory
      }
    

    这两个方法的执行流程是:ID为blockId的块需要申请numBytesToAcquire字节的内存,首先检查当前的剩余内存量memoryFree是否能满足分配,如果不能,就调用MemoryStore.evictBlocksToFreeSpace()方法,释放出(numBytes - memoryFree)大小的内存。然后再次检查剩余内存量是否能满足分配,如果够,就将占用内存量增加。最终返回是否分配成功。

    可见,acquireMemory()虽然名义上为申请内存,但实际上没有什么真正的内存分配操作,更多的是检查与记录而已。这是因为在Java/Scala环境中,对象内存的实际分配和释放一般都是由JVM来管理的,Spark只需要跟踪好就可以了。这也符合其父类MemoryPool类注释中的描述:“Manages bookkeeping for an adjustable-sized region of memory”,其中bookkeeping一词的含义即为“记账”,ExecutionMemoryPool也是同理。至于堆外内存的实际分配,是由基于Unsafe API的MemoryAllocator/MemoryConsumer组件来实现,这就是后话了。

    释放内存

    释放内存的逻辑则由releaseMemory()和releaseAllMemory()方法来实现。

    代码#23.5 - o.a.s.memory.StorageMemoryPool.releaseMemory()/releaseAllMemory()方法

      def releaseMemory(size: Long): Unit = lock.synchronized {
        if (size > _memoryUsed) {
          logWarning(s"Attempted to release $size bytes of storage " +
            s"memory when we only have ${_memoryUsed} bytes")
          _memoryUsed = 0
        } else {
          _memoryUsed -= size
        }
      }
    
      def releaseAllMemory(): Unit = lock.synchronized {
        _memoryUsed = 0
      }
    

    这个实现非常简单,就是将memoryUsed减去要释放的量,或者直接设为0。

    下面再来看一看内存管理器MemoryManager的部分细节,它直接管理着MemoryPool,是Spark作业运行时内存管理的统一入口。

    内存管理器MemoryManager

    MemoryManager与MemoryPool一样,也是一个抽象类。Spark环境中的每个JVM实例都会持有一个MemoryManager,先来看它的属性成员和构造方法。

    构造与属性成员

    代码#23.6 - o.a.s.memory.MemoryManager抽象类的属性成员和构造方法

    private[spark] abstract class MemoryManager(
        conf: SparkConf,
        numCores: Int,
        onHeapStorageMemory: Long,
        onHeapExecutionMemory: Long) extends Logging {
    
      @GuardedBy("this")
      protected val onHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.ON_HEAP)
      @GuardedBy("this")
      protected val offHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.OFF_HEAP)
      @GuardedBy("this")
      protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.ON_HEAP)
      @GuardedBy("this")
      protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.OFF_HEAP)
    
      protected[this] val maxOffHeapMemory = conf.get(MEMORY_OFFHEAP_SIZE)
      protected[this] val offHeapStorageMemory =
        (maxOffHeapMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong
    
      onHeapStorageMemoryPool.incrementPoolSize(onHeapStorageMemory)
      onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory)
    
      offHeapExecutionMemoryPool.incrementPoolSize(maxOffHeapMemory - offHeapStorageMemory)
      offHeapStorageMemoryPool.incrementPoolSize(offHeapStorageMemory)
    
      // ......
    }
    

    MemoryManager需要4个构造方法参数:

    • conf:即SparkConf;
    • numCores:分配的CPU核心数;
    • onHeapStorageMemory:用于存储的堆内内存的大小(字节);
    • onHeapExecutionMemory:用于执行的堆内内存的大小(字节)。

    MemoryManager初始化了4个内存池,分别是堆内、堆外的存储内存池,以及堆内、堆外的执行内存池。另外,堆外内存的最大值可以由配置项spark.memory.offHeap.size来指定,默认为0。堆外存储内存占总堆外内存的比例则由配置项spark.memory.storageFraction指定,默认0.5,即50%,剩下的就是堆外执行内存。这个参数在后面还会出现。

    接下来对4个内存池分别调用其incrementPoolSize()方法,设定合适的容量,初始化完毕。

    内存管理方法

    MemoryManager中给出了一批内存管理方法的定义,这其中有些是抽象方法,需要其子类去实现。这些方法的清单如下。

    代码#23.7 - o.a.s.memory.MemoryManager定义的内存管理方法

      def acquireStorageMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean
    
      def acquireUnrollMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean
    
      private[memory]
      def acquireExecutionMemory(
          numBytes: Long,
          taskAttemptId: Long,
          memoryMode: MemoryMode): Long
    
      private[memory]
      def releaseExecutionMemory(
          numBytes: Long,
          taskAttemptId: Long,
          memoryMode: MemoryMode): Unit = synchronized {
        memoryMode match {
          case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
          case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
        }
      }
    
      private[memory] def releaseAllExecutionMemoryForTask(taskAttemptId: Long): Long = synchronized {
        onHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId) +
          offHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId)
      }
    
      def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized {
        memoryMode match {
          case MemoryMode.ON_HEAP => onHeapStorageMemoryPool.releaseMemory(numBytes)
          case MemoryMode.OFF_HEAP => offHeapStorageMemoryPool.releaseMemory(numBytes)
        }
      }
    
      final def releaseAllStorageMemory(): Unit = synchronized {
        onHeapStorageMemoryPool.releaseAllMemory()
        offHeapStorageMemoryPool.releaseAllMemory()
      }
    
      final def releaseUnrollMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized {
        releaseStorageMemory(numBytes, memoryMode)
      }
    

    可见,acquireStorageMemory()、acquireUnrollMemory()和acquireExecutionMemory()三个用于申请内存的方法都需要子类去实现。什么是Unroll内存呢?RDD在被缓存之前,它所占用的内存空间是不连续的,而被缓存到存储内存之后,就以块的形式来存储,占用连续的内存空间了。Unroll就是这个将RDD固化在连续内存空间的过程,中文一般翻译为“展开”。Unroll过程使用的内存空间就是展开内存,它本质上是存储内存中比较特殊的一部分。

    各个释放内存的方法则基本上代理了MemoryPool对应的释放方法,比较容易理解。

    除此之外,MemoryManager类还提供了Tungsten机制下的一些内存管理相关的属性。现在铺开讲它还为时过早,看官目前只需知道Tungsten是DataBricks在3~4年前提出的Spark优化方案即可,前文提到的堆外内存管理即属于Tungsten机制的一部分。早在这个系列开始之前,我曾经写过一篇关于Tungsten Sort Shuffle的简单解析,可以参考这里

    总结

    本文通过引入对内存池MemoryPool的介绍,搞清楚了用于存储的内存池StorageMemoryPool的基本逻辑,另外还对内存及MemoryPool的管理器——MemoryManager进行了简要的分析。由于Spark内存管理这一块知识的交叉性比较强,代码不能拆分得很开,所以难免会出现一些未接触过的概念,在之后的源码阅读过程中自然会逐渐了解。

    在下一篇文章中,我们会重点解析MemoryManager的两种实现,即静态内存管理器StaticMemoryManager、统一内存管理器UnifiedMemoryManager,从而深刻理解Spark Core的内存管理模型。它也是Spark作业内存调优的基础。

    相关文章

      网友评论

        本文标题:Spark Core源码精读计划#23:与存储相关的内存池及内存

        本文链接:https://www.haomeiwen.com/subject/vqydhctx.html