美文网首页
【Spark】Spark 事件总线

【Spark】Spark 事件总线

作者: w1992wishes | 来源:发表于2018-10-28 14:31 被阅读56次

    本篇结构:

    • 事件总线介绍
    • ListenerBus 继承体系
    • LiveListenerBus 详解
    • 流程总结

    一、事件总线介绍

    Spark 定义了一个特质 ListenerBus,可以接受事件并且将事件提交到对应事件的监听器。

    该特征主要有一个 listeners 成员,用于维护所有注册的监听器,其数据结构是一个线程安全的 CopyOnWriteArrayList[L]。

    该特征还有几个主要的函数:

    • addListener:添加 listener
    • doPostEvent:给特定 listener 发送事件,该方法具体需要子类实现
    • findListenersByClass:根据类型查找 listener 列表
    • postToAll: 把事件发送给所有的 listener,虽然 CopyOnWriteArrayList 是线程安全的,但 postAll 引入了“先检查后运行”的逻辑,因此该方法不是线程安全的。
    • removeListener:删除 listener
    • removeListenerOnError:内部调用 removeListener,可由子类覆盖

    二、ListenerBus 继承体系

    上图是 spark 2.1.0 版本事件总线的继承关系,版本不同,会略有不同。

    每个 ListenerBus 用于将不同的 Event 投递到不同的 Listener 中,下面以主要分析下 LiveListenerBus。

    三、LiveListenerBus 详解

    LiveListenerBus 继承 SparkListenerBus,和其他 ListenerBus 不同的是, LiveListenerBus 是将事件都放到一个队列中,然后另外一个线程不断从队列获取事件,将事件异步投递给监听器,达到实时刷新UI界面数据的效果。

    3.1、LiveListenerBus 中的属性:

    // Cap the capacity of the event queue so we get an explicit error (rather than
    // an OOM exception) if it's perpetually being added to more quickly than it's being drained.
    private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize()
    private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
    
    private def validateAndGetQueueSize(): Int = {
        val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)
        if (queueSize <= 0) {
          throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!")
        }
        queueSize
      }
    
    private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
    ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
      .intConf
      .createWithDefault(10000)
    
    • eventQueue:是 SparkListenerEvent 事件的阻塞队列,队列大小可以通过 Spark 属性 spark.scheduler.listenerbus.eventqueue.size 进行配置,默认为 10000;
    // Indicate if `start()` is called
    private val started = new AtomicBoolean(false)
    // Indicate if `stop()` is called
    private val stopped = new AtomicBoolean(false)
    
    • started:标记 LiveListenerBus 的启动状态的 AtomicBoolean 类型的变量;
    • stopped:标记LiveListenerBus的停止状态的 AtomicBoolean 类型的变量;
    /** A counter for dropped events. It will be reset every time we log it. */
    private val droppedEventsCounter = new AtomicLong(0L)
    /** When `droppedEventsCounter` was logged last time in milliseconds. */
    @volatile private var lastReportTimestamp = 0L
    
    • droppedEventsCounter:使用 AtomicLong 类型对删除的事件进行计数,每当日志打印了 droppedEventsCounter 后,会将 droppedEventsCounter 重置为0;
    • lastReportTimestamp:记录最后一次日志打印 droppedEventsCounter 的时间戳;
    // Indicate if we are processing some event
    // Guarded by `self`
    private var processingEvent = false
    
    • processingEvent:暗示当前正有事件在被 listenerThread 线程处理;
    private val logDroppedEvent = new AtomicBoolean(false)
    
    • logDroppedEvent:标记是否由于 eventQueue 已满,导致新的事件被删除;
    // A counter that represents the number of events produced and consumed in the queue
    private val eventLock = new Semaphore(0)
    
    • eventLock:表示队列中事件产生和消费的一个计数器,当有新的事件到来时释放信号量,当对事件进行处理时获取信号量,eventLock = new Semaphore(0);

    • listenerThread:异步处理事件的线程;

    3.2、异步事件处理线程

      private val listenerThread = new Thread(name) {
        setDaemon(true)
        override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
          LiveListenerBus.withinListenerThread.withValue(true) {
            while (true) {
              eventLock.acquire()
              self.synchronized {
                processingEvent = true
              }
              try {
                val event = eventQueue.poll
                if (event == null) {
                  // Get out of the while loop and shutdown the daemon thread
                  if (!stopped.get) {
                    throw new IllegalStateException("Polling `null` from eventQueue means" +
                      " the listener bus has been stopped. So `stopped` must be true")
                  }
                  return
                }
                postToAll(event)
              } finally {
                self.synchronized {
                  processingEvent = false
                }
              }
            }
          }
        }
      }
    

    代码不算复杂,主要逻辑是:

    • 设置为 daemon thread;
    • 不断获取信号量,如果没有就会阻塞,有信号释放才会往下运行(这是依靠 new Semaphore(0)实现的,在 spark 后面的版本中,是直接用阻塞队列的 take() 方法实现。);
    • 同步控制,将 processingEvent 设置为 true;
    • 从 eventQueue 中获取事件;
    • 调用超类 ListenerBus 的 postToAll 方法,对监听器进行遍历,并调用 SparkListenerBus 的 doPostEvent 方法对事件进行匹配后执行监听器的相应方法;;
    • 每次循环结束同步控制,将 processingEvent 设置为 false;

    3.3、异步事件处理线程的事件来源

    DAGScheduler、SparkContext、BlockManagerMasterEndpoint、DriverEndpoint 及 LocalSchedulerBackend 都是 LiveListenerBus 的事件来源,它们都是通过调用 LiveListenerBus 的 post 方法将消息交给异步线程 listenerThread 处理的。

      def post(event: SparkListenerEvent): Unit = {
        if (stopped.get) {
          // Drop further events to make `listenerThread` exit ASAP
          logError(s"$name has already stopped! Dropping event $event")
          return
        }
        val eventAdded = eventQueue.offer(event)
        if (eventAdded) {
          eventLock.release()
        } else {
          onDropEvent(event)
          droppedEventsCounter.incrementAndGet()
        }
    
        val droppedEvents = droppedEventsCounter.get
        if (droppedEvents > 0) {
          // Don't log too frequently
          if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
            // There may be multiple threads trying to decrease droppedEventsCounter.
            // Use "compareAndSet" to make sure only one thread can win.
            // And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
            // then that thread will update it.
            if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) {
              val prevLastReportTimestamp = lastReportTimestamp
              lastReportTimestamp = System.currentTimeMillis()
              logWarning(s"Dropped $droppedEvents SparkListenerEvents since " +
                new java.util.Date(prevLastReportTimestamp))
            }
          }
        }
      }
    
    • 先判断 LiveListenerBus 是否停止,停止记录错误日志,返回;
    • 向eventQueue中添加事件:
      • 如果成功,就释放信号量,这时 listenerThread 中的 eventLock.acquire() 就可以后去信号量,从队列取出事件进行后续操作;
      • 如果失败,则移除事件 onDropEvent,并对删除事件计数器进行自增 droppedEventsCounter.incrementAndGet();
    • 如果有事件被删除,并且当前系统时间距离上一次打印 droppedEventsCounter 超过了 60 秒则重置 droppedEventsCounter 计算为0,并更新 lastReportTimestamp 为当前系统时间。

    四、流程总结

    用一张图总结下的 Spark 的事件总线大致的流程:

    五、参考资料

    这篇文章内容和 《spark内核设计的艺术架构设计与实现》 关于事件总线的描述章节相差不多,流程图也一样。之所以还要花费时间记录,是因为这样才更有感觉,正所谓“好记性,不如烂笔头”。

    相关文章

      网友评论

          本文标题:【Spark】Spark 事件总线

          本文链接:https://www.haomeiwen.com/subject/zowptqtx.html