The Event Bus in Spark: ListenerBus

Author: 毕利格次_de99 | Published: 2019-02-20 15:32

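    An event-listener model generally needs three components: event objects, event sources, and event listeners. In Spark, event listening is handled by ListenerBus, Spark's event bus. The event sources are the Spark components that emit events (for example, DAGScheduler posts job and stage events), the event objects are the SparkListenerEvent instances that describe each event, and the event listeners are the Listener objects that handle those events.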
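
    To make the three roles concrete, here is a minimal sketch with synchronous delivery (Spark itself delivers events asynchronously). All names in it (AppEvent, JobStarted, JobEnded, AppListener, ToyBus, ToyScheduler) are hypothetical and only loosely modeled on SparkListenerEvent, SparkListener, and the scheduler components that post events.

```scala
import scala.collection.mutable.ListBuffer

// Event objects: hypothetical stand-ins for SparkListenerEvent subclasses.
sealed trait AppEvent
final case class JobStarted(jobId: Int) extends AppEvent
final case class JobEnded(jobId: Int, succeeded: Boolean) extends AppEvent

// Event listener: reacts to events delivered by the bus.
trait AppListener {
  def onEvent(event: AppEvent): Unit
}

// The bus between sources and listeners (synchronous here, unlike Spark).
final class ToyBus {
  private val listeners = ListBuffer.empty[AppListener]
  def addListener(listener: AppListener): Unit = listeners += listener
  def post(event: AppEvent): Unit = listeners.foreach(_.onEvent(event))
}

// Event source: a component that produces events and posts them to the bus.
final class ToyScheduler(bus: ToyBus) {
  def runJob(jobId: Int): Unit = {
    bus.post(JobStarted(jobId))
    // ... the job's actual work would run here ...
    bus.post(JobEnded(jobId, succeeded = true))
  }
}

val seen = ListBuffer.empty[String]
val bus = new ToyBus
bus.addListener(new AppListener {
  def onEvent(event: AppEvent): Unit = seen += event.toString
})
new ToyScheduler(bus).runJob(7)
println(seen.mkString(", ")) // JobStarted(7), JobEnded(7,true)
```

    The source never references a listener directly; it only knows the bus. That decoupling is the point of the pattern.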

    1. Initializing ListenerBus

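    ListenerBus is initialized as part of SparkContext initialization. SparkContext is the entry point for running a Spark job. While it is being constructed, it sets up the job runtime environment and the related components, such as SparkEnv, MapOutputTracker, BlockManager, ShuffleManager, MetricsSystem, and ListenerBus. As the event bus, ListenerBus plays a central role, and it is created here along with the rest: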

    // Field declared in SparkContext
    private var _listenerBus: LiveListenerBus = _
    // Assigned while SparkContext is being initialized
    _listenerBus = new LiveListenerBus(_conf)
    

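    What is created here is a LiveListenerBus object. Internally it maintains two collections: queues, the list of AsyncEventQueue instances that events are delivered through, and queuedEvents, a buffer for events posted before the bus is started: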

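    // One AsyncEventQueue per listener group; each has its own event queue and dispatch thread
    private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()

    // Events posted before start() are buffered here; set to null once the bus has started.
    // Visible for testing.
    @volatile private[scheduler] var queuedEvents = new mutable.ListBuffer[SparkListenerEvent]()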
    

    The bus delivers events through these two collections. Let's see how they work.

    First, the post method of LiveListenerBus, which puts an event object onto the event queues:

    def post(event: SparkListenerEvent): Unit = {
      if (stopped.get()) {
        return
      }
      // Spark's metrics system: a counter of how many events have been posted
      metrics.numEventsPosted.inc()

      // If the event buffer is null, it means the bus has been started and we can avoid
      // synchronization and post events directly to the queues. This should be the most
      // common case during the life of the bus.
      // postToQueues calls AsyncEventQueue.post on every queue, which appends the event to
      // that queue's internal eventQueue:
      //   private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](
      //     conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))
      if (queuedEvents == null) {
        postToQueues(event)
        return
      }

      // Otherwise, need to synchronize to check whether the bus is started, to make sure the thread
      // calling start() picks up the new event.
      // If the bus has not been started yet, buffer the event in queuedEvents;
      // start() will later replay it to the queues and hence to the listeners.
      synchronized {
        if (!started.get()) {
          queuedEvents += event
          return
        }
      }

      // If the bus was already started when the check above was made, just post directly to the
      // queues.
      postToQueues(event)
    }
    

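    The code above shows how an event object is added to the queues. Next, let's see how events are delivered to the listeners. This process is triggered by LiveListenerBus.start():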

    /**
     * Start sending events to attached listeners.
     *
     * This first sends out all buffered events posted before this listener bus has started, then
     * listens for any additional events asynchronously while the listener bus is still running.
     * This should only be called once.
     *
     * @param sc Used to stop the SparkContext in case the listener thread dies.
     */
    def start(sc: SparkContext, metricsSystem: MetricsSystem): Unit = synchronized {
      // Fail if this LiveListenerBus has already been started
      if (!started.compareAndSet(false, true)) {
        throw new IllegalStateException("LiveListenerBus already started.")
      }

      this.sparkContext = sc
      // The key part: q.start(sc) starts the dispatch thread of each AsyncEventQueue,
      // then every event buffered before start() is replayed into that queue
      queues.asScala.foreach { q =>
        q.start(sc)
        queuedEvents.foreach(q.post)
      }
      // From now on, post() takes the unsynchronized fast path
      queuedEvents = null
      metricsSystem.registerSource(metrics)
    }
    

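    As its doc comment states, this method starts sending events to the attached listeners. For each queue it first calls q.start(sc), which starts a thread that takes events off that queue and delivers them to the listeners. Then queuedEvents.foreach(q.post) replays the buffered events. Like postToQueues(event) above, this ends up in AsyncEventQueue.post, which adds the event to the queue's internal eventQueue. Finally, queuedEvents is set to null so that later calls to post() take the fast path.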
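
    The buffer-then-replay logic of post() and start() can be modeled in a few lines. The sketch below is a simplified, hypothetical BufferingBus, not Spark's code. Each "queue" is a plain E => Unit function instead of an AsyncEventQueue, and metrics and threads are left out. It keeps Spark's double check: an unsynchronized fast path once the buffer is null, and a re-check under the lock that start() holds. This guarantees that an event posted while start() is running is either buffered and replayed or posted directly, and never lost.

```scala
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ListBuffer

// Hypothetical, simplified model of LiveListenerBus.post()/start().
// A "queue" here is just a function; in Spark it is an AsyncEventQueue.
final class BufferingBus[E] {
  private val queues = new CopyOnWriteArrayList[E => Unit]()
  private val started = new AtomicBoolean(false)
  private val stopped = new AtomicBoolean(false)
  // Holds events posted before start(); becomes null once start() has run.
  @volatile private var queuedEvents = ListBuffer.empty[E]

  def addQueue(queue: E => Unit): Unit = queues.add(queue)

  private def postToQueues(event: E): Unit = {
    val it = queues.iterator()
    while (it.hasNext) it.next().apply(event)
  }

  def post(event: E): Unit = {
    if (stopped.get()) ()                               // ignore events after stop()
    else if (queuedEvents == null) postToQueues(event)  // fast path: bus already started
    else {
      // Slow path: decide under the same lock that start() holds.
      val buffered = synchronized {
        if (!started.get()) { queuedEvents += event; true } else false
      }
      if (!buffered) postToQueues(event)                // start() finished meanwhile
    }
  }

  def start(): Unit = synchronized {
    if (!started.compareAndSet(false, true)) throw new IllegalStateException("already started")
    // Replay everything buffered so far, in posting order, into every queue.
    val it = queues.iterator()
    while (it.hasNext) {
      val queue = it.next()
      queuedEvents.foreach(queue)
    }
    queuedEvents = null
  }

  def stop(): Unit = stopped.set(true)
}

val received = ListBuffer.empty[String]
val bus = new BufferingBus[String]
bus.addQueue(e => received += e)
bus.post("before-1")
bus.post("before-2")
val beforeStart = received.toList // nothing has been delivered yet
bus.start()
bus.post("after-1")
println(received.mkString(",")) // before-1,before-2,after-1
```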

    Now let's focus on how AsyncEventQueue.start() gets events to the listeners:

    // AsyncEventQueue (a subclass of SparkListenerBus)

    private[scheduler] def start(sc: SparkContext): Unit = {
      if (started.compareAndSet(false, true)) {
        this.sc = sc
        dispatchThread.start()
      } else {
        throw new IllegalStateException(s"$name already started!")
      }
    }

    // One daemon thread per queue; if dispatch() throws, tryOrStopSparkContext stops the SparkContext
    private val dispatchThread = new Thread(s"spark-listener-group-$name") {
      setDaemon(true)
      override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
        dispatch()
      }
    }

    private def dispatch(): Unit = LiveListenerBus.withinListenerThread.withValue(true) {
      // Take the next pending event from eventQueue (take() blocks while it is empty) and
      // deliver it to the listeners via postToAll, until the POISON_PILL posted by stop() arrives
      var next: SparkListenerEvent = eventQueue.take()
      while (next != POISON_PILL) {
        val ctx = processingTime.time()
        try {
          // The key call: deliver the event to every listener registered on this queue
          super.postToAll(next)
        } finally {
          ctx.stop()
        }
        eventCount.decrementAndGet()
        next = eventQueue.take()
      }
      // Account for the POISON_PILL itself
      eventCount.decrementAndGet()
    }
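
    The dispatch loop above is a classic producer/consumer setup. Here is a minimal, hypothetical ToyAsyncQueue that reproduces its shape using only the JDK: a bounded LinkedBlockingQueue, one daemon thread that blocks on take(), and a sentinel object compared by reference that plays the role of POISON_PILL. Event, JobEvent, and deliver are illustrative names. Like Spark's AsyncEventQueue.post, the sketch enqueues with offer() and counts an event as dropped when the queue is full, rather than blocking the poster. Unlike Spark, it has no metrics and no equivalent of tryOrStopSparkContext.

```scala
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import scala.collection.mutable.ListBuffer

// Hypothetical event type standing in for SparkListenerEvent.
trait Event
final case class JobEvent(id: Int) extends Event

// Simplified, hypothetical model of AsyncEventQueue: a bounded queue drained by one daemon thread.
final class ToyAsyncQueue(name: String, capacity: Int)(deliver: Event => Unit) {
  // Sentinel compared by reference, playing the role of Spark's POISON_PILL.
  private val PoisonPill: Event = new Event {}
  private val eventQueue = new LinkedBlockingQueue[Event](capacity)
  private val started = new AtomicBoolean(false)
  private val stopped = new AtomicBoolean(false)
  val droppedEvents = new AtomicLong(0L)

  private val dispatchThread = new Thread(s"toy-listener-group-$name") {
    setDaemon(true)
    override def run(): Unit = {
      var next = eventQueue.take() // blocks while the queue is empty
      while (next ne PoisonPill) {
        deliver(next)
        next = eventQueue.take()
      }
    }
  }

  def start(): Unit =
    if (started.compareAndSet(false, true)) dispatchThread.start()
    else throw new IllegalStateException(s"$name already started!")

  // Never blocks the poster: if the queue is full, the event is dropped and counted.
  def post(event: Event): Unit =
    if (!stopped.get() && !eventQueue.offer(event)) droppedEvents.incrementAndGet()

  // Enqueue the sentinel after all pending events, then wait for the thread to finish them.
  def stop(): Unit = {
    if (stopped.compareAndSet(false, true)) eventQueue.put(PoisonPill)
    if (started.get()) dispatchThread.join()
  }
}

val delivered = ListBuffer.empty[Int]
val queue = new ToyAsyncQueue("demo", capacity = 100)({
  case JobEvent(id) => delivered += id
  case _            => ()
})
queue.start()
(1 to 5).foreach(i => queue.post(JobEvent(i)))
queue.stop() // returns once everything posted before stop() has been delivered
println(delivered.toList) // List(1, 2, 3, 4, 5)

// A queue that was never started cannot drain, so the third event overflows capacity 2.
val tiny = new ToyAsyncQueue("tiny", capacity = 2)(_ => ())
(1 to 3).foreach(i => tiny.post(JobEvent(i)))
println(tiny.droppedEvents.get()) // 1
```

    Reading delivered on the main thread after stop() is safe because Thread.join establishes a happens-before edge with everything the dispatch thread did.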
    

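    Next, super.postToAll. postToAll is defined in the ListenerBus trait: it loops over every registered listener and calls doPostEvent(listener, event) for each one. SparkListenerBus overrides doPostEvent to route the event to the listener callback that matches its type: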

    // SparkListenerBus
    protected override def doPostEvent(
          listener: SparkListenerInterface,
          event: SparkListenerEvent): Unit = {
        event match {
          case stageSubmitted: SparkListenerStageSubmitted =>
            listener.onStageSubmitted(stageSubmitted)
          case stageCompleted: SparkListenerStageCompleted =>
            listener.onStageCompleted(stageCompleted)
          case jobStart: SparkListenerJobStart =>
            listener.onJobStart(jobStart)
          case jobEnd: SparkListenerJobEnd =>
            listener.onJobEnd(jobEnd)
          case taskStart: SparkListenerTaskStart =>
            listener.onTaskStart(taskStart)
          case taskGettingResult: SparkListenerTaskGettingResult =>
            listener.onTaskGettingResult(taskGettingResult)
          case taskEnd: SparkListenerTaskEnd =>
            listener.onTaskEnd(taskEnd)
          case environmentUpdate: SparkListenerEnvironmentUpdate =>
            listener.onEnvironmentUpdate(environmentUpdate)
          case blockManagerAdded: SparkListenerBlockManagerAdded =>
            listener.onBlockManagerAdded(blockManagerAdded)
          case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
            listener.onBlockManagerRemoved(blockManagerRemoved)
          case unpersistRDD: SparkListenerUnpersistRDD =>
            listener.onUnpersistRDD(unpersistRDD)
          case applicationStart: SparkListenerApplicationStart =>
            listener.onApplicationStart(applicationStart)
          case applicationEnd: SparkListenerApplicationEnd =>
            listener.onApplicationEnd(applicationEnd)
          case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
            listener.onExecutorMetricsUpdate(metricsUpdate)
          case executorAdded: SparkListenerExecutorAdded =>
            listener.onExecutorAdded(executorAdded)
          case executorRemoved: SparkListenerExecutorRemoved =>
            listener.onExecutorRemoved(executorRemoved)
          case executorBlacklistedForStage: SparkListenerExecutorBlacklistedForStage =>
            listener.onExecutorBlacklistedForStage(executorBlacklistedForStage)
          case nodeBlacklistedForStage: SparkListenerNodeBlacklistedForStage =>
            listener.onNodeBlacklistedForStage(nodeBlacklistedForStage)
          case executorBlacklisted: SparkListenerExecutorBlacklisted =>
            listener.onExecutorBlacklisted(executorBlacklisted)
          case executorUnblacklisted: SparkListenerExecutorUnblacklisted =>
            listener.onExecutorUnblacklisted(executorUnblacklisted)
          case nodeBlacklisted: SparkListenerNodeBlacklisted =>
            listener.onNodeBlacklisted(nodeBlacklisted)
          case nodeUnblacklisted: SparkListenerNodeUnblacklisted =>
            listener.onNodeUnblacklisted(nodeUnblacklisted)
          case blockUpdated: SparkListenerBlockUpdated =>
            listener.onBlockUpdated(blockUpdated)
          case speculativeTaskSubmitted: SparkListenerSpeculativeTaskSubmitted =>
            listener.onSpeculativeTaskSubmitted(speculativeTaskSubmitted)
          case _ => listener.onOtherEvent(event)
        }
    }
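
    The type dispatch in doPostEvent boils down to a pattern match with a catch-all. Below is a cut-down sketch with two hypothetical event types plus the onOtherEvent fallback. That fallback is what lets events the bus does not know about (for example, custom events) still reach listeners. MyEvent, StageDone, JobDone, CustomEvent, MyListener, and dispatch are illustrative names, not Spark's API.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical event hierarchy standing in for SparkListenerEvent.
trait MyEvent
final case class StageDone(stageId: Int) extends MyEvent
final case class JobDone(jobId: Int) extends MyEvent
final case class CustomEvent(payload: String) extends MyEvent // not known to the dispatcher

// Listener with no-op defaults, in the spirit of SparkListener.
trait MyListener {
  def onStageCompleted(e: StageDone): Unit = ()
  def onJobEnd(e: JobDone): Unit = ()
  def onOtherEvent(e: MyEvent): Unit = ()
}

// Counterpart of doPostEvent: route one event to the matching callback of one listener.
def dispatch(listener: MyListener, event: MyEvent): Unit = event match {
  case e: StageDone => listener.onStageCompleted(e)
  case e: JobDone   => listener.onJobEnd(e)
  case other        => listener.onOtherEvent(other) // catch-all, like `case _ =>` in Spark
}

val log = ListBuffer.empty[String]
val listener = new MyListener {
  override def onJobEnd(e: JobDone): Unit = log += s"job ${e.jobId} ended"
  override def onOtherEvent(e: MyEvent): Unit = log += s"other: $e"
}
Seq(StageDone(1), JobDone(2), CustomEvent("hello")).foreach(dispatch(listener, _))
println(log.mkString("; ")) // job 2 ended; other: CustomEvent(hello)
```

    StageDone(1) produces no log entry because this listener keeps the no-op default for onStageCompleted. Listeners only override the callbacks they care about.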
    

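    At this point the event has been handled by SparkListenerBus, which completes the whole flow through the event bus. Next is a class diagram of the classes involved in Spark's event bus.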

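    The event bus mainly involves the four classes covered above: LiveListenerBus, AsyncEventQueue, SparkListenerBus, and ListenerBus. One more point: listeners are added and removed with addListener and removeListener, which are provided by the ListenerBus trait. That completes the walkthrough of the event bus workflow.

    To restate: designing an event-listening model requires clearly defining at least three components: the event source, the event object, and the event listener. What remains is getting event objects to the listeners that process them. Following Spark's design, events can be buffered in a queue, and a dedicated thread can take them off the queue and dispatch each one to the matching listener callbacks.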
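
    Listener management lives in the ListenerBus trait. The minimal sketch below shows that split under a simplified API; SimpleListenerBus, Greeter, GreeterBus, and onListenerError are hypothetical names. Listeners are kept in a CopyOnWriteArrayList, as in Spark's ListenerBus, so they can be added or removed while another thread is posting. postToAll wraps each listener in try/catch so that one failing listener does not stop the others from seeing the event. Spark's ListenerBus.postToAll does the same in spirit, logging the error instead.

```scala
import java.util.concurrent.CopyOnWriteArrayList
import scala.collection.mutable.ListBuffer
import scala.util.control.NonFatal

// Hypothetical generic bus in the spirit of ListenerBus[L, E]: L = listener type, E = event type.
trait SimpleListenerBus[L <: AnyRef, E] {
  // Copy-on-write: listeners can be added/removed while another thread is posting.
  private val listeners = new CopyOnWriteArrayList[L]()

  final def addListener(listener: L): Unit = listeners.add(listener)
  final def removeListener(listener: L): Unit = listeners.remove(listener)

  // How one event reaches one listener; subclasses decide (cf. SparkListenerBus.doPostEvent).
  protected def doPostEvent(listener: L, event: E): Unit

  // Called when a listener throws; by default just report it.
  protected def onListenerError(listener: L, error: Throwable): Unit =
    System.err.println(s"Listener $listener threw: ${error.getMessage}")

  // Deliver to every listener; a failure in one does not stop the others.
  final def postToAll(event: E): Unit = {
    val it = listeners.iterator() // iterates over a snapshot of the list
    while (it.hasNext) {
      val listener = it.next()
      try doPostEvent(listener, event)
      catch { case NonFatal(t) => onListenerError(listener, t) }
    }
  }
}

trait Greeter { def greet(name: String): Unit }

final class GreeterBus extends SimpleListenerBus[Greeter, String] {
  var errors = 0
  override protected def doPostEvent(listener: Greeter, event: String): Unit = listener.greet(event)
  override protected def onListenerError(listener: Greeter, error: Throwable): Unit = errors += 1
}

val heard = ListBuffer.empty[String]
val bus = new GreeterBus
val polite = new Greeter { def greet(name: String): Unit = heard += s"hello $name" }
val broken = new Greeter { def greet(name: String): Unit = throw new RuntimeException("boom") }

bus.addListener(broken)
bus.addListener(polite)
bus.postToAll("spark")  // broken throws, polite is still called
bus.removeListener(polite)
bus.postToAll("again")  // only broken is left
println(s"${heard.toList} errors=${bus.errors}") // List(hello spark) errors=2
```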

        Article link: https://www.haomeiwen.com/subject/qmshyqtx.html