
Spark Source Code Reading: the Storage Module (Part 2)

Author: invincine | Published 2019-04-12 16:41

    In part ① of this series, we saw that the Storage module follows a standard master-slave design: the master manages the slaves' metadata, while the slaves hold the actual data, and we looked at how the master-side BlockManagerMasterEndpoint and the slave-side BlockManagerSlaveEndpoint exchange messages.
    This article walks through how a data block is actually stored and how that path is implemented.
    The source code in this article is based on Spark 1.6.3.

    Storage levels

    There are two ways to cache an RDD: cache() and persist(). Under the hood, cache() simply calls persist() with a default argument, so it is just a shortcut for persist().
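    For reference, a sketch of what the two methods look like in the 1.6.x RDD source (quoted from memory, so treat it as approximate rather than an exact excerpt):

    /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
    def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

    /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
    def cache(): this.type = persist()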

    persist takes a storage level as its argument, as shown below:

    def persist(newLevel: StorageLevel): this.type = {
      if (isLocallyCheckpointed) {
        // This means the user previously called localCheckpoint(), which should have already
        // marked this RDD for persisting. Here we should override the old storage level with
        // one that is explicitly requested by the user (after adapting it to use disk).
        persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
      } else {
        persist(newLevel, allowOverride = false)
      }
    }
    

    The StorageLevel parameter is Spark's storage level: it describes in what form the data is stored and in which medium.

    The constructor of StorageLevel looks like this:

    class StorageLevel private(
        private var _useDisk: Boolean,
        private var _useMemory: Boolean,
        private var _useOffHeap: Boolean,
        private var _deserialized: Boolean,
        private var _replication: Int = 1)
      extends Externalizable {...}
    

    The five parameters mean:

    _useDisk: whether to use disk
    _useMemory: whether to use memory
    _useOffHeap: whether to use off-heap storage
    _deserialized: whether the data is kept as deserialized (non-serialized) objects
    _replication: number of replicas

    Except for _useOffHeap, these parameters can be combined freely; the predefined combinations are:

    object StorageLevel {
      val NONE = new StorageLevel(false, false, false, false)
      val DISK_ONLY = new StorageLevel(true, false, false, false)
      val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
      val MEMORY_ONLY = new StorageLevel(false, true, false, true)
      val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
      val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
      val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
      val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
      val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
      val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
      val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
      val OFF_HEAP = new StorageLevel(false, false, true, false)
      /**
       * :: DeveloperApi ::
       * Return the StorageLevel object with the specified name.
       */
      @DeveloperApi
      def fromString(s: String): StorageLevel = s match {
        case "NONE" => NONE   // do not store any data
        case "DISK_ONLY" => DISK_ONLY   // store on disk only
        case "DISK_ONLY_2" => DISK_ONLY_2   // store on disk only, with one extra replica
        case "MEMORY_ONLY" => MEMORY_ONLY   // store in memory only
        case "MEMORY_ONLY_2" => MEMORY_ONLY_2   // store in memory only, with one extra replica
        case "MEMORY_ONLY_SER" => MEMORY_ONLY_SER   // store in memory only, as serialized objects
        case "MEMORY_ONLY_SER_2" => MEMORY_ONLY_SER_2   // same as above, with one extra replica
        case "MEMORY_AND_DISK" => MEMORY_AND_DISK   // keep in memory first, spill the overflow to disk
        case "MEMORY_AND_DISK_2" => MEMORY_AND_DISK_2   // same as above, with one extra replica
        case "MEMORY_AND_DISK_SER" => MEMORY_AND_DISK_SER   // memory first with spill to disk, as serialized objects
        case "MEMORY_AND_DISK_SER_2" => MEMORY_AND_DISK_SER_2   // same as above, with one extra replica
        case "OFF_HEAP" => OFF_HEAP   // store in off-heap (external) storage
        case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: $s")
      }
    }

    In other words, you specify a storage level by passing the corresponding StorageLevel to persist; fromString lets you look one up by its string name.
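    A minimal usage sketch (it assumes an existing SparkContext sc, and the input path is only a placeholder):

    import org.apache.spark.storage.StorageLevel

    // the path is a placeholder; any RDD works the same way
    val lengths = sc.textFile("hdfs:///tmp/input").map(_.length)
    lengths.persist(StorageLevel.MEMORY_AND_DISK_SER)   // pick a predefined level
    // equivalent: lengths.persist(StorageLevel.fromString("MEMORY_AND_DISK_SER"))
    lengths.count()        // persist is lazy: the cache is only filled when an action runs
    lengths.reduce(_ + _)  // a second action reads from the cache instead of recomputing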

    Choosing a storage level:

    In general:

    1. Prefer memory (MEMORY_ONLY). If memory is tight, add serialization with MEMORY_ONLY_SER, but weigh the extra CPU cost of serializing and deserializing.
    2. Try to avoid disk: disk I/O is far slower than memory, and recomputing a partition is often cheaper than reading it back. Consider MEMORY_AND_DISK only when the computation is expensive and the data set does not fit in memory, or when the local disks are SSDs.
    3. Replication is more about efficiency than fault tolerance: lost data can be recomputed in Spark, and the data source usually has its own replication (e.g. HDFS), so the main reason to add a replica is to avoid recomputation and save time.

    Storage details

    An RDD's persist only takes effect when an action (such as count) is triggered. Through a chain of calls, the Task eventually invokes the RDD's iterator() method to produce the data; here is iterator:

    final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
      if (storageLevel != StorageLevel.NONE) {    // if the storage level is not NONE, fetch the data through the cacheManager
        SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
      } else {    // otherwise read the checkpoint, or recompute if there is none
        computeOrReadCheckpoint(split, context)
      }
    }
    

    If the storage level is not NONE, CacheManager.getOrCompute is called: it returns the cached data if present, otherwise it computes the partition and writes it to the cache according to the storage level. CacheManager is essentially a wrapper around BlockManager that manages cached content. Let's continue with CacheManager's getOrCompute:

    def getOrCompute[T](
        rdd: RDD[T],
        partition: Partition,
        context: TaskContext,
        storageLevel: StorageLevel): Iterator[T] = {
      val key = RDDBlockId(rdd.id, partition.index) // build the block id for this partition
      logDebug(s"Looking for partition $key")
      blockManager.get(key) match {   // ask the BlockManager whether the block is cached
        case Some(blockResult) => // cache hit
          // Partition is already materialized, so just return its values
          val existingMetrics = context.taskMetrics
            .getInputMetricsForReadMethod(blockResult.readMethod)   // update input metrics
          existingMetrics.incBytesRead(blockResult.bytes)
          // return the cached data as the result
          val iter = blockResult.data.asInstanceOf[Iterator[T]]
          new InterruptibleIterator[T](context, iter) {
            override def next(): T = {
              existingMetrics.incRecordsRead(1)
              delegate.next()
            }
          }
        case None =>    // cache miss: the partition needs to be computed
          // Acquire a lock for loading this partition
          // If another thread already holds the lock, wait for it to finish return its results
          val storedValues = acquireLockForPartition[T](key)    // acquire a lock to load this partition
          if (storedValues.isDefined) {   // another thread already computed it, so return its result directly
            return new InterruptibleIterator[T](context, storedValues.get)
          }
          // Otherwise, we have to load the partition ourselves
          // the partition has not been computed yet, so compute it here
          try {
            logInfo(s"Partition $key not found, computing it")
            // read the checkpointed data if this RDD was checkpointed, otherwise compute it
            val computedValues = rdd.computeOrReadCheckpoint(partition, context)
            // If the task is running locally, do not persist the result
            // a task running locally (on the driver) just returns the values without caching them
            if (context.isRunningLocally) {
              return computedValues
            }
            // Otherwise, cache the values and keep track of any updates in block statuses
            // on an executor, cache the values and track the block status updates
            val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]   // collects the status of blocks written below
            val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
            val metrics = context.taskMetrics   // update the task metrics
            val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
            metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)
            new InterruptibleIterator(context, cachedValues)
          } finally {
            loading.synchronized {
              // notify any threads waiting for this partition that the computation is done
              // and the result has been stored in the BlockManager
              loading.remove(key)
              loading.notifyAll()
            }
          }
      }
    }
    

    In the code above, the cache is checked repeatedly before anything is computed; in the end the RDD's computeOrReadCheckpoint method is called to actually produce the data:
    val computedValues = rdd.computeOrReadCheckpoint(partition, context)
    computeOrReadCheckpoint recursively calls iterator on the current RDD's parent RDD, and ultimately the compute method of the concrete RDD subclass does the actual work:

    private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
    {
      if (isCheckpointedAndMaterialized) {
        firstParent[T].iterator(split, context)
      } else {
        compute(split, context)
      }
    }
    

    Once the data has been computed, putInBlockManager is called to write the result into the BlockManager:

    private def putInBlockManager[T](
        key: BlockId,
        values: Iterator[T],
        level: StorageLevel,
        updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)],
        effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = {
      val putLevel = effectiveStorageLevel.getOrElse(level)   // resolve the effective storage level for this put
      if (!putLevel.useMemory) {  // if the level does not use memory, the iterator can be handed straight to the BlockManager
        /*
         * This RDD is not to be cached in memory, so we can just pass the computed values as an
         * iterator directly to the BlockManager rather than first fully unrolling it in memory.
         */
        updatedBlocks ++=
          blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
        blockManager.get(key) match {
          case Some(v) => v.data.asInstanceOf[Iterator[T]]
          case None =>
            logInfo(s"Failure to store $key")
            throw new BlockException(key, s"Block manager failed to return cached value for $key!")
        }
      } else {  // otherwise unroll the data in memory
        /*
         * This RDD is to be cached in memory. In this case we cannot pass the computed values
         * to the BlockManager as an iterator and expect to read it back later. This is because
         * we may end up dropping a partition from memory store before getting it back.
         *
         * In addition, we must be careful to not unroll the entire partition in memory at once.
         * Otherwise, we may cause an OOM exception if the JVM does not have enough space for this
         * single partition. Instead, we unroll the values cautiously, potentially aborting and
         * dropping the partition to disk if applicable.
         */
        blockManager.memoryStore.unrollSafely(key, values, updatedBlocks) match {
          case Left(arr) =>
            // We have successfully unrolled the entire partition, so cache it in memory
            updatedBlocks ++=
              blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
            arr.iterator.asInstanceOf[Iterator[T]]
          case Right(it) =>
            // There is not enough space to cache this partition in memory
            val returnValues = it.asInstanceOf[Iterator[T]]
            if (putLevel.useDisk) {
              logWarning(s"Persisting partition $key to disk instead.")
              val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false,
                useOffHeap = false, deserialized = false, putLevel.replication)
              putInBlockManager[T](key, returnValues, level, updatedBlocks, Some(diskOnlyLevel))
            } else {
              returnValues
            }
        }
      }
    }
    

    In this code, if the storage level does not include memory, BlockManager.putIterator writes the computed values directly (to disk or external storage). Otherwise the values are first unrolled in memory by memoryStore.unrollSafely: if they fit, BlockManager.putArray caches the unrolled array; if they do not and the level allows disk, the partition is re-put with a disk-only level, otherwise it is returned uncached.

    def putIterator(
        blockId: BlockId,
        values: Iterator[Any],
        level: StorageLevel,
        tellMaster: Boolean = true,
        effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
      require(values != null, "Values is null")
      doPut(blockId, IteratorValues(values), level, tellMaster, effectiveStorageLevel)
    }
    
    def putArray(
        blockId: BlockId,
        values: Array[Any],
        level: StorageLevel,
        tellMaster: Boolean = true,
        effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
      require(values != null, "Values is null")
      doPut(blockId, ArrayValues(values), level, tellMaster, effectiveStorageLevel)
    }
    
    def putBytes(
        blockId: BlockId,
        bytes: ByteBuffer,
        level: StorageLevel,
        tellMaster: Boolean = true,
        effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
      require(bytes != null, "Bytes is null")
      doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel)
    }
    

    Both of these end up in doPut; the only difference is that one wraps the data as IteratorValues and the other as ArrayValues, matching the non-memory and in-memory paths above. A third entry point, BlockManager.putBytes, also goes through doPut and wraps already-serialized data as ByteBufferValues.
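    As the pattern match inside doPut suggests, the three wrappers are plain case classes over a common trait. A sketch of their shape (reconstructed from that pattern match, so names and fields are approximate):

    import java.nio.ByteBuffer

    // sketch: the three forms in which block data is handed to doPut
    sealed trait BlockValues
    case class IteratorValues(iterator: Iterator[Any]) extends BlockValues   // lazily produced values (putIterator)
    case class ArrayValues(buffer: Array[Any]) extends BlockValues           // fully materialized values (putArray)
    case class ByteBufferValues(buffer: ByteBuffer) extends BlockValues      // already-serialized bytes (putBytes)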

    doPut is fairly long, so it is covered here in three parts.

    1. The first part picks the store the data should go into:
    try {
      // returnValues - Whether to return the values put
      // blockStore - The type of storage to put these values into
      val (returnValues, blockStore: BlockStore) = {
        if (putLevel.useMemory) {
          // Put it in memory first, even if it also has useDisk set to true;
          // We will drop it to disk later if the memory store can't hold it.
          (true, memoryStore)
        } else if (putLevel.useOffHeap) {
          // Use external block store
          (false, externalBlockStore)
        } else if (putLevel.useDisk) {
          // Don't get back the bytes from put unless we replicate them
          (putLevel.replication > 1, diskStore)
        } else {
          assert(putLevel == StorageLevel.NONE)
          throw new BlockException(
            blockId, s"Attempted to put block $blockId without specifying storage level!")
        }
      }
      // Actually put the values
      val result: PutResult = data match {
        case IteratorValues(iterator) =>
          blockStore.putIterator(blockId, iterator, putLevel, returnValues)
        case ArrayValues(array) =>
          blockStore.putArray(blockId, array, putLevel, returnValues)
        case ByteBufferValues(bytes) =>
          bytes.rewind()
          blockStore.putBytes(blockId, bytes, putLevel)
      }
    

    The stores involved here, memoryStore, diskStore and externalBlockStore, correspond to the memory, disk and external storage levels; their caching logic is described separately below. doPut calls their putArray, putIterator and putBytes methods as appropriate.
    Note that for a level such as MEMORY_AND_DISK, the code puts the block into memoryStore first and only drops it to diskStore later if the memory store cannot hold it.

    2. The second part is the replication logic:

    // If we're storing bytes, then initiate the replication before storing them locally.
    // This is faster as data is already serialized and ready to send.
    val replicationFuture = data match {
      case b: ByteBufferValues if putLevel.replication > 1 =>
        // Duplicate doesn't copy the bytes, but just creates a wrapper
        val bufferView = b.buffer.duplicate()
        Future {
          // This is a blocking action and should run in futureExecutionContext which is a cached
          // thread pool
          replicate(blockId, bufferView, putLevel)
        }(futureExecutionContext)
      case _ => null
    }
    

    Here a Future is started to replicate ByteBufferValues data (data that is already serialized as bytes) before it is stored locally, since it is ready to send as-is. The core of this is the replicate method, which is worth digging into if you are interested.

    So how are IteratorValues and ArrayValues replicated? See the following code:

    if (putLevel.replication > 1) {
      data match {
        case ByteBufferValues(bytes) =>
          if (replicationFuture != null) {
            Await.ready(replicationFuture, Duration.Inf)
          }
        case _ =>
          val remoteStartTime = System.currentTimeMillis
          // Serialize the block if not already done
          if (bytesAfterPut == null) {
            if (valuesAfterPut == null) {
              throw new SparkException(
                "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
            }
            bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
          }
          replicate(blockId, bytesAfterPut, putLevel)
          logDebug("Put block %s remotely took %s"
            .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
      }
    }
    

    For ByteBufferValues data, this branch simply waits for the Future started above to complete.
    For the other two types, the values are serialized first (if that has not happened already) and then replicate is called.

    3. The third part, tellMaster, reports the update to the master:
    if (tellMaster) {
      reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
    }
    

    reportBlockStatus reports the update to the master: it eventually sends an UpdateBlockInfo message to the BlockManagerMasterEndpoint, and the master updates the block's metadata when the message arrives.
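    A simplified sketch of the metadata such a message carries (the class name here is hypothetical and the exact fields in the 1.6 source may differ slightly):

    import org.apache.spark.storage.{BlockId, BlockManagerId, StorageLevel}

    // which block manager holds the block, which block it is, at what storage level,
    // and how much space it takes in memory / on disk / in external storage
    case class UpdateBlockInfoSketch(
        blockManagerId: BlockManagerId,
        blockId: BlockId,
        storageLevel: StorageLevel,
        memSize: Long,
        diskSize: Long,
        externalBlockStoreSize: Long)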


    Classes that store blocks
    BlockStore

    BlockStore is the abstract class for block storage; it defines the basic operations of the interface:

    /**
     * Abstract class to store blocks.
     */
    private[spark] abstract class BlockStore(val blockManager: BlockManager) extends Logging {
      // write the bytes of the block identified by blockId, according to the given StorageLevel
      def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): PutResult
      /**
       * Put in a block and, possibly, also return its content as either bytes or another Iterator.
       * This is used to efficiently write the values to multiple locations (e.g. for replication).
       *
       * @return a PutResult that contains the size of the data, as well as the values put if
       *         returnValues is true (if not, the result's data field can be null)
       */
      // write the values; if returnValues is true, the stored values must also be returned in the PutResult
      def putIterator(
        blockId: BlockId,
        values: Iterator[Any],
        level: StorageLevel,
        returnValues: Boolean): PutResult
      // same as above, but takes an Array instead of an Iterator
      def putArray(
        blockId: BlockId,
        values: Array[Any],
        level: StorageLevel,
        returnValues: Boolean): PutResult
      /**
       * Return the size of a block in bytes.
       */
      // size of a block in bytes
      def getSize(blockId: BlockId): Long
      // read a block's data as a ByteBuffer
      def getBytes(blockId: BlockId): Option[ByteBuffer]
      // read a block's data as an Iterator[Any]
      def getValues(blockId: BlockId): Option[Iterator[Any]]
      /**
       * Remove a block, if it exists.
       * @param blockId the block to remove.
       * @return True if the block was found and removed, False otherwise.
       */
      // remove a block; returns true if it was found and removed, false otherwise
      def remove(blockId: BlockId): Boolean
      // whether this store contains the given block
      def contains(blockId: BlockId): Boolean
      // clean up and release resources on shutdown
      def clear() { }
    }
    

    BlockStore has three implementations, MemoryStore, DiskStore and ExternalBlockStore, corresponding to the memory, disk and external storage levels.

    MemoryStore keeps its entries in a hash map:
    private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true)
    Everything cached in memory goes into this map through the tryToPut method. When memory runs short, older cached blocks are evicted: if the storage level also includes disk, the evicted block is written out via DiskStore's putIterator; if not, the block is simply dropped and will be recomputed the next time it is needed.
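    The third constructor argument (true) switches the LinkedHashMap to access order, which is what makes eviction roughly LRU. A small self-contained illustration (plain Java collections, not Spark code):

    import java.util.LinkedHashMap

    // iteration over an access-ordered LinkedHashMap starts at the
    // least-recently-used entry, i.e. the first candidate for eviction
    object LruOrderDemo extends App {
      val entries = new LinkedHashMap[String, String](32, 0.75f, true)
      entries.put("rdd_0_0", "block 0")
      entries.put("rdd_0_1", "block 1")
      entries.put("rdd_0_2", "block 2")
      entries.get("rdd_0_0")                 // touching a block makes it most-recently-used
      val it = entries.keySet().iterator()
      while (it.hasNext) println(it.next())  // prints rdd_0_1, rdd_0_2, rdd_0_0
    }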

    DiskStore persists data to disk. In what form is it written? Let's look at DiskStore's putIterator:

    override def putIterator(
        blockId: BlockId,
        values: Iterator[Any],
        level: StorageLevel,
        returnValues: Boolean): PutResult = {
      logDebug(s"Attempting to write values for block $blockId")
      val startTime = System.currentTimeMillis
      val file = diskManager.getFile(blockId)   // resolve the file for this block
      val outputStream = new FileOutputStream(file)   // open an output stream to it
      try {
        Utils.tryWithSafeFinally {
          blockManager.dataSerializeStream(blockId, outputStream, values) // serialize the values into the stream
        } {
          // Close outputStream here because it should be closed before file is deleted.
          outputStream.close()
        }
      } catch {
        case e: Throwable =>
          if (file.exists()) {
            if (!file.delete()) {
              logWarning(s"Error deleting ${file}")
            }
          }
          throw e
      }
      val length = file.length
      val timeTaken = System.currentTimeMillis - startTime
      logDebug("Block %s stored as %s file on disk in %d ms".format(
        file.getName, Utils.bytesToString(length), timeTaken))
      if (returnValues) {
        // Return a byte buffer for the contents of the file
        val buffer = getBytes(blockId).get
        PutResult(length, Right(buffer))
      } else {
        PutResult(length, null)
      }
    }

    The code above opens a file output stream and writes the serialized data to a local physical file. The file handle comes from getFile; here is its implementation:

    def getFile(filename: String): File = {
      // Figure out which local directory it hashes to, and which subdirectory in that
      val hash = Utils.nonNegativeHash(filename)  // the file name's hash determines where in the directory tree the file lives
      val dirId = hash % localDirs.length
      val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
      // Create the subdirectory if it doesn't already exist
      val subDir = subDirs(dirId).synchronized {
        val old = subDirs(dirId)(subDirId)
        if (old != null) {
          old
        } else {
          val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
          if (!newDir.exists() && !newDir.mkdir()) {
            throw new IOException(s"Failed to create local dir in $newDir.")
          }
          subDirs(dirId)(subDirId) = newDir
          newDir
        }
      }
      new File(subDir, filename)  // return a handle to the file in that subdirectory
    }
    

    The hash of the BlockId's name determines the directory the file goes into, and a File handle is then created so the data can be written to a physical file. The root of this path is configured with spark.local.dir; in yarn-cluster mode it is replaced by yarn.nodemanager.local-dirs.
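    A small sketch of the directory-layout logic above (Utils.nonNegativeHash is replaced by a simple stand-in, and the local dirs and the subdirectory count of 64 are illustrative assumptions):

    // shows how a block file name is hashed into localDirs(dirId)/subDirId
    object DiskLayoutSketch extends App {
      val localDirs = Array("/tmp/spark-local-0", "/tmp/spark-local-1")  // placeholders for spark.local.dir entries
      val subDirsPerLocalDir = 64                                        // assumed number of subdirectories per local dir

      def nonNegativeHash(s: String): Int = s.hashCode & Integer.MAX_VALUE  // stand-in for Utils.nonNegativeHash

      val filename = "rdd_2_3"                                       // the name of RDDBlockId(2, 3)
      val hash = nonNegativeHash(filename)
      val dirId = hash % localDirs.length                            // which top-level local dir
      val subDirId = (hash / localDirs.length) % subDirsPerLocalDir  // which "%02x" subdirectory
      println(s"${localDirs(dirId)}/${"%02x".format(subDirId)}/$filename")
    }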


    Tuning the Storage module
    1. First, as discussed at the start of this article, choose storage levels carefully: prefer memory, use disk sparingly, and enable serialization and replication only when the situation calls for them.
    2. spark.local.dir:
      The directories used for disk-level block files. Configure several paths (comma-separated) and, if possible, put them on SSDs.
    3. spark.memory.storageFraction:
      The fraction of unified memory reserved for the Storage module's cached data, 0.5 by default, i.e. split fifty-fifty with the Shuffle (execution) side. Since version 1.5, however, Spark uses a dynamic memory model in which Shuffle is the favored child: it can occupy memory granted to Storage and refuse to give it back, while Storage cannot do the reverse.
      The tuning strategy: if the job caches a large RDD data set and produces little shuffle data, raise this value.
    4. Off-heap memory:
      Off-heap storage is memory that Spark requests directly from the operating system through Java's Unsafe APIs. Bypassing the JVM avoids GC pauses, but the allocation and release logic has to be managed explicitly.
      spark.memory.offHeap.enabled
      false by default; set it to true to enable off-heap memory.
      spark.memory.offHeap.size
      0 by default; once off-heap memory is enabled this must be set, and it should be sized carefully to avoid running the node out of memory. A configuration sketch combining these settings follows below.
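    Put together, a hedged configuration sketch (the values are placeholders, not recommendations):

    import org.apache.spark.SparkConf

    object StorageTuningSketch extends App {
      // placeholder values for illustration only; tune them against your own workload
      val conf = new SparkConf()
        .set("spark.local.dir", "/mnt/ssd1/spark,/mnt/ssd2/spark")  // several local dirs, ideally on SSD
        .set("spark.memory.storageFraction", "0.6")                 // reserve a bit more unified memory for Storage
        .set("spark.memory.offHeap.enabled", "true")                // opt in to off-heap memory
        .set("spark.memory.offHeap.size", "2g")                     // must be non-zero once off-heap is enabled
      println(conf.toDebugString)
    }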

    Summary

    This wraps up the source reading of the Storage module. Reading this code shows what the system does beneath RDDs: RDDs implement the logic, while Storage manages the data. That understanding gives a solid basis for troubleshooting and performance tuning later on.
