美文网首页
6.2 MemoryManager 之UnifiedMemory

6.2 MemoryManager 之UnifiedMemory

作者: GongMeng | 来源:发表于2018-11-25 22:56 被阅读0次

1. 概述

UnifiedMemoryManager 是spark默认的一个内存管理器的具体实现, 它使用的是on-heap的纯内存模式.
这个具体实现允许存储和计算之间根据需求相互补充内存的使用, 但是整体上保持同一个大体的比例. 默认在扣除预留空间后有75%的内存空间交付给MemoryManager处理, 然后存储和计算时五五开. 虽然相互之间可以借内存, 但计算部分借的内存不会即刻归还, 而存储部分使用的则是可以通过即刻的回收方法进行回收和使用的.

源码中注释的解释如下

/**
 * A [[MemoryManager]] that enforces a soft boundary between execution and storage such that
 * either side can borrow memory from the other.
 *
 * The region shared between execution and storage is a fraction of (the total heap space - 300MB)
 * configurable through `spark.memory.fraction` (default 0.75). The position of the boundary
 * within this space is further determined by `spark.memory.storageFraction` (default 0.5).
 * This means the size of the storage region is 0.75 * 0.5 = 0.375 of the heap space by default.
 *
 * Storage can borrow as much execution memory as is free until execution reclaims its space.
 * When this happens, cached blocks will be evicted from memory until sufficient borrowed
 * memory is released to satisfy the execution memory request.
 *
 * Similarly, execution can borrow as much storage memory as is free. However, execution
 * memory is *never* evicted by storage due to the complexities involved in implementing this.
 * The implication is that attempts to cache blocks may fail if execution has already eaten
 * up most of the storage space, in which case the new blocks will be evicted immediately
 * according to their respective storage levels.
 *
 * @param storageRegionSize Size of the storage region, in bytes.
 *                          This region is not statically reserved; execution can borrow from
 *                          it if necessary. Cached blocks can be evicted only if actual
 *                          storage memory usage exceeds this region.
 */

内存计算方法

object UnifiedMemoryManager {

  // Set aside a fixed amount of memory for non-storage, non-execution purposes.
  // This serves a function similar to `spark.memory.fraction`, but guarantees that we reserve
  // sufficient memory for the system even for small heaps. E.g. if we have a 1GB JVM, then
  // the memory used for execution and storage will be (1024 - 300) * 0.75 = 543MB by default.
  private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024

2. 内部方法

2.1 获得存储用的内存

这是一个需要同步的方法, 为了保证不会OOM, 对内存的余量的计算都是同步的.

  override def acquireStorageMemory(
      blockId: BlockId,
      numBytes: Long,
      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
    assertInvariant()
    assert(numBytes >= 0)
    if (numBytes > maxStorageMemory) {
      // Fail fast if the block simply won't fit
      logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
        s"memory limit ($maxStorageMemory bytes)")
      return false
    }
    if (numBytes > storageMemoryPool.memoryFree) {
      // There is not enough free memory in the storage pool, so try to borrow free memory from
      // the execution pool.
      val memoryBorrowedFromExecution = Math.min(onHeapExecutionMemoryPool.memoryFree, numBytes)
      onHeapExecutionMemoryPool.decrementPoolSize(memoryBorrowedFromExecution)
      storageMemoryPool.incrementPoolSize(memoryBorrowedFromExecution)
    }
    storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks)
  }

2.2 acquireExecutorMemory

扩大运行时内存, 这里要注意的是, 由于storage memory有好几种不同的形态.

/**
   * Try to acquire up to `numBytes` of execution memory for the current task and return the
   * number of bytes obtained, or 0 if none can be allocated.
   *
   * This call may block until there is enough free memory in some situations, to make sure each
   * task has a chance to ramp up to at least 1 / 2N of the total memory pool (where N is the # of
   * active tasks) before it is forced to spill. This can happen if the number of tasks increase
   * but an older task had a lot of memory already.
   */
  override private[memory] def acquireExecutionMemory(
      numBytes: Long,
      taskAttemptId: Long,
      memoryMode: MemoryMode): Long = synchronized {
    assertInvariant()
    assert(numBytes >= 0)
    memoryMode match {
      case MemoryMode.ON_HEAP =>

        /**
         * Grow the execution pool by evicting cached blocks, thereby shrinking the storage pool.
         *
         * When acquiring memory for a task, the execution pool may need to make multiple
         * attempts. Each attempt must be able to evict storage in case another task jumps in
         * and caches a large block between the attempts. This is called once per attempt.
         */
        def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
          if (extraMemoryNeeded > 0) {
            // There is not enough free memory in the execution pool, so try to reclaim memory from
            // storage. We can reclaim any free memory from the storage pool. If the storage pool
            // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim
            // the memory that storage has borrowed from execution.
            val memoryReclaimableFromStorage =
              math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize - storageRegionSize)
            if (memoryReclaimableFromStorage > 0) {
              // Only reclaim as much space as is necessary and available:
              val spaceToReclaim = storageMemoryPool.freeSpaceToShrinkPool(
                math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
              storageMemoryPool.decrementPoolSize(spaceToReclaim)
              onHeapExecutionMemoryPool.incrementPoolSize(spaceToReclaim)
            }
          }
        }

        /**
         * The size the execution pool would have after evicting storage memory.
         *
         * The execution memory pool divides this quantity among the active tasks evenly to cap
         * the execution memory allocation for each task. It is important to keep this greater
         * than the execution pool size, which doesn't take into account potential memory that
         * could be freed by evicting storage. Otherwise we may hit SPARK-12155.
         *
         * Additionally, this quantity should be kept below `maxMemory` to arbitrate fairness
         * in execution memory allocation across tasks, Otherwise, a task may occupy more than
         * its fair share of execution memory, mistakenly thinking that other tasks can acquire
         * the portion of storage memory that cannot be evicted.
         */
        def computeMaxExecutionPoolSize(): Long = {
          maxMemory - math.min(storageMemoryUsed, storageRegionSize)
        }

        onHeapExecutionMemoryPool.acquireMemory(
          numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize)

      case MemoryMode.OFF_HEAP =>
        // For now, we only support on-heap caching of data, so we do not need to interact with
        // the storage pool when allocating off-heap memory. This will change in the future, though.
        offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
    }
  }

相关文章

网友评论

      本文标题:6.2 MemoryManager 之UnifiedMemory

      本文链接:https://www.haomeiwen.com/subject/llqmqqtx.html