The Lifecycle of Spark Broadcast Variables


Author: 蠟筆小噺没有烦恼 | Published 2021-03-22 22:24

    This all started with a colleague's Spark Streaming program, in which some executors kept failing to fetch a broadcast variable. To track down the cause, I walked through the broadcast-related source code.

    The program's main flow: a dedicated thread on the driver scans HDFS, and whenever the configuration file changes it is re-read and a new broadcast variable is created, which the executors then use to process subsequent streaming batches. Think of an IP blacklist, except the blacklist changes over time, so a new broadcast variable must be generated periodically.
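    A minimal sketch of this pattern (names such as BlacklistHolder and loadBlacklistFromHdfs are hypothetical, not from the original program):

    import org.apache.spark.SparkContext
    import org.apache.spark.broadcast.Broadcast

    class BlacklistHolder(sc: SparkContext, loadBlacklistFromHdfs: () => Set[String]) {
      @volatile private var current: Broadcast[Set[String]] =
        sc.broadcast(loadBlacklistFromHdfs())

      def get: Broadcast[Set[String]] = current

      // Called from a dedicated driver-side thread whenever the HDFS file changes.
      def refresh(): Unit = {
        val old = current
        current = sc.broadcast(loadBlacklistFromHdfs())
        // Drop the executor-side copies of the stale broadcast; tasks still
        // holding a reference to `old` can re-fetch it until it is destroyed.
        old.unpersist(blocking = false)
      }
    }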

    1 Broadcast basics

    Broadcast variables are commonly used for map-side joins and for shipping configuration data globally. Usage is simple:

    val blackIp = Set(ip1, ip2...)
    // sc.broadcast creates the broadcast variable
    val blackIpBC = sc.broadcast(blackIp)
    // broadcastVar.value fetches the actual contents inside a task
    rdd.filter(row => !blackIpBC.value.contains(row.ip))
    

    1.1 Why use a broadcast variable?

    Why not use blackIp directly instead of wrapping it in a broadcast variable?

    In fact, when a broadcast variable is used, it is pulled into the executor's BlockManager. Only the first task that uses it triggers the fetch; later tasks on the same executor reuse the copy in the BlockManager, so the data is neither re-fetched nor shipped inside every task closure.

    Moreover, the fetch is Torrent-based: an executor can pull the broadcast data from other executors. Without a broadcast variable, every request would go to the driver, and with large data the driver quickly becomes a bottleneck.

    Speaking of the Torrent protocol: many downloaders and media players are BitTorrent-based, which is why you sometimes see Xunlei, Tencent Video, or iQiyi clients uploading in the background. Your machine is acting as an intermediate server, serving the resource to other users. If your machine or network is weak, remember to throttle the upload manually.

    2 How a broadcast variable is created

    2.1 What happens on the driver?

    What does sc.broadcast(value) do on the driver, and how does it ensure that executors can access the variable?

    2.1.1 The SparkContext.broadcast method

    SparkContext.broadcast looks like this:

      /**
       * Broadcast a read-only variable to the cluster, returning a
       * [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
       * The variable will be sent to each cluster only once.
       *
       * @param value value to broadcast to the Spark nodes
       * @return `Broadcast` object, a read-only variable cached on each machine
       */
      def broadcast[T: ClassTag](value: T): Broadcast[T] = {
        assertNotStopped()
        require(!classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass),
          "Can not directly broadcast RDDs; instead, call collect() and broadcast the result.")
        val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
        val callSite = getCallSite
        logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
        cleaner.foreach(_.registerBroadcastForCleanup(bc))
        bc
      }
    
    • RDDs are not allowed to be broadcast directly
    • BroadcastManager.newBroadcast is called to create the broadcast variable
    • getCallSite records call-site stack information for tracing; it has nothing to do with the core logic
    • The cleaner.foreach(...) line matters: it registers a cleanup handle for the broadcast (more on this in section 3.3)

    2.1.2 The BroadcastManager class

    BroadcastManager is initialized in SparkEnv:

    private[spark] class BroadcastManager(
        val isDriver: Boolean,
        conf: SparkConf,
        securityManager: SecurityManager)
      extends Logging {
    
      private var initialized = false
      private var broadcastFactory: BroadcastFactory = null
    
      initialize()
    
      // Called by SparkContext or Executor before using Broadcast
      private def initialize(): Unit = synchronized {
        if (!initialized) {
          broadcastFactory = new TorrentBroadcastFactory
          broadcastFactory.initialize(isDriver, conf, securityManager)
          initialized = true
        }
      }
    
      def stop(): Unit = { broadcastFactory.stop() }
    
      private val nextBroadcastId = new AtomicLong(0)
    
      private[broadcast] val cachedValues = Collections.synchronizedMap(
        new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
          .asInstanceOf[java.util.Map[Any, Any]])
    
      def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = {
        val bid = nextBroadcastId.getAndIncrement()
        value_ match {
          case pb: PythonBroadcast => pb.setBroadcastId(bid)
          case _ => // do nothing
        }
        broadcastFactory.newBroadcast[T](value_, isLocal, bid)
      }
    
      def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit = {
        broadcastFactory.unbroadcast(id, removeFromDriver, blocking)
      }
    }
    

    Constructor

    BroadcastManager takes three constructor parameters: isDriver (whether this node is the driver), conf (the SparkConf), and securityManager (the SecurityManager).

    Fields

    BroadcastManager has four fields:

    • initialized indicates whether the BroadcastManager has finished initializing.
    • broadcastFactory holds the broadcast factory instance (an implementation of the BroadcastFactory trait).
    • nextBroadcastId is the unique ID of the next broadcast variable (an AtomicLong).
    • cachedValues caches the values of already-broadcast variables. It is a ReferenceMap, a reference-aware map from Apache Commons Collections; unlike the maps we usually work with, its entries may be reclaimed during GC because the values are held through weak references (see the small demo below).
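    To make those hard-key/weak-value semantics concrete, here is a minimal standalone sketch (not Spark code) using the same Commons Collections class:

    import java.util.Collections
    import org.apache.commons.collections.map.{AbstractReferenceMap, ReferenceMap}

    // HARD keys, WEAK values: once no strong reference to a value remains,
    // GC may drop its entry from the map.
    val cache = Collections.synchronizedMap(
      new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
        .asInstanceOf[java.util.Map[Any, Any]])

    var value: AnyRef = new Array[Byte](16 * 1024 * 1024)
    cache.put("broadcast_0", value)
    println(cache.get("broadcast_0") != null) // true: still strongly reachable via `value`

    value = null  // drop the last strong reference
    System.gc()   // only a hint; collection timing is not guaranteed
    println(cache.get("broadcast_0"))         // typically null once the value is collected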

    Public methods

    It exposes two methods, both of which ultimately delegate to the same-named methods on BroadcastFactory.

    • newBroadcast: creates a broadcast variable

    • unbroadcast: removes a broadcast variable

    In fact, BroadcastFactory is a trait with a single implementation, TorrentBroadcastFactory.
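    For reference, the trait's shape can be reconstructed from the overrides shown in the next section (a sketch inferred from those signatures, not the authoritative definition from the Spark source):

    import scala.reflect.ClassTag
    import org.apache.spark.{SecurityManager, SparkConf}
    import org.apache.spark.broadcast.Broadcast

    // Reconstructed sketch of the BroadcastFactory trait, inferred from
    // TorrentBroadcastFactory's overrides below.
    private[spark] trait BroadcastFactory {
      def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager): Unit
      def newBroadcast[T: ClassTag](value: T, isLocal: Boolean, id: Long): Broadcast[T]
      def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit
      def stop(): Unit
    }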

    2.1.3 The TorrentBroadcastFactory class

    private[spark] class TorrentBroadcastFactory extends BroadcastFactory {
    
      override def initialize(isDriver: Boolean, conf: SparkConf,
          securityMgr: SecurityManager): Unit = { }
    
      override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long): Broadcast[T] = {
        new TorrentBroadcast[T](value_, id)
      }
    
      override def stop(): Unit = { }
    
      /**
       * Remove all persisted state associated with the torrent broadcast with the given ID.
       * @param removeFromDriver Whether to remove state from the driver.
       * @param blocking Whether to block until unbroadcasted
       */
      override def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit = {
        TorrentBroadcast.unpersist(id, removeFromDriver, blocking)
      }
    }
    

    Creating a broadcast variable simply instantiates a TorrentBroadcast. Note that the isLocal parameter is unused here; it indicates whether --master is a local mode.

    Unloading a broadcast variable delegates directly to TorrentBroadcast.unpersist;

    stop() does nothing.

    2.1.4 TorrentBroadcast

    It extends Broadcast. The class is fairly large, so let's go through it piece by piece.

    Fields

      /**
       * A soft reference, so the cached value can be reclaimed by GC and re-read later
       */
      @transient private var _value: SoftReference[T] = _
      @transient private var compressionCodec: Option[CompressionCodec] = _
      @transient private var blockSize: Int = _
    
      private def setConf(conf: SparkConf): Unit = {
        compressionCodec = if (conf.get(config.BROADCAST_COMPRESS)) {
          Some(CompressionCodec.createCodec(conf))
        } else {
          None
        }
        // Note: use getSizeAsKb (not bytes) to maintain compatibility if no units are provided
        blockSize = conf.get(config.BROADCAST_BLOCKSIZE).toInt * 1024
        checksumEnabled = conf.get(config.BROADCAST_CHECKSUM)
      }
      setConf(SparkEnv.get.conf)
    
      private val broadcastId = BroadcastBlockId(id)
    
      /** Total number of blocks this broadcast variable contains. */
      private val numBlocks: Int = writeBlocks(obj)
    
      /** Whether to generate checksum for blocks or not. */
      private var checksumEnabled: Boolean = false
      /** The checksum for all the blocks. */
      private var checksums: Array[Int] = _
    
    • _value: the actual broadcast data. It is read lazily via readBroadcastBlock(); even on the driver, accessing the value goes through the BlockManager. It is held as a SoftReference, so the cached object can be reclaimed during GC.
    • compressionCodec: the compression codec for broadcast blocks. Compression is enabled when spark.broadcast.compress is true.
    • blockSize: the broadcast block size, controlled by spark.broadcast.blockSize, default 4MB.
    • broadcastId: the broadcast variable's ID. BroadcastBlockId is a trivially simple case class; the underlying id increments for every new broadcast variable.
    • numBlocks: the number of blocks this broadcast variable consists of; writeBlocks() is invoked right in the TorrentBroadcast constructor to compute it.
    • checksumEnabled: whether to compute checksums for blocks, controlled by spark.broadcast.checksum, default true.
    • checksums: the per-block checksums.

    The constructor also calls setConf to initialize some of these fields from the configuration, and writeBlocks(obj) performs the actual data write.
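    For completeness, these knobs can be set through SparkConf; the values below are illustrative defaults, not tuning advice:

    import org.apache.spark.SparkConf

    // Illustrative settings for the broadcast-related configs listed above.
    val conf = new SparkConf()
      .set("spark.broadcast.compress", "true") // compress broadcast pieces
      .set("spark.broadcast.blockSize", "4m")  // piece size (default 4m)
      .set("spark.broadcast.checksum", "true") // checksum pieces (default true)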

    The writeBlocks method

      private def writeBlocks(value: T): Int = {
        import StorageLevel._
        // Store a copy of the broadcast variable in the driver so that tasks run on the driver
        // do not create a duplicate copy of the broadcast variable's value.
        val blockManager = SparkEnv.get.blockManager
        if (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false)) {
          throw new SparkException(s"Failed to store $broadcastId in BlockManager")
        }
        val blocks =
          TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
        if (checksumEnabled) {
          checksums = new Array[Int](blocks.length)
        }
        blocks.zipWithIndex.foreach { case (block, i) =>
          if (checksumEnabled) checksums(i) = calcChecksum(block)
          val pieceId = BroadcastBlockId(id, "piece" + i)
          val bytes = new ChunkedByteBuffer(block.duplicate())
          if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) {
            throw new SparkException(s"Failed to store $pieceId of $broadcastId in local BlockManager")
          }
        }
        blocks.length
      }
    
    • blockManager.putSingle() stores the value as a single object in the BlockManager (putSingle itself is not covered here) at the MEMORY_AND_DISK level: it occupies Storage memory and spills to disk when memory runs short.
    • blockifyObject() splits the broadcast data into blocks, Spark's basic storage unit, using the JavaSerializer initialized in SparkEnv.
    • If the checksum switch is on, calcChecksum() computes a checksum per block.
    • Each block the broadcast data is split into (called a piece) gets its own BroadcastBlockId with a "piece" suffix, and BlockManager.putBytes() stores the pieces serialized at the MEMORY_AND_DISK_SER level.
    • Finally, the block count is returned.

    That is the driver-side creation flow. Note that the broadcast data is stored twice: once at Memory+Disk as a single Java object, and once chunked, serialized, and stored at Memory+Disk as binary pieces.
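    To make the chunking step concrete, here is a minimal serialize-then-split sketch. This is an illustration only; Spark's blockifyObject streams through the configured serializer and optional compression codec instead of materializing one big array:

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}

    // Turn a value into ceil(totalBytes / blockSize) byte-array pieces,
    // mimicking what blockifyObject achieves.
    def blockify(value: AnyRef, blockSize: Int = 4 * 1024 * 1024): Array[Array[Byte]] = {
      val bos = new ByteArrayOutputStream()
      val oos = new ObjectOutputStream(bos)
      oos.writeObject(value)
      oos.close()
      bos.toByteArray.grouped(blockSize).toArray
    }

    // A ~10MB payload yields 3 pieces at the default 4MB block size.
    println(blockify(new Array[Byte](10 * 1024 * 1024)).length)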

    2.2 What happens on the executor?

    sc.broadcast returns a Broadcast object, and inside an RDD operator, calling .value on it yields the underlying value. What actually happens?

    2.2.1 The Broadcast.value method

    value first calls assertValid to make sure the broadcast has not been destroyed yet:

    @volatile private var _isValid = true

    def value: T = {
      assertValid()
      getValue()
    }

    /** Check if this broadcast is valid. If not valid, exception is thrown. */
    protected def assertValid(): Unit = {
      if (!_isValid) {
        throw new SparkException(
          "Attempted to use %s after it was destroyed (%s) ".format(toString, _destroySite))
      }
    }
    

    getValue is an abstract method; the concrete implementation lives in TorrentBroadcast.

    2.2.2 The TorrentBroadcast.getValue method

    override protected def getValue() = synchronized {
      val memoized: T = if (_value == null) null.asInstanceOf[T] else _value.get
      if (memoized != null) {
        memoized
      } else {
        val newlyRead = readBroadcastBlock()
        _value = new SoftReference[T](newlyRead)
        newlyRead
      }
    }
    

    If _value is unset, this executor has not read the broadcast variable yet, so readBroadcastBlock fetches the data and _value is set to a soft reference pointing at the freshly read value.

    2.2.3 The TorrentBroadcast.readBroadcastBlock method

      private def readBroadcastBlock(): T = Utils.tryOrIOException {
        TorrentBroadcast.torrentBroadcastLock.withLock(broadcastId) {
          // As we only lock based on `broadcastId`, whenever using `broadcastCache`, we should only
          // touch `broadcastId`.
          val broadcastCache = SparkEnv.get.broadcastManager.cachedValues
    
          Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse {
            setConf(SparkEnv.get.conf)
            val blockManager = SparkEnv.get.blockManager
            blockManager.getLocalValues(broadcastId) match {
              case Some(blockResult) =>
                if (blockResult.data.hasNext) {
                  val x = blockResult.data.next().asInstanceOf[T]
                  releaseBlockManagerLock(broadcastId)
                  if (x != null) broadcastCache.put(broadcastId, x)
                  x
                } else {
                  throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId")
                }
              case None =>
                val estimatedTotalSize = Utils.bytesToString(numBlocks * blockSize)
                logInfo(s"Started reading broadcast variable $id with $numBlocks pieces (estimated total size $estimatedTotalSize)")
                val startTimeNs = System.nanoTime()
                val blocks = readBlocks()
                logInfo(s"Reading broadcast variable $id took ${Utils.getUsedTimeNs(startTimeNs)}")
                try {
                  val obj = TorrentBroadcast.unBlockifyObject[T](
                    blocks.map(_.toInputStream()), SparkEnv.get.serializer, compressionCodec)
                  // Store the merged copy in BlockManager so other tasks on this executor don't need to re-fetch it.
                  val storageLevel = StorageLevel.MEMORY_AND_DISK
                  if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
                    throw new SparkException(s"Failed to store $broadcastId in BlockManager")
                  }
              if (obj != null) broadcastCache.put(broadcastId, obj)
                  obj
                } finally {
                  blocks.foreach(_.dispose())
                }
            }
          }
        }
      }
    
    • After checking the process-wide broadcastCache, it gets the BlockManager instance and calls getLocalValues() to fetch the broadcast object written earlier.
    • If the data is found locally, releaseBlockManagerLock() [which maps to BlockManager.releaseLock() and ultimately Object.notifyAll()] releases the current block's lock. This lock guarantees mutual exclusion between block reads and writes.
    • If the data cannot be fetched directly, it exists only in serialized form and possibly not locally. readBlocks() then fetches the pieces from local and remote stores, and unBlockifyObject() reassembles them into the broadcast value.
    • BlockManager.putSingle() is then called to store the value locally as a single object, and the value is added to the broadcast cache map, so the next read is cheap.

    2.2.4 The TorrentBroadcast.readBlocks method

      private def readBlocks(): Array[BlockData] = {
        val blocks = new Array[BlockData](numBlocks)
        val bm = SparkEnv.get.blockManager
    
        for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
          val pieceId = BroadcastBlockId(id, "piece" + pid)
          logDebug(s"Reading piece $pieceId of $broadcastId")
          bm.getLocalBytes(pieceId) match {
            case Some(block) =>
              blocks(pid) = block
              releaseBlockManagerLock(pieceId)
            case None =>
              bm.getRemoteBytes(pieceId) match {
                case Some(b) =>
                  if (checksumEnabled) {
                    val sum = calcChecksum(b.chunks(0))
                    if (sum != checksums(pid)) {
                      throw new SparkException(s"corrupt remote block $pieceId of $broadcastId:" +
                        s" $sum != ${checksums(pid)}")
                    }
                  }
                  // We found the block from remote executors/driver's BlockManager, so put the block
                  // in this executor's BlockManager.
                  if (!bm.putBytes(pieceId, b, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)) {
                    throw new SparkException(
                      s"Failed to store $pieceId of $broadcastId in local BlockManager")
                  }
                  blocks(pid) = new ByteBufferBlockData(b, true)
                case None =>
                  throw new SparkException(s"Failed to get $pieceId of $broadcastId")
              }
          }
        }
        blocks
      }
    

    The method first shuffles the order of the pieces, so that different executors fetch them in different orders and the load spreads across the cluster. Then, for each piece:

    • BlockManager.getLocalBytes() tries to fetch the serialized piece locally. If found, the piece is stored at its index and its lock is released.
    • If the piece is not local, BlockManager.getRemoteBytes() fetches it from a remote node (another executor or the driver).
    • For a remotely fetched piece, a checksum is computed and compared with the one recorded at write time; a mismatch indicates a transfer error and raises an exception.
    • If all is well, BlockManager.putBytes() writes the piece into the MemoryStore (memory) or DiskStore (disk), so this executor can in turn serve it to others, and the piece is stored at its index. Finally, all fetched pieces are returned.

    3 Cleaning up broadcast variables

    When can a broadcast variable be cleaned up, and how do driver-side and executor-side cleanup differ?

    3.1 Explicit cleanup

    Cleaning up manually is done by calling unpersist:

    broadcastVar.unpersist() // removes only the executor-side copies
    broadcastVar.destroy()   // removes both the driver- and executor-side copies
    

    Note that the concrete unpersist implementation currently lives in TorrentBroadcast and removes only the executor-side copies.

    To also remove the driver-side copy, call destroy() (whose concrete work happens in doDestroy).

    3.1.1 The Broadcast.unpersist method

    The Broadcast class has two unpersist overloads. blocking controls whether the call holds on until the unpersist completes, i.e. synchronous versus asynchronous execution; the default blocking = false is asynchronous.

    def unpersist(): Unit = {
      unpersist(blocking = false)
    }
    
    def unpersist(blocking: Boolean): Unit = {
      assertValid()
      doUnpersist(blocking)
    }
    
    protected def doUnpersist(blocking: Boolean): Unit
    

    The call ends in doUnpersist, an abstract method whose only concrete implementation is in TorrentBroadcast.

    3.1.2 The TorrentBroadcast.doUnpersist method

    override protected def doUnpersist(blocking: Boolean): Unit = {
      TorrentBroadcast.unpersist(id, removeFromDriver = false, blocking)
    }

    override protected def doDestroy(blocking: Boolean): Unit = {
      TorrentBroadcast.unpersist(id, removeFromDriver = true, blocking)
    }

    def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit = {
      logDebug(s"Unpersisting TorrentBroadcast $id")
      SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
    }
    

    doUnpersist removes the executor-side copies, while doDestroy removes both the driver- and executor-side copies. Both delegate to the unpersist method, which does the actual cleanup and takes three parameters:

    • id: the broadcast variable's id
    • removeFromDriver: whether to also remove the driver-side copy
    • blocking: whether to block synchronously until the cleanup completes

    It then calls removeBroadcast on the BlockManagerMaster (SparkEnv.get.blockManager.master).

    3.1.3 The BlockManagerMaster.removeBroadcast method

    Concretely, the driver first sends an RPC to the BlockManagerMaster (which lives on the driver) asking it to remove the given broadcast; the BlockManagerMaster then sends remove requests to every BlockManagerSlave (on the executors). Two rounds of RPC in total.

    /** Remove all blocks belonging to the given broadcast. */
    def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean): Unit = {
      val future = driverEndpoint.askSync[Future[Seq[Int]]](
        RemoveBroadcast(broadcastId, removeFromMaster))
      future.failed.foreach(e =>
        logWarning(s"Failed to remove broadcast $broadcastId" +
          s" with removeFromMaster = $removeFromMaster - ${e.getMessage}", e)
      )(ThreadUtils.sameThread)
      if (blocking) {
        timeout.awaitResult(future)
      }
    }
    

    If blocking = true, the call waits until every executor has processed the request before returning.

    We won't dive into the message-bus dispatch here; the RPC eventually reaches BlockManagerMasterEndpoint and is handled by its receiveAndReply method.

    3.1.4 How BlockManagerMasterEndpoint handles RemoveBroadcast

      override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
        ...
        case RemoveBroadcast(broadcastId, removeFromDriver) =>
          context.reply(removeBroadcast(broadcastId, removeFromDriver))
        ...
      }
    

    which in turn calls BlockManagerMasterEndpoint's removeBroadcast method:

    private def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean): Future[Seq[Int]] = {
      val removeMsg = RemoveBroadcast(broadcastId, removeFromDriver)
      val requiredBlockManagers = blockManagerInfo.values.filter { info =>
        removeFromDriver || !info.blockManagerId.isDriver
      }
      val futures = requiredBlockManagers.map { bm =>
        bm.slaveEndpoint.ask[Int](removeMsg).recover {
          case e: IOException =>
            logWarning(s"Error trying to remove broadcast $broadcastId from block manager " +
              s"${bm.blockManagerId}", e)
            0 // zero blocks were removed
        }
      }.toSeq
    
      Future.sequence(futures)
    }
    

    The request is re-wrapped and sent to all BlockManagerSlaveEndpoints. The concrete steps:

    • Build a RemoveBroadcast message (a case class)
    • Filter the target BlockManagers: non-driver BlockManagers are always targeted, and the driver's BlockManager is targeted only when removeFromDriver is true
    • Send the RemoveBroadcast RPC to every selected BlockManagerSlaveEndpoint, collecting the per-endpoint futures (see the Future.sequence sketch below)
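    The Future.sequence call at the end of removeBroadcast turns the per-BlockManager futures into a single future of all results. A minimal standalone illustration of that aggregation:

    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._

    // Each Int stands for the number of blocks one BlockManager removed.
    val perBlockManager: Seq[Future[Int]] = Seq(Future(3), Future(2), Future(0))

    // One future that completes when every per-BlockManager future completes.
    val all: Future[Seq[Int]] = Future.sequence(perBlockManager)
    println(Await.result(all, 5.seconds)) // List(3, 2, 0)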

    3.1.5 Executor-side handling of broadcast removal

    BlockManagerSlaveEndpoint's receiveAndReply method:

    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
      ...
      case RemoveBroadcast(broadcastId, _) =>
        doAsync[Int]("removing broadcast " + broadcastId, context) {
          blockManager.removeBroadcast(broadcastId, tellMaster = true)
        }
      ...
    }

    On receiving the RemoveBroadcast request, the executor calls BlockManager.removeBroadcast.

    3.1.6 The BlockManager.removeBroadcast method

    This is where the real work happens: iterate over all BlockIds in this BlockManager, and for every block that belongs to the given broadcast, call removeBlock; the number of removed blocks is returned.

    def removeBroadcast(broadcastId: Long, tellMaster: Boolean): Int = {
      logDebug(s"Removing broadcast $broadcastId")
      val blocksToRemove = blockInfoManager.entries.map(_._1).collect {
        case bid @ BroadcastBlockId(`broadcastId`, _) => bid
      }
      blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster) }
      blocksToRemove.size
    }
    

    removeBlock is also a BlockManager method:

    def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit = {
      logDebug(s"Removing block $blockId")
      blockInfoManager.lockForWriting(blockId) match {
        case None =>
          // The block has already been removed; do nothing.
          logWarning(s"Asked to remove block $blockId, which does not exist")
        case Some(info) =>
          removeBlockInternal(blockId, tellMaster = tellMaster && info.tellMaster)
          addUpdatedBlockStatusToTaskMetrics(blockId, BlockStatus.empty)
      }
    }
    

    The flow:

    • Acquire the block's metadata under a write lock, so no other thread can modify the block concurrently.
    • If the metadata is found, call removeBlockInternal to delete the block.
    • Update the block-related task metrics.

    We won't detail BlockManager.removeBlockInternal; it attempts to delete the block's data from memory and disk, and finally removes the broadcast block's blockId from the BlockInfoManager (which tracks all blocks in this BlockManager).

    3.1.7 Recap

    That is manual broadcast removal: destroy removes the broadcast from both the driver and the executors, while unpersist removes it only from the executors.

    Two RPC rounds are involved: the driver asks the BlockManagerMaster (also on the driver), and the BlockManagerMaster asks each BlockManagerSlave.

    The final deletion goes through BlockManager.removeBlock. In fact, RDDs, shuffle data, and broadcasts are all ultimately managed as blocks in Spark; the code is very cleanly laid out, and if you want to understand Spark's design in depth, BlockManager is the piece to figure out.

    3.2 Automatic cleanup: the MapOutputStatus broadcast

    The destroy path above can be invoked explicitly from user code. Besides that, doDestroy is also reached through the broadcast's own destroy() method, which MapOutputTracker.invalidateSerializedMapOutputStatusCache calls to remove the broadcast from both the driver and the executors:

    def invalidateSerializedMapOutputStatusCache(): Unit = withWriteLock {
      if (cachedSerializedBroadcast != null) {
        // Prevent errors during broadcast cleanup from crashing the DAGScheduler (see SPARK-21444)
        Utils.tryLogNonFatalError {
          // Use `blocking = false` so that this operation doesn't hang while trying to send cleanup
          // RPCs to dead executors.
          cachedSerializedBroadcast.destroy()
        }
        cachedSerializedBroadcast = null
      }
      cachedSerializedMapStatus = null
    }
    

    cachedSerializedBroadcast is a broadcast variable holding a serialized byte array; when it is non-null, destroy() is triggered on it.

    In fact, this method is called after a shuffle completes. The broadcast in question holds, for the finished map-side tasks, their ids, shuffle-output locations, and so on, which are shipped to the driver and to the downstream reduce side for data fetching and task scheduling.

    In short, this broadcast is internal and out of reach of user code; just know that shuffle MapOutputStatus information is distributed via broadcast.

    3.3 Automatic cleanup: ContextCleaner

    This mechanism is on by default. It automatically reclaims weakly-reachable RDDs, shuffle data, broadcast variables, accumulators, and checkpoints: Spark starts a timer thread that periodically calls System.gc() to collect these five kinds of objects once they become weakly reachable. The default interval is 30 minutes, set via:

    spark.cleaner.periodicGC.interval # default: 30min
    

    If "Spark Context Cleaner" appears in the logs, the cleaner thread is running.

    Recall this line from [2.1.1 The SparkContext.broadcast method]:

    cleaner.foreach(_.registerBroadcastForCleanup(bc))
    

    This registers a cleanup for the broadcast variable. Note that cleaner is not some array or list being foreach-ed here: it is an Option[ContextCleaner]. Perhaps surprisingly, Option supports foreach too; if cleaner is None, registerBroadcastForCleanup is simply skipped.
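    A tiny standalone illustration of the Option.foreach idiom:

    // Option.foreach runs the function only when the Option is Some(...),
    // so it doubles as a null-safe "if defined" guard:
    val some: Option[String] = Some("ContextCleaner")
    val none: Option[String] = None

    some.foreach(c => println(s"registering cleanup with $c")) // runs once
    none.foreach(c => println(s"registering cleanup with $c")) // does nothing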

    3.3.1 Registering a broadcast with ContextCleaner

    /** Register a Broadcast for cleanup when it is garbage collected. */
    def registerBroadcastForCleanup[T](broadcast: Broadcast[T]): Unit = {
      registerForCleanup(broadcast, CleanBroadcast(broadcast.id))
    }
    
    private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit = {
      referenceBuffer.add(new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue))
    }

    private class CleanupTaskWeakReference(
        val task: CleanupTask,
        referent: AnyRef,
        referenceQueue: ReferenceQueue[AnyRef])
      extends WeakReference(referent, referenceQueue)
    

    In short, the broadcast is wrapped in a CleanupTaskWeakReference, a weak reference tied to the referenceQueue, and added to referenceBuffer (backed by a ConcurrentHashMap). referenceBuffer's job is to keep the CleanupTaskWeakReference objects themselves strongly reachable, so they are not collected before the queue gets processed. Once reachability analysis finds the broadcast only weakly reachable, it becomes eligible for collection and its reference is enqueued.
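    A standalone sketch of the WeakReference + ReferenceQueue pattern (not Spark code):

    import java.lang.ref.{ReferenceQueue, WeakReference}

    val queue = new ReferenceQueue[AnyRef]
    var payload: AnyRef = new Array[Byte](8 * 1024 * 1024) // stand-in for a broadcast value
    val ref = new WeakReference[AnyRef](payload, queue)    // kept strongly reachable, as referenceBuffer does

    payload = null // drop the last strong reference to the value
    System.gc()    // only a hint; collection timing is not guaranteed

    // ContextCleaner does essentially this: poll the queue, then run the cleanup task.
    Option(queue.remove(1000)) match {
      case Some(r) => println(s"collected, would now clean up: $r")
      case None    => println("nothing enqueued yet (GC may not have run)")
    }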

    3.3.2 The ContextCleaner cleaning loop

    The cleaning thread loops in keepCleaning (the periodic System.gc() every 30 minutes is what pushes weakly-reachable objects onto the reference queue); once a broadcast's reference appears in the queue, doCleanupBroadcast is called to clean it up.

    private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
      while (!stopped) {
        try {
          val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
            .map(_.asInstanceOf[CleanupTaskWeakReference])
          // Synchronize here to avoid being interrupted on stop()
          synchronized {
            reference.foreach { ref =>
              logDebug("Got cleaning task " + ref.task)
              referenceBuffer.remove(ref)
              ref.task match {
                case CleanRDD(rddId) =>
                  doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
                case CleanShuffle(shuffleId) =>
                  doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
                case CleanBroadcast(broadcastId) =>
                  doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
                case CleanAccum(accId) =>
                  doCleanupAccum(accId, blocking = blockOnCleanupTasks)
                case CleanCheckpoint(rddId) =>
                  doCleanCheckpoint(rddId)
              }
            }
          }
        } catch {
          case ie: InterruptedException if stopped => // ignore
          case e: Exception => logError("Error in cleaning thread", e)
        }
      }
    }
    

    The loop takes the enqueued weak references off referenceQueue, removes them from referenceBuffer, and dispatches the matching cleanup task for each.

    doCleanupBroadcast calls BroadcastManager.unbroadcast, which lands in TorrentBroadcastFactory:

    override def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit = {
      TorrentBroadcast.unpersist(id, removeFromDriver, blocking)
    }
    

    So we end up in unpersist once more, but this time with removeFromDriver = true, equivalent to destroy: the broadcast is removed from both the driver and all executors.

    3.4 Automatic cleanup: soft references

    This path also relies on GC, through the soft reference, and it only clears the deserialized value object cached by the broadcast. The broadcast blocks themselves stay in the BlockManager, so if the value is needed again later it can be re-read from the BlockManager.

    Calling broadcast.value goes through TorrentBroadcast.getValue:

    override protected def getValue() = synchronized {
      val memoized: T = if (_value == null) null.asInstanceOf[T] else _value.get
      if (memoized != null) {
        memoized
      } else {
        val newlyRead = readBroadcastBlock()
        _value = new SoftReference[T](newlyRead)
        newlyRead
      }
    }
    

    On first access, the else branch runs and wraps the freshly read value in a soft reference. Soft references are cleared when a GC is triggered under memory pressure.

    In other words, once a broadcast has been accessed on an executor, the cached value object can be dropped by a memory-pressure GC.

    The same holds on the driver and on the executors, but it never affects the broadcast data already stored in the BlockManager.

    This suggests an optimization: when using a broadcast variable, call .value only once per partition:

    rdd.mapPartitions { iter =>
      val blackIps = blackIpsBC.value
      iter.filter(t => !blackIps.contains(t.ip))
    }
    

    This way, the broadcast value is resolved once per partition instead of once per record, skipping the per-record existence check; it is simply a good coding habit.

    That's a wrap!

