
Spark Core Source Code Reading Plan #26: In-Memory Storage with MemoryStore

Author: LittleMagic | Published 2019-07-23 22:47


    Preface

    I almost forgot about this series; stealing some time from a busy schedule to carry on with it.

    We have already gained a fairly deep understanding of the memory pool MemoryPool and the memory manager MemoryManager. Next up is MemoryStore, which takes care of the concrete business of Spark's in-memory storage and connects the memory management machinery with storage blocks. This article first introduces the MemoryEntry abstraction that MemoryStore relies on, and then analyzes MemoryStore's main source code in detail.

    MemoryEntry

    As the name suggests, a MemoryEntry is an "entry" in memory, that is, the abstract representation of a block residing in memory. It is defined by a trait.

    Code #26.1 - the o.a.s.memory.MemoryEntry trait

    private sealed trait MemoryEntry[T] {
      def size: Long
      def memoryMode: MemoryMode
      def classTag: ClassTag[T]
    }
    

    Here, size is the size of the block this MemoryEntry represents, memoryMode indicates whether the block is stored in on-heap or off-heap memory, and classTag is the type tag of the objects stored in the block. MemoryEntry has two implementations, a deserialized one and a serialized one, shown below.

    Code #26.2 - the o.a.s.memory.DeserializedMemoryEntry/SerializedMemoryEntry classes

    private case class DeserializedMemoryEntry[T](
        value: Array[T],
        size: Long,
        classTag: ClassTag[T]) extends MemoryEntry[T] {
      val memoryMode: MemoryMode = MemoryMode.ON_HEAP
    }
    
    private case class SerializedMemoryEntry[T](
        buffer: ChunkedByteBuffer,
        memoryMode: MemoryMode,
        classTag: ClassTag[T]) extends MemoryEntry[T] {
      def size: Long = buffer.size
    }
    

    As we can see, the deserialized DeserializedMemoryEntry can only be stored in on-heap memory, and its data is an array of objects of type T. The serialized SerializedMemoryEntry can be stored in either on-heap or off-heap memory; its data is wrapped in the ChunkedByteBuffer byte buffer discussed earlier, and the buffer's length is exactly the size of the SerializedMemoryEntry.
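
    Because MemoryEntry is a sealed trait with exactly two case-class implementations, downstream code can dispatch on the concrete type with an exhaustive pattern match, which is exactly what getBytes()/getValues() will do later. Below is a minimal, standalone sketch of that dispatch pattern; the simplified names (Entry, Deserialized, Serialized, describe) are illustrative stand-ins, not Spark's real definitions.

    sealed trait Entry
    case class Deserialized(values: Array[Any]) extends Entry
    case class Serialized(bytes: Array[Byte]) extends Entry

    // Matches on a sealed hierarchy are checked for exhaustiveness by the compiler,
    // which is one reason MemoryEntry is declared sealed.
    def describe(e: Entry): String = e match {
      case Deserialized(vs) => s"${vs.length} objects stored on heap"
      case Serialized(bs)   => s"${bs.length} serialized bytes"
    }

    println(describe(Serialized(Array[Byte](1, 2, 3)))) // prints "3 serialized bytes"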

    MemoryStore

    There is a lot to MemoryStore, so we again go through it piece by piece.

    Constructor and member fields

    Code #26.3 - constructor and member fields of the o.a.s.memory.MemoryStore class

    private[spark] class MemoryStore(
        conf: SparkConf,
        blockInfoManager: BlockInfoManager,
        serializerManager: SerializerManager,
        memoryManager: MemoryManager,
        blockEvictionHandler: BlockEvictionHandler)
      extends Logging {
    
      private val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true)
    
      private val onHeapUnrollMemoryMap = mutable.HashMap[Long, Long]()
      private val offHeapUnrollMemoryMap = mutable.HashMap[Long, Long]()
    
      private val unrollMemoryThreshold: Long =
        conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024)
    
      private def maxMemory: Long = {
        memoryManager.maxOnHeapStorageMemory + memoryManager.maxOffHeapStorageMemory
      }
    
      if (maxMemory < unrollMemoryThreshold) {
        logWarning(s"Max memory ${Utils.bytesToString(maxMemory)} is less than the initial memory " +
          s"threshold ${Utils.bytesToString(unrollMemoryThreshold)} needed to store a block in " +
          s"memory. Please configure Spark with more memory.")
      }
    
      logInfo("MemoryStore started with capacity %s".format(Utils.bytesToString(maxMemory)))
    
      private def memoryUsed: Long = memoryManager.storageMemoryUsed
    
      def currentUnrollMemory: Long = memoryManager.synchronized {
        onHeapUnrollMemoryMap.values.sum + offHeapUnrollMemoryMap.values.sum
      }
    
      private def blocksMemoryUsed: Long = memoryManager.synchronized {
        memoryUsed - currentUnrollMemory
      }
    

    As we can see, MemoryStore takes 5 constructor parameters. The first 4 are already very familiar, so there is no need to dwell on them. The 5th is of type BlockEvictionHandler, which is in fact also a trait; classes implementing it are responsible for evicting blocks from memory. Currently only BlockManager implements it, so we will come back to it when covering BlockManager.

    The member fields of the MemoryStore class are:

    • entries: the mapping from block IDs to their corresponding MemoryEntry instances, stored in a LinkedHashMap with initial capacity 32 and load factor 0.75. The third constructor argument, true, puts the map into access order, which matters for eviction; see the short demo after this list.
    • onHeapUnrollMemoryMap/offHeapUnrollMemoryMap: mappings from a task attempt ID to the amount of unroll memory that task occupies in on-heap and off-heap memory, respectively.
    • unrollMemoryThreshold: the initial amount of unroll memory requested before unrolling a block, controlled by the spark.storage.unrollMemoryThreshold configuration item; the default is 1MB.
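
    The access-order flag is what gives the MemoryStore its LRU-like behavior: every get() or put() moves the touched entry to the tail of the map, so iterating over it visits the least-recently-used blocks first, which is the order the eviction logic later relies on. A tiny self-contained demonstration (the block ID strings are made up for illustration):

    import java.util.LinkedHashMap

    // accessOrder = true: iteration order is least- to most-recently accessed
    val map = new LinkedHashMap[String, Int](32, 0.75f, true)
    map.put("rdd_0_0", 1)
    map.put("rdd_0_1", 2)
    map.put("rdd_0_2", 3)
    map.get("rdd_0_0") // touching an entry moves it to the tail

    val it = map.keySet().iterator()
    while (it.hasNext) println(it.next())
    // prints rdd_0_1, rdd_0_2, rdd_0_0 -- an eviction scan starts from the stalest block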

    In addition, there are four getter methods, each returning one of the relevant memory amounts:

    • maxMemory: the sum of on-heap and off-heap storage memory. If the memory manager is a StaticMemoryManager, this value is fixed; if it is a UnifiedMemoryManager, the value fluctuates.
    • memoryUsed: the sum of on-heap and off-heap storage memory already in use.
    • currentUnrollMemory: the amount of memory currently occupied by unrolling, aggregated from the onHeapUnrollMemoryMap/offHeapUnrollMemoryMap above.
    • blocksMemoryUsed: the storage memory used excluding unroll memory (i.e. the memory actually holding blocks), which is memoryUsed minus currentUnrollMemory.

    Below we look in turn at the methods for writing data into and reading data from the MemoryStore.

    Writing raw bytes directly

    This method is putBytes(); its code follows.

    Code #26.4 - the o.a.s.memory.MemoryStore.putBytes() method

      def putBytes[T: ClassTag](
          blockId: BlockId,
          size: Long,
          memoryMode: MemoryMode,
          _bytes: () => ChunkedByteBuffer): Boolean = {
        require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
        if (memoryManager.acquireStorageMemory(blockId, size, memoryMode)) {
          // We acquired enough memory for the block, so go ahead and put it
          val bytes = _bytes()
          assert(bytes.size == size)
          val entry = new SerializedMemoryEntry[T](bytes, memoryMode, implicitly[ClassTag[T]])
          entries.synchronized {
            entries.put(blockId, entry)
          }
          logInfo("Block %s stored as bytes in memory (estimated size %s, free %s)".format(
            blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
          true
        } else {
          false
        }
      }
    

    The implementation is fairly simple: first call MemoryManager.acquireStorageMemory() to request the memory needed, then invoke the function _bytes passed in as a parameter (a () => ChunkedByteBuffer thunk, not a partial function) to obtain the data already converted into a ChunkedByteBuffer. A corresponding SerializedMemoryEntry is then created and put into the entries map. Note that LinkedHashMap itself is not thread-safe, so all concurrent access to it must be synchronized.
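
    Taking the bytes as a thunk means the buffer is materialized only after the reservation succeeds, so a failed reservation costs nothing. Here is a minimal sketch of this acquire-then-materialize pattern, where acquire and buildBuffer are hypothetical stand-ins rather than Spark APIs:

    // acquire: tries to reserve `size` bytes; buildBuffer: produces the data on demand
    def putLazily(size: Long, acquire: Long => Boolean, buildBuffer: () => Array[Byte]): Boolean = {
      if (acquire(size)) {
        val bytes = buildBuffer() // allocation happens only after the reservation succeeds
        assert(bytes.length == size)
        true
      } else {
        false // nothing was built, so there is nothing to clean up
      }
    }

    var free = 1024L
    val ok = putLazily(512L, sz => if (sz <= free) { free -= sz; true } else false,
      () => new Array[Byte](512))
    println(ok) // true, and `free` has dropped to 512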

    Writing iterator-based data

    Iterator-based data simply means block data presented in the form of an Iterator[T]. Data takes this form because the data for a single block can sometimes be too large to fit into memory all at once. To avoid an OOM, we can traverse the iterator while periodically writing to memory and checking whether memory is still sufficient, like leafing through a book. The word "unroll" vividly describes this process. The corresponding methods are putIteratorAsValues() and putIteratorAsBytes(), which produce a DeserializedMemoryEntry and a SerializedMemoryEntry respectively. Since the two methods follow similar logic, we will use putIteratorAsValues() for the explanation.

    The code is long, but I still won't "unroll" it; here it is in full.

    Code #26.5 - o.a.s.memory.MemoryStore.putIteratorAsValues()

      private[storage] def putIteratorAsValues[T](
          blockId: BlockId,
          values: Iterator[T],
          classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
        require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
    
        var elementsUnrolled = 0
        var keepUnrolling = true
        val initialMemoryThreshold = unrollMemoryThreshold
        val memoryCheckPeriod = conf.get(UNROLL_MEMORY_CHECK_PERIOD)
        var memoryThreshold = initialMemoryThreshold
        val memoryGrowthFactor = conf.get(UNROLL_MEMORY_GROWTH_FACTOR)
        var unrollMemoryUsedByThisBlock = 0L
        var vector = new SizeTrackingVector[T]()(classTag)
    
        keepUnrolling =
          reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, MemoryMode.ON_HEAP)
    
        if (!keepUnrolling) {
          logWarning(s"Failed to reserve initial memory threshold of " +
            s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
        } else {
          unrollMemoryUsedByThisBlock += initialMemoryThreshold
        }
    
        while (values.hasNext && keepUnrolling) {
          vector += values.next()
          if (elementsUnrolled % memoryCheckPeriod == 0) {
            val currentSize = vector.estimateSize()
            if (currentSize >= memoryThreshold) {
              val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
              keepUnrolling =
                reserveUnrollMemoryForThisTask(blockId, amountToRequest, MemoryMode.ON_HEAP)
              if (keepUnrolling) {
                unrollMemoryUsedByThisBlock += amountToRequest
              }
              memoryThreshold += amountToRequest
            }
          }
          elementsUnrolled += 1
        }
    
        if (keepUnrolling) {
          val arrayValues = vector.toArray
          vector = null
          val entry =
            new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag)
          val size = entry.size
    
          def transferUnrollToStorage(amount: Long): Unit = {
            memoryManager.synchronized {
              releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, amount)
              val success = memoryManager.acquireStorageMemory(blockId, amount, MemoryMode.ON_HEAP)
              assert(success, "transferring unroll memory to storage memory failed")
            }
          }
    
          val enoughStorageMemory = {
            if (unrollMemoryUsedByThisBlock <= size) {
              val acquiredExtra =
                memoryManager.acquireStorageMemory(
                  blockId, size - unrollMemoryUsedByThisBlock, MemoryMode.ON_HEAP)
              if (acquiredExtra) {
                transferUnrollToStorage(unrollMemoryUsedByThisBlock)
              }
              acquiredExtra
            } else { 
              val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
              releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory)
              transferUnrollToStorage(size)
              true
            }
          }
    
          if (enoughStorageMemory) {
            entries.synchronized {
              entries.put(blockId, entry)
            }
            logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(
              blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
            Right(size)
          } else {
            assert(currentUnrollMemoryForThisTask >= unrollMemoryUsedByThisBlock,
              "released too much unroll memory")
            Left(new PartiallyUnrolledIterator(
              this,
              MemoryMode.ON_HEAP,
              unrollMemoryUsedByThisBlock,
              unrolled = arrayValues.toIterator,
              rest = Iterator.empty))
          }
        } else {
          logUnrollFailureMessage(blockId, vector.estimateSize())
          Left(new PartiallyUnrolledIterator(
            this,
            MemoryMode.ON_HEAP,
            unrollMemoryUsedByThisBlock,
            unrolled = vector.iterator,
            rest = values))
        }
      }
    

    Before getting into the logic, two configuration items need explaining:

    • UNROLL_MEMORY_CHECK_PERIOD, corresponding to spark.storage.unrollMemoryCheckPeriod: the period, in elements, at which to check during iteration whether memory is still sufficient. The default is 16, i.e. a check every 16 elements.
    • UNROLL_MEMORY_GROWTH_FACTOR, corresponding to spark.storage.unrollMemoryGrowthFactor: the multiplier applied when requesting additional unroll memory. The default is 1.5.

    With those in hand, we can trace the method's execution flow:

    1. Call reserveUnrollMemoryForThisTask() to request the initial unroll memory, and keep a running record of how much unroll memory this block has used.
    2. Iterate over the block's data, appending the elements to a SizeTrackingVector. This data structure can dynamically estimate the size of the elements it holds; we will analyze it in detail later.
    3. At each check point, if the estimated size of the data unrolled so far has reached the current unroll memory threshold, call reserveUnrollMemoryForThisTask() again to try to obtain more unroll memory (note how the growth factor above is applied; a worked example follows this list). On success, update the threshold as well.
    4. If keepUnrolling is still true after all the data has been unrolled, the unroll succeeded, and the data in the SizeTrackingVector is wrapped into a DeserializedMemoryEntry.
    5. Compare the unroll memory reserved for this block with the entry's actual size. If the reservation exceeds the actual size, release the excess via releaseUnrollMemoryForThisTask(); if it falls short, try to acquire the shortfall as storage memory. In either case the nested transferUnrollToStorage() method then converts the reserved unroll memory into storage memory (internally it calls releaseUnrollMemoryForThisTask() and immediately re-acquires the same amount as storage memory).
    6. If all of this succeeded, put the mapping from the block ID to the DeserializedMemoryEntry into entries and return Right. Note that the method's return type is Either, which in Scala represents a disjoint union of two outcomes: a failure result (Left) or a success result (Right).
    7. If the initial unroll memory could not be reserved, or keepUnrolling ended up false during unrolling, or there was not enough storage memory at the end, the write failed; return Left wrapping a PartiallyUnrolledIterator, an iterator that has not been fully unrolled.
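
    To make the threshold arithmetic in step 3 concrete, here is a hypothetical dry run using the default values (1MB initial threshold, growth factor 1.5); the maybeGrow helper is illustrative, not part of Spark:

    var threshold = 1024L * 1024 // spark.storage.unrollMemoryThreshold default
    val growthFactor = 1.5       // spark.storage.unrollMemoryGrowthFactor default

    // Returns how much extra unroll memory one check point would request
    def maybeGrow(currentSize: Long): Long = {
      if (currentSize >= threshold) {
        val amountToRequest = (currentSize * growthFactor - threshold).toLong
        threshold += amountToRequest // new threshold = currentSize * growthFactor
        amountToRequest
      } else 0L
    }

    // If the vector is estimated at 2MB at a check point, we request 2*1.5 - 1 = 2MB more,
    // raising the threshold to 3MB and deferring the next request until 3MB is reached.
    println(maybeGrow(2L * 1024 * 1024)) // prints 2097152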

    With a long-winded method like this, a few more read-throughs make it click naturally. Below are the methods it calls to reserve and release unroll memory; compared with the slab of code above they are a drizzle, so I won't belabor them beyond one note after the code.

    Code #26.6 - the o.a.s.memory.MemoryStore.reserveUnrollMemoryForThisTask()/releaseUnrollMemoryForThisTask() methods

      def reserveUnrollMemoryForThisTask(
          blockId: BlockId,
          memory: Long,
          memoryMode: MemoryMode): Boolean = {
        memoryManager.synchronized {
          val success = memoryManager.acquireUnrollMemory(blockId, memory, memoryMode)
          if (success) {
            val taskAttemptId = currentTaskAttemptId()
            val unrollMemoryMap = memoryMode match {
              case MemoryMode.ON_HEAP => onHeapUnrollMemoryMap
              case MemoryMode.OFF_HEAP => offHeapUnrollMemoryMap
            }
            unrollMemoryMap(taskAttemptId) = unrollMemoryMap.getOrElse(taskAttemptId, 0L) + memory
          }
          success
        }
      }
    
      def releaseUnrollMemoryForThisTask(memoryMode: MemoryMode, memory: Long = Long.MaxValue): Unit = {
        val taskAttemptId = currentTaskAttemptId()
        memoryManager.synchronized {
          val unrollMemoryMap = memoryMode match {
            case MemoryMode.ON_HEAP => onHeapUnrollMemoryMap
            case MemoryMode.OFF_HEAP => offHeapUnrollMemoryMap
          }
          if (unrollMemoryMap.contains(taskAttemptId)) {
            val memoryToRelease = math.min(memory, unrollMemoryMap(taskAttemptId))
            if (memoryToRelease > 0) {
              unrollMemoryMap(taskAttemptId) -= memoryToRelease
              memoryManager.releaseUnrollMemory(memoryToRelease, memoryMode)
            }
            if (unrollMemoryMap(taskAttemptId) == 0) {
              unrollMemoryMap.remove(taskAttemptId)
            }
          }
        }
      }
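
    One nuance worth noting: releaseUnrollMemoryForThisTask() defaults its memory parameter to Long.MaxValue, so calling it without an amount releases everything the current task attempt holds. A minimal sketch of this per-task bookkeeping pattern, with simplified reserve/release stand-ins rather than the real methods:

    import scala.collection.mutable

    val unrollMemoryMap = mutable.HashMap[Long, Long]()

    def reserve(taskId: Long, bytes: Long): Unit =
      unrollMemoryMap(taskId) = unrollMemoryMap.getOrElse(taskId, 0L) + bytes

    def release(taskId: Long, bytes: Long = Long.MaxValue): Long = {
      // never release more than the task actually holds
      val toRelease = math.min(bytes, unrollMemoryMap.getOrElse(taskId, 0L))
      unrollMemoryMap(taskId) = unrollMemoryMap.getOrElse(taskId, 0L) - toRelease
      if (unrollMemoryMap(taskId) == 0L) unrollMemoryMap.remove(taskId) // drop empty entries
      toRelease
    }

    reserve(1L, 1024L)
    reserve(1L, 2048L)
    println(release(1L)) // 3072 -- the Long.MaxValue default releases the task's full total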
    

    Reading bytes and iterator-based data

    The former corresponds to SerializedMemoryEntry and is implemented by the getBytes() method; the latter corresponds to DeserializedMemoryEntry and is implemented by the getValues() method.

    Code #26.7 - the o.a.s.memory.MemoryStore.getBytes()/getValues() methods

      def getBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
        val entry = entries.synchronized { entries.get(blockId) }
        entry match {
          case null => None
          case e: DeserializedMemoryEntry[_] =>
            throw new IllegalArgumentException("should only call getBytes on serialized blocks")
          case SerializedMemoryEntry(bytes, _, _) => Some(bytes)
        }
      }
    
      def getValues(blockId: BlockId): Option[Iterator[_]] = {
        val entry = entries.synchronized { entries.get(blockId) }
        entry match {
          case null => None
          case e: SerializedMemoryEntry[_] =>
            throw new IllegalArgumentException("should only call getValues on deserialized blocks")
          case DeserializedMemoryEntry(values, _, _) =>
            val x = Some(values)
            x.map(_.iterator)
        }
      }
    

    Evicting cached blocks

    This is the evictBlocksToFreeSpace() method, whose purpose is to evict some existing blocks to make room for new ones. It is called from the StorageMemoryPool.acquireMemory() method (code #23.4); flip back to that if it has slipped your mind. This method's code is also rather long, but somewhat easier to follow.

    Code #26.8 - the o.a.s.memory.MemoryStore.evictBlocksToFreeSpace() method

      private[spark] def evictBlocksToFreeSpace(
          blockId: Option[BlockId],
          space: Long,
          memoryMode: MemoryMode): Long = {
        assert(space > 0)
        memoryManager.synchronized {
          var freedMemory = 0L
          val rddToAdd = blockId.flatMap(getRddId)
          val selectedBlocks = new ArrayBuffer[BlockId]
    
          def blockIsEvictable(blockId: BlockId, entry: MemoryEntry[_]): Boolean = {
            entry.memoryMode == memoryMode && (rddToAdd.isEmpty || rddToAdd != getRddId(blockId))
          }
    
          entries.synchronized {
            val iterator = entries.entrySet().iterator()
            while (freedMemory < space && iterator.hasNext) {
              val pair = iterator.next()
              val blockId = pair.getKey
              val entry = pair.getValue
              if (blockIsEvictable(blockId, entry)) {
                if (blockInfoManager.lockForWriting(blockId, blocking = false).isDefined) {
                  selectedBlocks += blockId
                  freedMemory += pair.getValue.size
                }
              }
            }
          }
    
          def dropBlock[T](blockId: BlockId, entry: MemoryEntry[T]): Unit = {
            val data = entry match {
              case DeserializedMemoryEntry(values, _, _) => Left(values)
              case SerializedMemoryEntry(buffer, _, _) => Right(buffer)
            }
            val newEffectiveStorageLevel =
              blockEvictionHandler.dropFromMemory(blockId, () => data)(entry.classTag)
            if (newEffectiveStorageLevel.isValid) {
              blockInfoManager.unlock(blockId)
            } else {
              blockInfoManager.removeBlock(blockId)
            }
          }
    
          if (freedMemory >= space) {
            var lastSuccessfulBlock = -1
            try {
              logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
                s"(${Utils.bytesToString(freedMemory)} bytes)")
              (0 until selectedBlocks.size).foreach { idx =>
                val blockId = selectedBlocks(idx)
                val entry = entries.synchronized {
                  entries.get(blockId)
                }
    
                if (entry != null) {
                  dropBlock(blockId, entry)
                  afterDropAction(blockId)
                }
                lastSuccessfulBlock = idx
              }
              logInfo(s"After dropping ${selectedBlocks.size} blocks, " +
                s"free memory is ${Utils.bytesToString(maxMemory - blocksMemoryUsed)}")
              freedMemory
            } finally {
              if (lastSuccessfulBlock != selectedBlocks.size - 1) {
                (lastSuccessfulBlock + 1 until selectedBlocks.size).foreach { idx =>
                  val blockId = selectedBlocks(idx)
                  blockInfoManager.unlock(blockId)
                }
              }
            }
          } else {
            blockId.foreach { id =>
              logInfo(s"Will not store $id")
            }
            selectedBlocks.foreach { id =>
              blockInfoManager.unlock(id)
            }
            0L
          }
        }
      }
    

    The space parameter in the method signature indicates how much room needs to be freed. The execution flow is as follows:

    1. Iterate over the blocks in the entries map and pick out those that can be evicted. A block is evictable if its MemoryMode matches the requested one (on-heap against on-heap, off-heap against off-heap, never crossing over) and it does not belong to the same RDD as the block we are trying to make room for (a sketch of this predicate follows this list).
    2. Take the write lock on each candidate without blocking, which guarantees that blocks currently being read are not evicted, and record the IDs of the blocks selected for eviction.
    3. If the space freed reaches the target, call the nested dropBlock() method to actually remove the selected blocks; it ultimately delegates to BlockManager.dropFromMemory(). That method has two possible outcomes: the block may survive with a changed StorageLevel (for example, spilled to disk), in which case only its write lock needs to be released; or the block may be removed outright, in which case BlockInfoManager.removeBlock() must be called to delete it. Finally, any remaining unprocessed blocks are unlocked.
    4. If the freed space still cannot reach the target, no eviction is performed at all, and the new block will not be stored.
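
    The filter in step 1 exists so that caching one partition of an RDD never evicts other partitions of that same RDD, which would just thrash the cache. A sketch of the predicate under simplifying assumptions: getRddId here parses a plain string instead of Spark's BlockId, and the MemoryMode check is omitted for brevity.

    // Extracts the RDD id from ids shaped like "rdd_<rddId>_<partition>", else None
    def getRddId(blockId: String): Option[Int] =
      if (blockId.startsWith("rdd_")) Some(blockId.split("_")(1).toInt) else None

    // rddToAdd is the RDD id of the block we are making room for, if any
    def blockIsEvictable(candidate: String, rddToAdd: Option[Int]): Boolean =
      rddToAdd.isEmpty || rddToAdd != getRddId(candidate)

    println(blockIsEvictable("rdd_3_0", Some(3)))  // false: same RDD, spare it
    println(blockIsEvictable("rdd_7_0", Some(3)))  // true: fair game
    println(blockIsEvictable("broadcast_1", None)) // true: not storing an RDD block, anything goes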

    Summary

    This article first briefly introduced the role of MemoryEntry and then read through the MemoryStore source code in detail, covering how serialized and deserialized data is written to and read from Spark's memory. It is a lot of information, and admittedly dry, but with it we finally have a fairly complete picture of the role memory plays in Spark's storage subsystem. The next article moves on to disk storage.
