美文网首页
9.2 spark内存管理之 UnifiedMemoryMana

9.2 spark内存管理之 UnifiedMemoryMana

作者: GongMeng | 来源:发表于2019-01-02 21:26 被阅读0次

    图片来源 https://www.ibm.com/developerworks/cn/analytics/library/ba-cn-apache-spark-memory-management/index.html

    1. 内存的分配

    如果我们选择了这种UnifiedMemoryManager来管理一个1G内存的Executor, 那么实际上每个部分的内存是多少呢? 下面这个图是1.5的, 1.6对比例进行了微调


    已经看过的内存管理图
    1. 系统会预留300MB内存
    2. 在剩余的700MB内存中, 有75%会被分配到storage区域, 用于存储和计算. 这一部分是525MB, 这个比例可以使用spark.memory.fraction 来调整. 另外一种static memory manager的参数和这边不同, 要注意内存管理模型和参数是配套的
    3. 剩下的部分就是Other区域, 用于进行spark自身, 以及broadcast的一些数据的使用

    2. 存储和计算内存的动态分配

    内存

    这张图概括了统一内存管理模型的基本约束

    • Storage部分抢占到Executor的空间随时可能被释放
    • Executor抢占到的Storage部分不会被主动释放, 只能等待使用结束
    • 当有剩余空间时, 大家相安无事, 都没有空间了, 只好把数据向磁盘写入, 而不是去占用预留空间. 预留空间主要为了应对spark自身内存不足的场景

    3. 内部对象和方法

    申请执行用内存

     /**
        为当前的Task申请numBytes数量的内存, 返回0代表分配失败
       这个请求是可能阻塞的, 直到找到足够的内存页
       每个人物最多可以从内存池中分配到 1/2N , N是当前活跃的任务数
       如果之前有任务已经获得了大量的内存, 那么就只好把数据刷到磁盘
       */
      override private[memory] def acquireExecutionMemory(
          numBytes: Long,
          taskAttemptId: Long,
          memoryMode: MemoryMode): Long = synchronized {
        assertInvariant()
        assert(numBytes >= 0)
        memoryMode match {
          case MemoryMode.ON_HEAP =>
    
            /**
             通过释放掉storage抢占到的用于cache的内存, 将Executor内存池容量扩大
             
             当为一个任务申请内存时, 这里需要做大量的尝试性分配
             因为每次尝试申请, 都可能在释放掉内存后,
             接着被storage用来缓存一个大对象
            */
            def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
              if (extraMemoryNeeded > 0) {
                // 当前内存池空了, 尝试去抢占storage部分, free的部分可以直接抢占
                //  如果当前状态时storage抢占了executor的内存, 则立即释放它抢占的部分
                val memoryReclaimableFromStorage =
                  math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize - storageRegionSize)
                if (memoryReclaimableFromStorage > 0) {
                  // 申请满足需求的内存
                  val spaceToReclaim = storageMemoryPool.freeSpaceToShrinkPool(
                    math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
                  storageMemoryPool.decrementPoolSize(spaceToReclaim)
                  onHeapExecutionMemoryPool.incrementPoolSize(spaceToReclaim)
                }
              }
            }
    
            /**
               计算当前能用到的最大内存, 很容易理解
               如果storage没有满, 那么就最大内存是尽可能多的抢占
               如果storage已经满了, 按照默认配置, executor可以拿到50%的内存空间
             */
            def computeMaxExecutionPoolSize(): Long = {
              maxMemory - math.min(storageMemoryUsed, storageRegionSize)
            }
    
            onHeapExecutionMemoryPool.acquireMemory(
              numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize)
    
          case MemoryMode.OFF_HEAP =>
            //  off-heap是由tachyon管理的, 逻辑在独立的项目里
            offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
        }
      }
    

    申请存储用内存

      override def acquireStorageMemory(
          blockId: BlockId,
          numBytes: Long,
          evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
        assertInvariant()
        assert(numBytes >= 0)
        if (numBytes > maxStorageMemory) {
          // 这里会报一个错误, fast-fail机制
          logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
            s"memory limit ($maxStorageMemory bytes)")
          return false
        }
        if (numBytes > storageMemoryPool.memoryFree) {
          // 当内存不足时, 尝试抢占executor的空闲内存, 并把对应的blocck保存到evctedBlocks里去, 为Executor可能的索取做准备
          val memoryBorrowedFromExecution = Math.min(onHeapExecutionMemoryPool.memoryFree, numBytes)
          onHeapExecutionMemoryPool.decrementPoolSize(memoryBorrowedFromExecution)
          storageMemoryPool.incrementPoolSize(memoryBorrowedFromExecution)
        }
        storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks)
      
    

    相关文章

      网友评论

          本文标题:9.2 spark内存管理之 UnifiedMemoryMana

          本文链接:https://www.haomeiwen.com/subject/lcazlqtx.html