美文网首页
3.2 LiveListenerBus - 监听器消息队列

3.2 LiveListenerBus - 监听器消息队列

作者: GongMeng | 来源:发表于2018-11-14 15:20 被阅读0次

    1. 结构概要

    Listener Bus

    和其它的监听器方法一样, 实现一个总线, DAGScheduler通过监听总线上的多个事件来得知当前的任务进度, 继而规划下一步的安排.

    LiveListenerBus关注的是Application相关的Event, 包括APP的运行状态, JOB的状态等等
    ListenerBus内部使用 java.util.concurrent.LinkedBlockingQueue 默认长度10000 来实现一个消息队列. 它本身是AsynchronousListenerBus的一个具体实现

    SparkListenerBus使用一个Thread Poll Message from Bus

    这个Bus也是最重要的bus, 挂载在sparkContext里为后续一系列的service提供服务

    2. 部分代码解读

    
    /**
     * Asynchronously passes events to registered listeners.
     *
     * Until `start()` is called, all posted events are only buffered. Only after this listener bus
     * has started will events be actually propagated to all attached listeners. This listener bus
     * is stopped when `stop()` is called, and it will drop further events after stopping.
     *
     * @param name name of the listener bus, will be the name of the listener thread.
     * @tparam L type of listener
     * @tparam E type of event
     */
    private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: String)
      extends ListenerBus[L, E] {
    
      self =>
    
      private var sparkContext: SparkContext = null
    
      /* Cap the capacity of the event queue so we get an explicit error (rather than
       * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
      private val EVENT_QUEUE_CAPACITY = 10000
    
      // 这是一个线程安全的链表
      private val eventQueue = new LinkedBlockingQueue[E](EVENT_QUEUE_CAPACITY)
    
      // Indicate if `start()` is called
      private val started = new AtomicBoolean(false)
    
      // Indicate if `stop()` is called
      private val stopped = new AtomicBoolean(false)
    
      /** A counter for dropped events. It will be reset every time we log it. */
      private val droppedEventsCounter = new AtomicLong(0L)
    
      /** When `droppedEventsCounter` was logged last time in milliseconds. */
      @volatile private var lastReportTimestamp = 0L
    
      // Indicate if we are processing some event
      // Guarded by `self`
      private var processingEvent = false
    
      // A counter that represents the number of events produced and consumed in the queue
      private val eventLock = new Semaphore(0)
    
     // 这个内部Thread是用来从队里里拉信息处理的, 所以只有队列里有信息的时候, 也就是信号量>1的时候才能进行处理
      private val listenerThread = new Thread(name) {
       // 这是一个守护进程哦!
        setDaemon(true)
        override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
          AsynchronousListenerBus.withinListenerThread.withValue(true) {
            while (true) {
              //  队列里有信息的时候就会得到信号量, 另外一种情况是bus被关闭的时候, 也会取得信号量好把剩下的event都处理掉
              eventLock.acquire()
              // 加标志位, 告诉大家我们开工了
              self.synchronized {
                processingEvent = true
              }
              try {
                val event = eventQueue.poll
               // 各种错误处理
               // 把消息发送给所有的监听者
                postToAll(event)
              } finally {
                // 加标志位, 告诉大家工作搞定
                self.synchronized {
                  processingEvent = false
                }
              }
            }
          }
        }
      }
    
      /**
       * Start sending events to attached listeners.
       *
       * This first sends out all buffered events posted before this listener bus has started, then
       * listens for any additional events asynchronously while the listener bus is still running.
       * This should only be called once.
       *
       * @param sc Used to stop the SparkContext in case the listener thread dies.
       */
      def start(sc: SparkContext) {
        // 这里用CAS保证了我们仅仅启动一个LisenerThread进程来处理这些消息
        if (started.compareAndSet(false, true)) {
          sparkContext = sc
          listenerThread.start()
        } else {
          throw new IllegalStateException(s"$name already started!")
        }
      }
     
      //  添加消息到等待队列中
      def post(event: E) {
        // 各种错误检查后, 把event加入到队列中
        val eventAdded = eventQueue.offer(event)
    
        // 消息添加成功, 就可以释放信号量
        if (eventAdded) {
          eventLock.release()
        } else {
          onDropEvent(event)
          droppedEventsCounter.incrementAndGet()
        }
        //  如果队列满了, 或者其它什么高并发的bug发生了, 消息可能失败, 这个时候就要用log记录. 这里为了防止日志爆炸, 设定上是60秒打印一次汇总.
        val droppedEvents = droppedEventsCounter.get
        if (droppedEvents > 0) {
          // Don't log too frequently
          if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
            // There may be multiple threads trying to decrease droppedEventsCounter.
            // Use "compareAndSet" to make sure only one thread can win.
            // And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
            // then that thread will update it.
            if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) {
              val prevLastReportTimestamp = lastReportTimestamp
              lastReportTimestamp = System.currentTimeMillis()
              logWarning(s"Dropped $droppedEvents SparkListenerEvents since " +
                new java.util.Date(prevLastReportTimestamp))
            }
          }
        }
      }
    
      /**
       * Return whether the event queue is empty.
       *
       * The use of synchronized here guarantees that all events that once belonged to this queue
       * have already been processed by all attached listeners, if this returns true.
       */
      private def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty && !processingEvent }
    
      /**
       * Stop the listener bus. It will wait until the queued events have been processed, but drop the
       * new events after stopping.
       */
      def stop() {
        if (!started.get()) {
          throw new IllegalStateException(s"Attempted to stop $name that has not yet started!")
        }
        // 这里也是用CAS来保证关闭时一次性的, 而且这里关闭时, 其它正在处理的进程会通过信号量失败掉
        if (stopped.compareAndSet(false, true)) {
          // Call eventLock.release() so that listenerThread will poll `null` from `eventQueue` and know
          // `stop` is called.
          eventLock.release()
         //  会等待所有的event处理完了再结束, 不会立即结束.
          listenerThread.join()
        } else {
          // Keep quiet
        }
      }
    
      /**
       * If the event queue exceeds its capacity, the new events will be dropped. The subclasses will be
       * notified with the dropped events.
       *
       * Note: `onDropEvent` can be called in any thread.
       */
      def onDropEvent(event: E): Unit
    }
    

    相关文章

      网友评论

          本文标题:3.2 LiveListenerBus - 监听器消息队列

          本文链接:https://www.haomeiwen.com/subject/gbqjfqtx.html