Spark Source Code: BlockManager


Author: 小狼星I | Published 2018-10-21 12:01

    Spark source code series: BlockManager

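    BlockManager is the unified interface Spark exposes for accessing blocks, and it is organized in a master-slave fashion. Since it carries the main responsibility for data management in Spark, this article examines BlockManager in detail.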

    How BlockManager is created and how its organizational structure is formed

    BlockManager is created while SparkEnv is being built. Opening the SparkEnv source, we can see that it first instantiates BlockManagerMaster together with its RPC endpoint, BlockManagerMasterEndpoint, and only then creates the BlockManager. Since BlockManager is organized as master-slave, where do its slaves live?

    // Create the blockTransferService
    val blockTransferService = new NettyBlockTransferService(conf, securityManager, numUsableCores)
    // Create the BlockManagerMaster and instantiate its BlockManagerMasterEndpoint
    val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
      BlockManagerMaster.DRIVER_ENDPOINT_NAME,
      new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
      conf, isDriver)
    // NB: blockManager is not valid until initialize() is called later.
    // Create the BlockManager for the given executorId
    val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
      serializer, conf, memoryManager, mapOutputTracker, shuffleManager,
      blockTransferService, securityManager, numUsableCores)
    
    

    Next, open the executor backend code. When CoarseGrainedExecutorBackend is instantiated it is handed a SparkEnv. Every ExecutorBackend hosts one BlockManager, which acts as a slave. The code calls SparkEnv.createExecutorEnv(......) and passes in the ExecutorBackend's executorId, and createExecutorEnv creates the BlockManager from that executorId. The BlockManager in turn instantiates a BlockManagerSlaveEndpoint, through which it can communicate with BlockManagerMaster. This forms BlockManager's communication structure.

     // Create the SparkEnv for the current executor
     val env = SparkEnv.createExecutorEnv(
       driverConf, executorId, hostname, port, cores, isLocal = false)
     // SparkEnv will set spark.executor.port if the rpc env is listening for incoming
     // connections (e.g., if it's using akka). Otherwise, the executor is running in
     // client mode only, and does not accept incoming connections.
     val sparkHostPort = env.conf.getOption("spark.executor.port").map { port =>
         hostname + ":" + port
       }.orNull
     env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
       env.rpcEnv, driverUrl, executorId, sparkHostPort, cores, userClassPath, env))
    
    
    // Inside BlockManager: instantiate the BlockManagerSlaveEndpoint
    private val slaveEndpoint = rpcEnv.setupEndpoint(
      "BlockManagerEndpoint" + BlockManager.ID_GENERATOR.next,
      new BlockManagerSlaveEndpoint(rpcEnv, this, mapOutputTracker))
    

    BlockManager registration

    Now let's see how a BlockManager registers with BlockManagerMaster. After the BlockManager is instantiated, its initialize method is called, and that method invokes BlockManagerMaster.registerBlockManager.

    // BlockManager.initialize
    def initialize(appId: String): Unit = {
      blockTransferService.init(this)
      shuffleClient.init(appId)
      blockManagerId = BlockManagerId(
        executorId, blockTransferService.hostName, blockTransferService.port)
      shuffleServerId = if (externalShuffleServiceEnabled) {
        logInfo(s"external shuffle service port = $externalShuffleServicePort")
        BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
      } else {
        blockManagerId
      }
      master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)
      // Register Executors' configuration with the local shuffle service, if one should exist.
      if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
        registerWithExternalShuffleServer()
      }
    }
    

    Stepping into BlockManagerMaster.registerBlockManager, we can see that it sends a message to the master endpoint. The "driver" referred to here is in fact BlockManagerMasterEndpoint.

     /** Register the BlockManager's id with the driver. */
     def registerBlockManager(
         blockManagerId: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef): Unit = {
       logInfo("Trying to register BlockManager")
       tell(RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint))
       logInfo("Registered BlockManager")
     }  
     
     /** Send a one-way message to the master endpoint, to which we expect it to reply with true. */
     private def tell(message: Any) {
       if (!driverEndpoint.askWithRetry[Boolean](message)) {
         throw new SparkException("BlockManagerMasterEndpoint returned false, expected true.")
       }
     }
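
    The tell helper above is synchronous despite its name: it asks, waits for a Boolean reply, and treats false as an error. A minimal sketch of that pattern follows. RetryAsk and both of its methods are hypothetical names for illustration, not Spark API, and the sketch omits the wait between attempts and the timeout that a real RPC layer would typically apply.

```scala
import scala.util.{Failure, Success, Try}

object RetryAsk {
  // Retries `ask` up to `maxAttempts` times and returns the first successful reply.
  // Hypothetical helper: no sleep between attempts and no timeout.
  def askWithRetry[T](maxAttempts: Int)(ask: () => T): T = {
    require(maxAttempts > 0, "maxAttempts must be positive")
    var attempt = 1
    var lastError: Throwable = null
    while (attempt <= maxAttempts) {
      Try(ask()) match {
        case Success(reply) => return reply
        case Failure(e) => lastError = e
      }
      attempt += 1
    }
    throw new RuntimeException(s"No reply after $maxAttempts attempts", lastError)
  }

  // Mirrors the shape of `tell`: a `false` reply is escalated to an exception.
  def tell(maxAttempts: Int)(ask: () => Boolean): Unit = {
    if (!askWithRetry(maxAttempts)(ask)) {
      throw new IllegalStateException("Master returned false, expected true.")
    }
  }
}
```

    For example, RetryAsk.tell(3)(() => sendRegistration()) would retry a hypothetical sendRegistration call up to three times before failing.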
    

    Moving on to BlockManagerMasterEndpoint, we can see where the BlockManager's registration message is received and the register method is called.

    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
      case RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint) =>
        register(blockManagerId, maxMemSize, slaveEndpoint)
        context.reply(true)
    

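    Inside register there is a data structure named blockManagerInfo, which stores the information of every BlockManager registered by the slaves. At this point the BlockManager in the ExecutorBackend has finished registering with the driver (BlockManagerMasterEndpoint). The driver side maintains the BlockManager information for all ExecutorBackends.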

    private def register(id: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef): Unit = {
      val time = System.currentTimeMillis()
      if (!blockManagerInfo.contains(id)) {
        blockManagerIdByExecutor.get(id.executorId) match {
          case Some(oldId) =>
            // A block manager of the same executor already exists, so remove it (assumed dead)
            logError("Got two different block manager registrations on same executor - "
                + s" will replace old one $oldId with new one $id")
            removeExecutor(id.executorId)
          case None =>
        }
        logInfo("Registering block manager %s with %s RAM, %s".format(
          id.hostPort, Utils.bytesToString(maxMemSize), id))
        blockManagerIdByExecutor(id.executorId) = id
        blockManagerInfo(id) = new BlockManagerInfo(
          id, System.currentTimeMillis(), maxMemSize, slaveEndpoint)
      }
      listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize))
    }
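
    The bookkeeping in register can be reduced to two maps: one keyed by block manager id and a reverse index keyed by executor id. The sketch below is a simplified model under stated assumptions: ManagerId, ManagerInfo and MasterRegistry are hypothetical stand-ins for BlockManagerId, BlockManagerInfo and the endpoint, and the listener-bus event and the slave endpoint reference are left out.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for BlockManagerId and BlockManagerInfo.
final case class ManagerId(executorId: String, host: String, port: Int)
final case class ManagerInfo(id: ManagerId, registeredAtMs: Long, maxMemSize: Long)

// Sketch of the state the master endpoint keeps when a block manager registers.
class MasterRegistry {
  // Every registered block manager, keyed by its id.
  private val blockManagerInfo = mutable.HashMap.empty[ManagerId, ManagerInfo]
  // Reverse index: at most one block manager per executor.
  private val blockManagerIdByExecutor = mutable.HashMap.empty[String, ManagerId]

  def register(id: ManagerId, maxMemSize: Long): Unit = {
    if (!blockManagerInfo.contains(id)) {
      // A different manager already known for this executor is assumed dead and replaced.
      blockManagerIdByExecutor.get(id.executorId).foreach { oldId =>
        blockManagerInfo.remove(oldId)
      }
      blockManagerIdByExecutor(id.executorId) = id
      blockManagerInfo(id) = ManagerInfo(id, System.currentTimeMillis(), maxMemSize)
    }
  }

  def registered: Set[ManagerId] = blockManagerInfo.keySet.toSet
}
```

    Registering a second manager for the same executor leaves only the newer one in the registry, which is the behaviour the logError branch above describes.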
    

    BlockManager internals

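    Reporting block information:
    1. BlockManager.reportAllBlocks iterates over every entry in blockInfo and prepares to report it to the master;
    2. it then calls tryToReportBlockStatus, which calls BlockManagerMaster.updateBlockInfo;
    3. BlockManagerMaster.updateBlockInfo sends an UpdateBlockInfo message to driverEndpoint;
    4. BlockManagerMasterEndpoint (the driverEndpoint) receives the UpdateBlockInfo message and calls its own updateBlockInfo method. The details are not covered here; its main job is to update the blockManagerInfo that the endpoint maintains.
    The code is as follows: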

    // 1. BlockManager.reportAllBlocks
    private def reportAllBlocks(): Unit = {
      logInfo(s"Reporting ${blockInfo.size} blocks to the master.")
      for ((blockId, info) <- blockInfo) {
        val status = getCurrentBlockStatus(blockId, info)
        if (!tryToReportBlockStatus(blockId, info, status)) {
          logError(s"Failed to report $blockId to master; giving up.")
          return
        }
      }
    }
    
    // 2. BlockManager.tryToReportBlockStatus
    private def tryToReportBlockStatus(
        blockId: BlockId,
        info: BlockInfo,
        status: BlockStatus,
        droppedMemorySize: Long = 0L): Boolean = {
      if (info.tellMaster) {
        val storageLevel = status.storageLevel
        val inMemSize = Math.max(status.memSize, droppedMemorySize)
        val inExternalBlockStoreSize = status.externalBlockStoreSize
        val onDiskSize = status.diskSize
        master.updateBlockInfo(
          blockManagerId, blockId, storageLevel, inMemSize, onDiskSize, inExternalBlockStoreSize)
      } else {
        true
      }
    }
    
    // 3. BlockManagerMaster.updateBlockInfo
    def updateBlockInfo(
        blockManagerId: BlockManagerId,
        blockId: BlockId,
        storageLevel: StorageLevel,
        memSize: Long,
        diskSize: Long,
        externalBlockStoreSize: Long): Boolean = {
      val res = driverEndpoint.askWithRetry[Boolean](
        UpdateBlockInfo(blockManagerId, blockId, storageLevel,
          memSize, diskSize, externalBlockStoreSize))
      logDebug(s"Updated info of block $blockId")
      res
    }
    
    // 4. The driverEndpoint receives the UpdateBlockInfo message
    case _updateBlockInfo @ UpdateBlockInfo(
      blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize) =>
      context.reply(updateBlockInfo(
        blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize))
      listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))
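
    The article does not show the master-side updateBlockInfo body, so the following is only a sketch of one plausible shape for it, not the Spark implementation. It assumes the master keeps a map from block id to the set of managers holding that block, and that it answers false for an unregistered manager so the caller can register again. BlockLocationIndex and its string ids are hypothetical.

```scala
import scala.collection.mutable

// Sketch of a location index a master could maintain from status updates.
// All names are hypothetical; block and manager ids are plain strings.
class BlockLocationIndex {
  private val registeredManagers = mutable.HashSet.empty[String]
  private val blockLocations = mutable.HashMap.empty[String, mutable.HashSet[String]]

  def registerManager(managerId: String): Unit = {
    registeredManagers += managerId
  }

  // Returns false for an unknown manager so the caller can re-register.
  // `stored` is false when the manager no longer holds the block at any level.
  def updateBlockInfo(managerId: String, blockId: String, stored: Boolean): Boolean = {
    if (!registeredManagers.contains(managerId)) {
      false
    } else {
      val holders = blockLocations.getOrElseUpdate(blockId, mutable.HashSet.empty[String])
      if (stored) holders += managerId else holders -= managerId
      // Drop the entry once no manager holds the block.
      if (holders.isEmpty) blockLocations.remove(blockId)
      true
    }
  }

  def locations(blockId: String): Set[String] =
    blockLocations.get(blockId).map(_.toSet).getOrElse(Set.empty[String])
}
```

    An index of this kind is what a lookup such as master.getLocations(blockId) would read from when a remote fetch needs candidate nodes.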
    

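    Writing block data at a given StorageLevel:
    Both putArray and putBytes delegate to doPut internally, so let's look at how doPut writes the data. The method is long, so we go through it step by step:
    1. validate that the blockId and storage level are non-null and that the level is valid;
    2. build putBlockInfo, the BlockInfo for the block about to be put;
    3. choose the BlockStore according to the storage level, and decide whether the put should return the stored values;
    4. perform the actual put through the chosen blockStore;
    5. if memory is used, add any blocks dropped from memory during the put to updatedBlocks;
    6. call putBlockInfo.markReady(size) to mark the put as finished and wake up other waiting threads;
    7. if the replication factor is greater than 1, replicate the data to other nodes (for byte buffers the replication is started asynchronously earlier and only awaited here).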
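
    Step 6 depends on a ready/failed handshake between the writing thread and any readers of the same block. The sketch below shows one way such a handshake can be built on the JVM's wait/notifyAll. PendingBlock is a hypothetical class written for this illustration, not Spark's BlockInfo.

```scala
// Sketch of a ready/failed handshake in the spirit of markReady / waitForReady.
// Hypothetical class; it uses the object's own monitor for wait/notifyAll.
class PendingBlock {
  private var ready = false
  private var failed = false
  private var size = 0L

  // Blocks until the writer finishes; true means the block can be read.
  def waitForReady(): Boolean = synchronized {
    while (!ready && !failed) wait()
    ready
  }

  // Called by the writer after a successful put; wakes every waiting reader.
  def markReady(bytes: Long): Unit = synchronized {
    size = bytes
    ready = true
    notifyAll()
  }

  // Called by the writer when the put failed; readers then see `false`.
  def markFailure(): Unit = synchronized {
    failed = true
    notifyAll()
  }

  def sizeInBytes: Long = synchronized(size)
}
```

    The while loop around wait() guards against spurious wake-ups, and wait() releases the monitor, so a reader that already holds the lock does not prevent the writer from marking the block ready.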

    private def doPut(
          blockId: BlockId,
          data: BlockValues,
          level: StorageLevel,
          tellMaster: Boolean = true,
          effectiveStorageLevel: Option[StorageLevel] = None)
        : Seq[(BlockId, BlockStatus)] = {
         
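        // Note: updatedBlocks, startTimeMs, putLevel, size, valuesAfterPut, bytesAfterPut and
        // replicationFuture used below are declared in a part of doPut omitted from this excerpt.
        // 1. Check that the blockId and storage level are non-null and valid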
        require(blockId != null, "BlockId is null")
        require(level != null && level.isValid, "StorageLevel is null or invalid")
        effectiveStorageLevel.foreach { level =>
          require(level != null && level.isValid, "Effective StorageLevel is null or invalid")
        }
    
        // 2. Build putBlockInfo
        val putBlockInfo = {
          // Create the BlockInfo
          val tinfo = new BlockInfo(level, tellMaster)
          // Do atomically !
          // Store tinfo in the in-memory blockInfo map
          val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
          // Check whether a BlockInfo for this block already existed
          if (oldBlockOpt.isDefined) {
            if (oldBlockOpt.get.waitForReady()) {
              logWarning(s"Block $blockId already exists on this machine; not re-adding it")
              return updatedBlocks
            }
            // TODO: So the block info exists - but previous attempt to load it (?) failed.
            // What do we do now ? Retry on it ?
            oldBlockOpt.get
          } else {
            tinfo
          }
        }
        
        // Lock putBlockInfo so that no other thread can operate on this BlockInfo
        putBlockInfo.synchronized {
          logTrace("Put for block %s took %s to get into synchronized block"
            .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
          var marked = false
          try {
            // returnValues - Whether to return the values put
            // blockStore - The type of storage to put these values into
            // 3. Choose the BlockStore from the storage level, and whether to return the put values
            val (returnValues, blockStore: BlockStore) = {
              if (putLevel.useMemory) {
                // Put it in memory first, even if it also has useDisk set to true;
                // We will drop it to disk later if the memory store can't hold it.
                (true, memoryStore)
              } else if (putLevel.useOffHeap) {
                // Use external block store
                (false, externalBlockStore)
              } else if (putLevel.useDisk) {
                // Don't get back the bytes from put unless we replicate them
                (putLevel.replication > 1, diskStore)
              } else {
                assert(putLevel == StorageLevel.NONE)
                throw new BlockException(
                  blockId, s"Attempted to put block $blockId without specifying storage level!")
              }
            }
            // Actually put the values
            // 4. Perform the actual put through the chosen blockStore
            val result = data match {
              case IteratorValues(iterator) =>
                blockStore.putIterator(blockId, iterator, putLevel, returnValues)
              case ArrayValues(array) =>
                blockStore.putArray(blockId, array, putLevel, returnValues)
              case ByteBufferValues(bytes) =>
                bytes.rewind()
                blockStore.putBytes(blockId, bytes, putLevel)
            }
            size = result.size
            result.data match {
              case Left (newIterator) if putLevel.useMemory => valuesAfterPut = newIterator
              case Right (newBytes) => bytesAfterPut = newBytes
              case _ =>
            }
            // Keep track of which blocks are dropped from memory
            // 5. If memory is used, add the blocks dropped from memory to updatedBlocks
            if (putLevel.useMemory) {
              result.droppedBlocks.foreach { updatedBlocks += _ }
            }
            // Get the current status of the block
            val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
            if (putBlockStatus.storageLevel != StorageLevel.NONE) {
              // Now that the block is in either the memory, externalBlockStore, or disk store,
              // let other threads read it, and tell the master about it.
              marked = true
              // 6. Mark the block as successfully put
              putBlockInfo.markReady(size)
              if (tellMaster) {
                // Report the block status to the master
                reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
              }
              updatedBlocks += ((blockId, putBlockStatus))
            }
          } finally {
            // If we failed in putting the block to memory/disk, notify other possible readers
            // that it has failed, and then remove it from the block info map.
            if (!marked) {
              // Note that the remove must happen before markFailure otherwise another thread
              // could've inserted a new BlockInfo before we remove it.
              blockInfo.remove(blockId)
              putBlockInfo.markFailure()
              logWarning(s"Putting block $blockId failed")
            }
          }
        }
        
        // 7. If the replication factor is greater than 1, replicate the data
        if (putLevel.replication > 1) {
          data match {
            case ByteBufferValues(bytes) =>
              if (replicationFuture != null) {
                Await.ready(replicationFuture, Duration.Inf)
              }
            case _ =>
              val remoteStartTime = System.currentTimeMillis
              // Serialize the block if not already done
              if (bytesAfterPut == null) {
                if (valuesAfterPut == null) {
                  throw new SparkException(
                    "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
                }
                bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
              }
              // Replicate the data to other nodes
              replicate(blockId, bytesAfterPut, putLevel)
              logDebug("Put block %s remotely took %s"
                .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
          }
        }
        
        ......
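
    Step 3 of doPut is a priority check over the flags of the storage level: memory first, then off-heap, then disk. The sketch below isolates that decision so it can be read and tested on its own. Level, the Store objects and StoreSelection are hypothetical reductions introduced here; they are not Spark's StorageLevel or BlockStore types.

```scala
// Hypothetical, reduced model of a storage level: only the flags doPut inspects.
final case class Level(useMemory: Boolean, useOffHeap: Boolean, useDisk: Boolean, replication: Int)

sealed trait Store
case object MemoryTier extends Store
case object ExternalTier extends Store
case object DiskTier extends Store

object StoreSelection {
  // Returns (returnValues, store), checking memory first, then off-heap, then disk.
  def select(level: Level): (Boolean, Store) = {
    if (level.useMemory) {
      // Memory wins even when useDisk is also set; dropping to disk happens later.
      (true, MemoryTier)
    } else if (level.useOffHeap) {
      (false, ExternalTier)
    } else if (level.useDisk) {
      // The bytes are only needed back when they must be replicated.
      (level.replication > 1, DiskTier)
    } else {
      throw new IllegalArgumentException("No storage level specified")
    }
  }
}
```

    Under this model a memory-and-disk level selects the memory tier, and a disk-only level asks for the values back only when replication is greater than 1.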
    

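    Reading block data:
    The data may live locally or on another node, so there are two methods, doGetLocal and doGetRemote. Let's look at doGetLocal first:
    1. double-check that the block still exists;
    2. if another thread is still writing the block, wait until the write finishes, and return None if the write failed;
    3. if the block's level uses memory, read it from memoryStore;
    4. if the block's level uses off-heap storage, read it from externalBlockStore;
    5. if the block's level uses disk, read it from diskStore. One point worth noting: when the data is read from disk, Spark checks whether the block's storage level also allows memory. If it does, Spark tries to put the block into memory so that the next read can be served directly from memory, which is faster.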

    private def doGetLocal(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
      val info = blockInfo.get(blockId).orNull
      if (info != null) {
        info.synchronized {
         
          // Double check that the block still exists
          if (blockInfo.get(blockId).isEmpty) {
            logWarning(s"Block $blockId had been removed")
            return None
          }
          
         // If another thread is writing the block, wait for it to become ready.
         // Block here until the writer has marked the block ready or failed
         if (!info.waitForReady()) {
           // If we get here, the block write failed.
           logWarning(s"Block $blockId was marked as failure.")
           return None
         }
    
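         // The storage level recorded for this block (declared at this point in the Spark source)
         val level = info.level

         // If the block's level uses memory, read it from memoryStore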
         if (level.useMemory) {
           logDebug(s"Getting block $blockId from memory")
           val result = if (asBlockResult) {
             memoryStore.getValues(blockId).map(new BlockResult(_, DataReadMethod.Memory, info.size))
           } else {
             memoryStore.getBytes(blockId)
           }
           result match {
             case Some(values) =>
               return result
             case None =>
               logDebug(s"Block $blockId not found in memory")
           }
         } 
         
        // If the block's level uses off-heap storage, read it from externalBlockStore
        if (level.useOffHeap) {
          logDebug(s"Getting block $blockId from ExternalBlockStore")
          if (externalBlockStore.contains(blockId)) {
            val result = if (asBlockResult) {
              externalBlockStore.getValues(blockId)
                .map(new BlockResult(_, DataReadMethod.Memory, info.size))
            } else {
              externalBlockStore.getBytes(blockId)
            }
            result match {
              case Some(values) =>
                return result
              case None =>
                logDebug(s"Block $blockId not found in ExternalBlockStore")
            }
          }
        }
    
         // If the block's level uses disk, read it from diskStore
         if (level.useDisk) {
           logDebug(s"Getting block $blockId from disk")
           val bytes: ByteBuffer = diskStore.getBytes(blockId) match {
             case Some(b) => b
             case None =>
               throw new BlockException(
                 blockId, s"Block $blockId not found on disk, though it should be")
           }
           assert(0 == bytes.position())
           // Check whether the block's level also uses memory; if not, return the data directly
           if (!level.useMemory) {
             // If the block shouldn't be stored in memory, we can just return it
             if (asBlockResult) {
               return Some(new BlockResult(dataDeserialize(blockId, bytes), DataReadMethod.Disk,
                 info.size))
             } else {
               return Some(bytes)
             }
           } else {
             // Otherwise, we also have to store something in the memory store
             if (!level.deserialized || !asBlockResult) {
               /* We'll store the bytes in memory if the block's storage level includes
                * "memory serialized", or if it should be cached as objects in memory
                * but we only requested its serialized bytes. */
               memoryStore.putBytes(blockId, bytes.limit, () => {
                 // https://issues.apache.org/jira/browse/SPARK-6076
                 // If the file size is bigger than the free memory, OOM will happen. So if we cannot
                 // put it into MemoryStore, copyForMemory should not be created. That's why this
                 // action is put into a `() => ByteBuffer` and created lazily.
                 val copyForMemory = ByteBuffer.allocate(bytes.limit)
                 copyForMemory.put(bytes)
               })
               bytes.rewind()
             }
             if (!asBlockResult) {
               return Some(bytes)
             } else {
               val values = dataDeserialize(blockId, bytes)
               if (level.deserialized) {
                 // Cache the values before returning them
                 // Memory is allowed for this block, so write the values just read into memory;
                 // the next lookup of this block can then be served from memory, which is faster
                 val putResult = memoryStore.putIterator(
                   blockId, values, level, returnValues = true, allowPersistToDisk = false)
                 // The put may or may not have succeeded, depending on whether there was enough
                 // space to unroll the block. Either way, the put here should return an iterator.
                 putResult.data match {
                   case Left(it) =>
                     return Some(new BlockResult(it, DataReadMethod.Disk, info.size))
                   case _ =>
                     // This only happens if we dropped the values back to disk (which is never)
                     throw new SparkException("Memory store did not return an iterator!")
                 }
               } else {
                 return Some(new BlockResult(values, DataReadMethod.Disk, info.size))
               }
             }
           }
         }
         
         ......
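
    The disk branch of doGetLocal is essentially a read-through cache: a disk hit is copied into memory when the level allows it and the bytes fit. A minimal sketch of that idea follows. TwoTierReader is hypothetical, plain maps stand in for memoryStore and diskStore, and unlike the excerpt above it returns None on a disk miss instead of throwing.

```scala
import scala.collection.mutable

// Sketch of a two-tier lookup: memory first, then disk, caching disk hits in memory
// when allowed. Hypothetical class; plain maps stand in for the real stores.
class TwoTierReader(memoryCapacityBytes: Long) {
  private val memory = mutable.HashMap.empty[String, Array[Byte]]
  private val disk = mutable.HashMap.empty[String, Array[Byte]]
  private var memoryUsed = 0L

  def putOnDisk(blockId: String, bytes: Array[Byte]): Unit = {
    disk(blockId) = bytes
  }

  def inMemory(blockId: String): Boolean = memory.contains(blockId)

  // Caches only when the bytes fit; failing to cache is not an error.
  private def tryCache(blockId: String, bytes: Array[Byte]): Unit = {
    if (memoryUsed + bytes.length <= memoryCapacityBytes) {
      memory(blockId) = bytes
      memoryUsed += bytes.length
    }
  }

  // `useMemory` plays the role of level.useMemory for the requested block.
  def get(blockId: String, useMemory: Boolean): Option[Array[Byte]] = {
    memory.get(blockId).orElse {
      val fromDisk = disk.get(blockId)
      if (useMemory) fromDisk.foreach(tryCache(blockId, _))
      fromDisk
    }
  }
}
```

    Checking the free space before copying reflects the concern raised in the SPARK-6076 comment above: the in-memory copy should only be made when it can actually be stored.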
    

    doGetRemote mainly uses blockTransferService to fetch the data from other nodes, as the following code shows:

    private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
      require(blockId != null, "BlockId is null")
      // Shuffle the list of nodes that hold this block
      val locations = Random.shuffle(master.getLocations(blockId))
      var numFetchFailures = 0
      for (loc <- locations) {
        logDebug(s"Getting remote block $blockId from $loc")
        val data = try {
          // Fetch the data from the other node through blockTransferService
          blockTransferService.fetchBlockSync(
            loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()
        } catch {
          case NonFatal(e) =>
            numFetchFailures += 1
            if (numFetchFailures == locations.size) {
              // An exception is thrown while fetching this block from all locations
              throw new BlockFetchException(s"Failed to fetch block from" +
                s" ${locations.size} locations. Most recent failure cause:", e)
            } else {
              // This location failed, so we retry fetch from a different one by returning null here
              logWarning(s"Failed to fetch remote block $blockId " +
                s"from $loc (failed attempt $numFetchFailures)", e)
              null
            }
        }
        if (data != null) {
          if (asBlockResult) {
            return Some(new BlockResult(
              dataDeserialize(blockId, data),
              DataReadMethod.Network,
              data.limit()))
          } else {
            return Some(data)
          }
        }
        logDebug(s"The value of block $blockId is null")
      }
      logDebug(s"Block $blockId not found")
      None
    }
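
    The control flow of doGetRemote (shuffle the candidates, try each one, fail only when every location has failed) can be isolated in a small generic helper. This is a sketch under stated assumptions: RemoteFetch and fetchFromAny are hypothetical names, locations are plain strings, and the fetch function stands in for the real network call.

```scala
import scala.util.Random
import scala.util.control.NonFatal

object RemoteFetch {
  // Tries each location in random order and returns the first successful result.
  // `fetch` is a hypothetical stand-in for the real transfer call.
  def fetchFromAny[A](locations: Seq[String])(fetch: String => A): Option[A] = {
    var failures = 0
    val candidates = Random.shuffle(locations).iterator
    while (candidates.hasNext) {
      val loc = candidates.next()
      try {
        return Some(fetch(loc))
      } catch {
        case NonFatal(e) =>
          failures += 1
          // Give up with an exception only once every location has failed.
          if (failures == locations.size) {
            throw new RuntimeException(s"Failed to fetch from ${locations.size} locations", e)
          }
      }
    }
    // Reached only when there were no locations to try.
    None
  }
}
```

    Shuffling spreads read load across the replicas, so repeated fetches of a popular block do not always hit the same node first.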
    

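    BlockStore
    The purpose of BlockStore is that, when block data is stored, the client does not need to care about the internal details of the concrete store (DiskStore, MemoryStore, ExternalBlockStore). Everything is left to BlockManager, the externally facing class, which manages the stores according to the specified storage level;
    MemoryStore is the class in BlockManager dedicated to memory-based storage, reads and writes;
    DiskStore is the class in BlockManager dedicated to disk-based storage, reads and writes;
    DiskBlockManager manages the mapping between logical blocks and the physical blocks on disk, and is responsible for creating, reading and writing the files on disk;
    BlockTransferService handles block transfer between different machines;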
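
    The division of labour described above is a facade over interchangeable stores. The sketch below shows the shape of that design with a deliberately tiny interface. SimpleBlockStore, InMemoryStore and StoreFacade are hypothetical names, and the real BlockStore API in Spark is richer than these four methods.

```scala
import scala.collection.mutable

// Hypothetical, trimmed-down store interface for illustration only.
trait SimpleBlockStore {
  def putBytes(blockId: String, bytes: Array[Byte]): Unit
  def getBytes(blockId: String): Option[Array[Byte]]
  def contains(blockId: String): Boolean
  def remove(blockId: String): Boolean
}

// One map-backed implementation; a disk-backed store would implement the same trait.
class InMemoryStore extends SimpleBlockStore {
  private val entries = mutable.HashMap.empty[String, Array[Byte]]
  def putBytes(blockId: String, bytes: Array[Byte]): Unit = { entries(blockId) = bytes }
  def getBytes(blockId: String): Option[Array[Byte]] = entries.get(blockId)
  def contains(blockId: String): Boolean = entries.contains(blockId)
  def remove(blockId: String): Boolean = entries.remove(blockId).isDefined
}

// The facade picks a store from the requested level; callers never touch a store directly.
class StoreFacade(memory: SimpleBlockStore, disk: SimpleBlockStore) {
  def put(blockId: String, bytes: Array[Byte], useMemory: Boolean): Unit =
    (if (useMemory) memory else disk).putBytes(blockId, bytes)

  def get(blockId: String): Option[Array[Byte]] =
    memory.getBytes(blockId).orElse(disk.getBytes(blockId))
}
```

    Because callers depend only on the facade, a store implementation can be swapped or added without changing the code that puts and gets blocks.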


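    That concludes the walkthrough of BlockManager. There are other important internal methods, such as dropFromMemory, that are not covered here; if you are interested, open the source and read them directly.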

          Article link: https://www.haomeiwen.com/subject/chhbzftx.html