Spark Core Source Code Close Reading Plan #30: Finally, BlockManager


Author: LittleMagic | Published 2019-08-22 22:29


    Preface

    As the title suggests, after all the groundwork laid in previous posts, this article can finally look at BlockManager. It has been a long time coming.

    The block manager, BlockManager, runs on every node of a Spark cluster. The BlockManager on each node manages that node's in-memory and on-disk blocks through MemoryManager, MemoryStore, DiskBlockManager, and DiskStore, and exchanges blocks with other nodes, making it a sizable component. To keep this article from growing unmanageable, it focuses on the two most fundamental aspects: BlockManager's initialization and the block read flow. The write flow and other logic (such as BlockTransferService) will be covered in separate articles.

    Initializing the BlockManager

    Constructor and Member Properties

    Code #30.1 - Constructor and member properties of the o.a.s.storage.BlockManager class

    private[spark] class BlockManager(
        executorId: String,
        rpcEnv: RpcEnv,
        val master: BlockManagerMaster,
        val serializerManager: SerializerManager,
        val conf: SparkConf,
        memoryManager: MemoryManager,
        mapOutputTracker: MapOutputTracker,
        shuffleManager: ShuffleManager,
        val blockTransferService: BlockTransferService,
        securityManager: SecurityManager,
        numUsableCores: Int)
      extends BlockDataManager with BlockEvictionHandler with Logging {
    
      private[spark] val externalShuffleServiceEnabled =
        conf.getBoolean("spark.shuffle.service.enabled", false)
    
      val diskBlockManager = {
        val deleteFilesOnStop =
          !externalShuffleServiceEnabled || executorId == SparkContext.DRIVER_IDENTIFIER
        new DiskBlockManager(conf, deleteFilesOnStop)
      }
    
      private[storage] val blockInfoManager = new BlockInfoManager
    
      private val futureExecutionContext = ExecutionContext.fromExecutorService(
        ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128))
    
      private[spark] val memoryStore =
        new MemoryStore(conf, blockInfoManager, serializerManager, memoryManager, this)
      private[spark] val diskStore = new DiskStore(conf, diskBlockManager, securityManager)
      memoryManager.setMemoryStore(memoryStore)
    
      private val maxOnHeapMemory = memoryManager.maxOnHeapStorageMemory
      private val maxOffHeapMemory = memoryManager.maxOffHeapStorageMemory
    
      private val externalShuffleServicePort = {
        val tmpPort = Utils.getSparkOrYarnConfig(conf, "spark.shuffle.service.port", "7337").toInt
        if (tmpPort == 0) {
          conf.get("spark.shuffle.service.port").toInt
        } else {
          tmpPort
        }
      }
    
      var blockManagerId: BlockManagerId = _
      private[spark] var shuffleServerId: BlockManagerId = _
    
      private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
        val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
        new ExternalShuffleClient(transConf, securityManager,
          securityManager.isAuthenticationEnabled(), conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT))
      } else {
        blockTransferService
      }
    
      private val maxFailuresBeforeLocationRefresh =
        conf.getInt("spark.block.failures.beforeLocationRefresh", 5)
    
      private val slaveEndpoint = rpcEnv.setupEndpoint(
        "BlockManagerEndpoint" + BlockManager.ID_GENERATOR.next,
        new BlockManagerSlaveEndpoint(rpcEnv, this, mapOutputTracker))
    
      private var asyncReregisterTask: Future[Unit] = null
      private val asyncReregisterLock = new Object
    
      @volatile private var cachedPeers: Seq[BlockManagerId] = _
      private val peerFetchLock = new Object
      private var lastPeerFetchTime = 0L
    
      private var blockReplicationPolicy: BlockReplicationPolicy = _
    
      private[storage] val remoteBlockTempFileManager =
        new BlockManager.RemoteBlockDownloadFileManager(this)
      private val maxRemoteBlockToMem = conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)
      // ......
    }
    

    BlockManager takes many constructor parameters. The types already covered need no repetition, but three components have not yet been discussed in detail: MapOutputTracker, which tracks the output of map tasks during execution (i.e., the input of reduce tasks) and belongs to the scheduling module; ShuffleManager, which manages the shuffle strategy and has been analyzed in detail in articles outside this series; and BlockTransferService, which, as the name suggests, transfers blocks between nodes remotely and will be covered shortly in a later article.

    BlockManager implements two traits, BlockDataManager and BlockEvictionHandler, indicating respectively that a BlockManager can manage block data and evict blocks from memory. As of now, BlockManager is the only implementation of these two traits.

    Now let's look at BlockManager's member properties. Components the reader is already familiar with (such as MemoryStore and DiskStore) are not repeated here; only a few notable newcomers are described.

    • externalShuffleServiceEnabled: whether the external shuffle service is enabled, controlled by the configuration spark.shuffle.service.enabled, disabled by default. What is the external shuffle service? As we know, traditional shuffle is served entirely by executors and is therefore both CPU- and I/O-intensive. If the Spark cluster runs on YARN, enabling external shuffle runs a long-lived YarnShuffleService on each YARN NodeManager to receive and serve shuffle data, relieving pressure on the executors.
    • futureExecutionContext: a daemon thread pool of size 128, used to execute some BlockManager operations asynchronously.
    • blockManagerId: the ID of this BlockManager; its structure was described in the previous article.
    • shuffleServerId: the ID of the entity that holds shuffle intermediate files. Without the external shuffle service it is the same as this BlockManager's ID; otherwise a new one is created.
    • shuffleClient: the client used to fetch shuffle files from other executors. If the external shuffle service is disabled, it is the BlockTransferService mentioned above; otherwise it is an ExternalShuffleClient instance. We will not dig into it for now.
    • slaveEndpoint: a reference to BlockManager's slave RPC endpoint, created via RpcEnv.setupEndpoint().
    • blockReplicationPolicy: the block replication policy used by Spark.

    The Initialization Method

    SparkEnv calls BlockManager's initialize() method to initialize it. The code is as follows.

    Code #30.2 - The o.a.s.storage.BlockManager.initialize() method

      def initialize(appId: String): Unit = {
        blockTransferService.init(this)
        shuffleClient.init(appId)
    
        blockReplicationPolicy = {
          val priorityClass = conf.get(
            "spark.storage.replication.policy", classOf[RandomBlockReplicationPolicy].getName)
          val clazz = Utils.classForName(priorityClass)
          val ret = clazz.newInstance.asInstanceOf[BlockReplicationPolicy]
          logInfo(s"Using $priorityClass for block replication policy")
          ret
        }
    
        val id =
          BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port, None)
        val idFromMaster = master.registerBlockManager(
          id,
          maxOnHeapMemory,
          maxOffHeapMemory,
          slaveEndpoint)
    
        blockManagerId = if (idFromMaster != null) idFromMaster else id
        shuffleServerId = if (externalShuffleServiceEnabled) {
          logInfo(s"external shuffle service port = $externalShuffleServicePort")
          BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
        } else {
          blockManagerId
        }
    
        if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
          registerWithExternalShuffleServer()
        }
    
        logInfo(s"Initialized BlockManager: $blockManagerId")
      }
    

    The BlockManager initialization flow is as follows:

    1. Initialize the BlockTransferService and the ShuffleClient.
    2. Determine the block replication policy from the configuration spark.storage.replication.policy and instantiate it via reflection. The default, RandomBlockReplicationPolicy, places block replicas on randomly chosen nodes.
    3. Build a BlockManagerId from the executor ID and call BlockManagerMaster.registerBlockManager() to register this ID together with the slave RPC endpoint. On successful registration, BlockManagerMaster may return another, canonical ID.
    4. Generate the shuffle service ID. If the current node is an executor and the external shuffle service is enabled, call registerWithExternalShuffleServer() to register with the external shuffle service (code omitted here).

    After all of the above, the reader may still lack a concrete feel for it (frankly, so does the author). So next we look at the block read flow, one of BlockManager's main duties and a much more tangible topic.

    Entry Point for Block Reads and Writes

    BlockManager provides several methods for reading and writing blocks; one entry point that unifies reads and writes is the getOrElseUpdate() method. Since blocks can be materialized from RDDs, we can easily find a call to it in the RDD class (specifically, in RDD.getOrCompute()). For convenience of analysis, this article starts from there. Let's look at the source first.

    Code #30.3 - The o.a.s.storage.BlockManager.getOrElseUpdate() method

      def getOrElseUpdate[T](
          blockId: BlockId,
          level: StorageLevel,
          classTag: ClassTag[T],
          makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
        get[T](blockId)(classTag) match {
          case Some(block) =>
            return Left(block)
          case _ =>
        }
        doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {
          case None =>
            val blockResult = getLocalValues(blockId).getOrElse {
              releaseLock(blockId)
              throw new SparkException(s"get() failed for block $blockId even though we held a lock")
            }
            releaseLock(blockId)
            Left(blockResult)
          case Some(iter) =>
            Right(iter)
        }
      }
    

    This method first tries to read the data by block ID (locally first, then remotely). If the block cannot be found, it calls the supplied makeIterator function to produce an iterator over the data and writes it. The data that was read or written is ultimately wrapped in a BlockResult and returned.
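The read-then-compute contract just described can be illustrated with a minimal cache in plain Scala, with a Map standing in for the block stores. This is only a sketch of the Either-based contract, not Spark's implementation: Left means the value was served from (or successfully inserted into) the store, Right means the store refused it and the computed result goes straight back to the caller, as with doPutIterator() returning Some(iter):

```scala
import scala.collection.mutable

// Toy get-or-else-update cache mimicking the shape of code #30.3.
class TinyBlockCache[K, V](capacity: Int) {
  private val store = mutable.Map.empty[K, V]

  def getOrElseUpdate(key: K, compute: () => V): Either[V, V] = {
    store.get(key) match {
      case Some(v) => Left(v)            // hit: serve the cached value
      case None =>
        val v = compute()
        if (store.size < capacity) {     // the "put" succeeded
          store(key) = v
          Left(v)
        } else {
          Right(v)                       // store full: hand the value through
        }
    }
  }
}

val cache = new TinyBlockCache[String, Int](capacity = 1)
val first  = cache.getOrElseUpdate("rdd_0_0", () => 42)  // computed and stored
val second = cache.getOrElseUpdate("rdd_0_0", () => -1)  // served from cache
val third  = cache.getOrElseUpdate("rdd_0_1", () => 7)   // store full: passed through
```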

    Block Read Flow

    Below is the get() method called in code #30.3.

    Code #30.4 - The o.a.s.storage.BlockManager.get() method

      def get[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
        val local = getLocalValues(blockId)
        if (local.isDefined) {
          logInfo(s"Found block $blockId locally")
          return local
        }
        val remote = getRemoteValues[T](blockId)
        if (remote.isDefined) {
          logInfo(s"Found block $blockId remotely")
          return remote
        }
        None
      }
    

    This method first calls getLocalValues() to read the data locally (that is, on the local executor); if that fails, it then calls getRemoteValues() to fetch the data from remote nodes. Let's look at each in turn.

    Reading Data Locally

    Code #30.5 - The o.a.s.storage.BlockManager.getLocalValues() method

      def getLocalValues(blockId: BlockId): Option[BlockResult] = {
        logDebug(s"Getting local block $blockId")
        blockInfoManager.lockForReading(blockId) match {
          case None =>
            logDebug(s"Block $blockId was not found")
            None
          case Some(info) =>
            val level = info.level
            logDebug(s"Level for block $blockId is $level")
            val taskAttemptId = Option(TaskContext.get()).map(_.taskAttemptId())
            if (level.useMemory && memoryStore.contains(blockId)) {
              val iter: Iterator[Any] = if (level.deserialized) {
                memoryStore.getValues(blockId).get
              } else {
                serializerManager.dataDeserializeStream(
                  blockId, memoryStore.getBytes(blockId).get.toInputStream())(info.classTag)
              }
    
              val ci = CompletionIterator[Any, Iterator[Any]](iter, {
                releaseLock(blockId, taskAttemptId)
              })
              Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
            } else if (level.useDisk && diskStore.contains(blockId)) {
              val diskData = diskStore.getBytes(blockId)
              val iterToReturn: Iterator[Any] = {
                if (level.deserialized) {
                  val diskValues = serializerManager.dataDeserializeStream(
                    blockId,
                    diskData.toInputStream())(info.classTag)
                  maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
                } else {
                  val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
                    .map { _.toInputStream(dispose = false) }
                    .getOrElse { diskData.toInputStream() }
                  serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
                }
              }
              val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
                releaseLockAndDispose(blockId, diskData, taskAttemptId)
              })
              Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
            } else {
              handleLocalReadFailure(blockId)
            }
        }
      }
    

    It is a bit long, but the logic is clear. The execution flow, in brief:

    1. Call BlockInfoManager.lockForReading() to take a read lock on the block and try to return its metadata, a BlockInfo.
    2. If there is no BlockInfo, the block does not exist locally. Otherwise, check its StorageLevel, considering memory first and disk second.
    3. If the block's StorageLevel includes memory and the data is in the MemoryStore, call MemoryStore.getValues() or getBytes() depending on whether the data is serialized, ultimately obtaining an iterator over the block data.
    4. If the block's StorageLevel includes disk and the data is in the DiskStore, first call DiskStore.getBytes() to get a byte stream of the on-disk block data, then handle it according to whether it is serialized. Along the way, maybeCacheDiskValuesInMemory()/maybeCacheDiskBytesInMemory() try to cache the data read from disk into memory to speed up later reads.
    5. Call releaseLock() or releaseLockAndDispose() to release the block's read lock.
    6. Wrap the block data iterator, the read method, and the block's size in bytes in a BlockResult and return it. If both the memory read and the disk read fail, call handleLocalReadFailure() to handle the local read failure.
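The memory-first, disk-second priority of steps 2 to 4 can be sketched with dummy stores. This is plain Scala with toy types; Level and the two Maps are illustrative stand-ins, not the real StorageLevel/MemoryStore/DiskStore API:

```scala
// Sketch of the read-priority logic in getLocalValues(): consult the
// storage-level flags, try the memory store first, then the disk store,
// and report a miss if neither holds the block.
case class Level(useMemory: Boolean, useDisk: Boolean)

def readLocal(
    blockId: String,
    level: Level,
    mem: Map[String, Seq[Int]],
    disk: Map[String, Seq[Int]]): Option[(String, Seq[Int])] = {
  if (level.useMemory && mem.contains(blockId)) {
    Some(("Memory", mem(blockId)))     // fast path, like DataReadMethod.Memory
  } else if (level.useDisk && disk.contains(blockId)) {
    Some(("Disk", disk(blockId)))      // slower path, like DataReadMethod.Disk
  } else {
    None                               // would be handleLocalReadFailure()
  }
}

val mem  = Map("rdd_0_0" -> Seq(1, 2, 3))
val disk = Map("rdd_0_0" -> Seq(1, 2, 3), "rdd_0_1" -> Seq(4, 5))
val both = Level(useMemory = true, useDisk = true)

val fromMem  = readLocal("rdd_0_0", both, mem, disk)  // memory wins
val fromDisk = readLocal("rdd_0_1", both, mem, disk)  // falls back to disk
val missing  = readLocal("rdd_9_9", both, mem, disk)  // not found locally
```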

    Hopefully that was clear enough. Let's continue with reading block data from remote nodes.

    Reading Data from Remote Nodes

    Code #30.6 - The o.a.s.storage.BlockManager.getRemoteValues() method

      private def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
        val ct = implicitly[ClassTag[T]]
        getRemoteBytes(blockId).map { data =>
          val values =
            serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))(ct)
          new BlockResult(values, DataReadMethod.Network, data.size)
        }
      }
    

    This method is short because the main logic lives in getRemoteBytes(). That makes sense: remote block data must be serialized before it can be transferred, and is only deserialized into objects once it arrives locally, so what is actually fetched is a byte stream. The source of getRemoteBytes() is shown below.
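The serialize-on-the-wire, deserialize-locally round trip described above can be illustrated independently of Spark. In this sketch, plain Java serialization stands in for Spark's SerializerManager, and the byte array stands in for the fetched ChunkedByteBuffer:

```scala
import java.io._

// A remote block travels as bytes; the interesting work is fetching the
// bytes (getRemoteBytes) plus one deserialization step (getRemoteValues).
def toBytes(value: Serializable): Array[Byte] = {
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bos)
  oos.writeObject(value)
  oos.close()
  bos.toByteArray                        // what "travels over the network"
}

def fromBytes[T](bytes: Array[Byte]): T = {
  val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
  ois.readObject().asInstanceOf[T]       // the local deserialization step
}

val block: Vector[Int] = Vector(1, 2, 3)
val wire = toBytes(block)                // remote side: serialized block data
val back = fromBytes[Vector[Int]](wire)  // local side: values restored
```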

    Code #30.7 - The o.a.s.storage.BlockManager.getRemoteBytes() method

      def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
        logDebug(s"Getting remote block $blockId")
        require(blockId != null, "BlockId is null")
        var runningFailureCount = 0
        var totalFailureCount = 0
    
        val locationsAndStatus = master.getLocationsAndStatus(blockId)
        val blockSize = locationsAndStatus.map { b =>
          b.status.diskSize.max(b.status.memSize)
        }.getOrElse(0L)
        val blockLocations = locationsAndStatus.map(_.locations).getOrElse(Seq.empty)
    
        val tempFileManager = if (blockSize > maxRemoteBlockToMem) {
          remoteBlockTempFileManager
        } else {
          null
        }
    
        val locations = sortLocations(blockLocations)
        val maxFetchFailures = locations.size
        var locationIterator = locations.iterator
        while (locationIterator.hasNext) {
          val loc = locationIterator.next()
          logDebug(s"Getting remote block $blockId from $loc")
          val data = try {
            blockTransferService.fetchBlockSync(
              loc.host, loc.port, loc.executorId, blockId.toString, tempFileManager).nioByteBuffer()
          } catch {
            case NonFatal(e) =>
              runningFailureCount += 1
              totalFailureCount += 1
    
              if (totalFailureCount >= maxFetchFailures) {
                logWarning(s"Failed to fetch block after $totalFailureCount fetch failures. " +
                  s"Most recent failure cause:", e)
                return None
              }
    
              logWarning(s"Failed to fetch remote block $blockId " +
                s"from $loc (failed attempt $runningFailureCount)", e)
    
              if (runningFailureCount >= maxFailuresBeforeLocationRefresh) {
                locationIterator = sortLocations(master.getLocations(blockId)).iterator
                logDebug(s"Refreshed locations from the driver " +
                  s"after ${runningFailureCount} fetch failures.")
                runningFailureCount = 0
              }
    
              null
          }
    
          if (data != null) {
            return Some(new ChunkedByteBuffer(data))
          }
          logDebug(s"The value of block $blockId is null")
        }
        logDebug(s"Block $blockId not found")
        None
      }
    

    The execution flow of this method is:

    1. Call BlockManagerMaster.getLocationsAndStatus() to get the locations of all remote BlockManagers holding the block's data.
    2. Call sortLocations() to sort those locations by the topology information in their BlockManagerIds: BlockManagers on the same server come first, then BlockManagers on nodes in the same rack (provided rack awareness is available), and finally BlockManagers on nodes in other racks.
    3. For each remote BlockManager, call BlockTransferService.fetchBlockSync() to fetch the block data synchronously, returned as a ChunkedByteBuffer.
    4. If the block cannot be fetched from one remote BlockManager, try the next. When the number of failed attempts reaches the threshold set by spark.block.failures.beforeLocationRefresh (default 5), proactively refresh the remote BlockManager locations in case they have gone stale.
    5. If all remote BlockManagers have been tried without success, the read is considered failed.
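The source of sortLocations() is not shown above, but the locality preference it applies in step 2 (same host, then same rack, then everything else) can be sketched roughly like this. Loc is a toy stand-in for BlockManagerId, and the rack lookup is assumed rather than taken from Spark:

```scala
// Rough sketch of sortLocations()-style ordering: same-host locations
// first, then same-rack locations (when rack info is available), then all
// others. `partition` is stable, so relative order within each group holds.
case class Loc(host: String, rack: Option[String])

def sortByLocality(
    locs: Seq[Loc], localHost: String, localRack: Option[String]): Seq[Loc] = {
  val (sameHost, others) = locs.partition(_.host == localHost)
  localRack match {
    case Some(rack) =>
      val (sameRack, offRack) = others.partition(_.rack.contains(rack))
      sameHost ++ sameRack ++ offRack
    case None =>                 // no rack awareness: only host locality applies
      sameHost ++ others
  }
}

val peers = Seq(
  Loc("host-c", Some("rack-2")),
  Loc("host-b", Some("rack-1")),
  Loc("host-a", Some("rack-1")))

val ordered = sortByLocality(peers, localHost = "host-a", localRack = Some("rack-1"))
// host-a first (same host), then host-b (same rack), then host-c (off-rack)
```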

    Summary

    This article walked through BlockManager's initialization and the flows for reading block data locally and from remote nodes. The next two articles will fill in the details of block writes and BlockTransferService, completing the full picture of BlockManager's read/write flows.

    Good night.

