美文网首页Spark源码精读分析计划spark||flink||scalaSpark
Spark Core源码精读计划#6:AsyncEventQue

Spark Core源码精读计划#6:AsyncEventQue

作者: LittleMagic | 来源:发表于2019-03-31 22:53 被阅读63次

    目录

    前言

    在上一篇文章中,我们了解了Spark事件总线机制的概况,以及ListenerBus、SparkListenerBus的细节。

    由之前的分析可知,SparkListenerBus默认提供的事件投递方法是同步调用的。如果注册的监听器和产生的事件非常多,同步调用必然会造成事件的积压以及处理的延时。因此,在SparkListenerBus的实现类AsyncEventQueue中,提供了异步事件队列机制,它也是SparkContext中的事件总线LiveListenerBus的基础。本文就来研究它们。

    异步事件队列AsyncEventQueue

    老样子,首先来看类的声明及其内部的属性定义。

    代码#6.1 - o.a.s.scheduler.AsyncEventQueue类声明及其属性

    private class AsyncEventQueue(
        val name: String,
        conf: SparkConf,
        metrics: LiveListenerBusMetrics,
        bus: LiveListenerBus)
      extends SparkListenerBus
      with Logging {
      import AsyncEventQueue._
    
      private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](
        conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))
    
      private val eventCount = new AtomicLong()
    
      private val droppedEventsCounter = new AtomicLong(0L)
    
      @volatile private var lastReportTimestamp = 0L
    
      private val logDroppedEvent = new AtomicBoolean(false)
    
      private var sc: SparkContext = null
    
      private val started = new AtomicBoolean(false)
      private val stopped = new AtomicBoolean(false)
    
      private val droppedEvents = metrics.metricRegistry.counter(s"queue.$name.numDroppedEvents")
      private val processingTime = metrics.metricRegistry.timer(s"queue.$name.listenerProcessingTime")
    
      private val dispatchThread = new Thread(s"spark-listener-group-$name") {
        setDaemon(true)
        override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
          dispatch()
        }
      }
    
      // ...
    }
    

    该类的构造参数有四个,分别是队列名、Spark配置项、LiveListenerBus的监控度量,以及LiveListenerBus本身。下面来看一下它的主要属性。

    eventQueue、eventCount属性

    eventQueue是一个存储SparkListenerEvent事件的阻塞队列LinkedBlockingQueue。它的大小是通过配置参数spark.scheduler.listenerbus.eventqueue.capacity来设置的,默认值10000。如果不设置阻塞队列的大小,那么默认值会是Integer.MAX_VALUE,有OOM的风险。

    eventCount则是当前待处理事件的计数。因为事件从队列中弹出不代表已经处理完成,所以不能直接用队列的实际大小来表示。它是AtomicLong类型的,以保证修改的原子性。

    droppedEventsCounter、lastReportTimestamp、logDroppedEvent属性

    droppedEventsCounter是被丢弃事件的计数。当阻塞队列已满后,新产生的事件无法入队,就会被丢弃。日志中定期输出该计数器的值,用lastReportTimestamp记录下每次输出的时间戳,并且输出后都会将计数器重新置为0。

    logDroppedEvent用于指示是否发生过了事件丢弃的情况。它与droppedEventsCounter一样也都是原子类型的。

    started、stopped属性

    这两个属性分别用来标记队列的启动与停止状态。

    dispatchThread属性

    dispatchThread是将队列中的事件分发到各监听器的守护线程,实际上调用了dispatch()方法。而Utils.tryOrStopSparkContext()方法的作用在于执行代码块时如果抛出异常,就另外起一个线程关闭SparkContext。

    下面就来看看dispatch()方法的源码。

    dispatch()方法

    代码#6.2 - o.a.s.scheduler.AsyncEventQueue.dispatch()方法

      private def dispatch(): Unit = LiveListenerBus.withinListenerThread.withValue(true) {
        var next: SparkListenerEvent = eventQueue.take()
        while (next != POISON_PILL) {
          val ctx = processingTime.time()
          try {
            super.postToAll(next)
          } finally {
            ctx.stop()
          }
          eventCount.decrementAndGet()
          next = eventQueue.take()
        }
        eventCount.decrementAndGet()
      }
    

    可见,该方法循环地从事件队列中取出事件,并调用父类ListenerBus特征的postToAll()方法(文章#5已经讲过)将其投递给所有已注册的监听器,并减少计数器的值。“毒药丸”POISON_PILL是伴生对象中定义的一个特殊的空事件,在队列停止(即调用stop()方法)时会被放入,dispatcherThread取得它之后就会“中毒”退出循环。

    有了处理事件的方法,还得有将事件放入队列的方法才完整。下面是入队的方法post()。

    post()方法

    代码#6.3 - o.a.s.scheduler.AsyncEventQueue.post()方法

      def post(event: SparkListenerEvent): Unit = {
        if (stopped.get()) {
          return
        }
    
        eventCount.incrementAndGet()
        if (eventQueue.offer(event)) {
          return
        }
    
        eventCount.decrementAndGet()
        droppedEvents.inc()
        droppedEventsCounter.incrementAndGet()
        if (logDroppedEvent.compareAndSet(false, true)) {
          logError(s"Dropping event from queue $name. " +
            "This likely means one of the listeners is too slow and cannot keep up with " +
            "the rate at which tasks are being started by the scheduler.")
        }
        logTrace(s"Dropping event $event")
    
        val droppedCount = droppedEventsCounter.get
        if (droppedCount > 0) {
          if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
            if (droppedEventsCounter.compareAndSet(droppedCount, 0)) {
              val prevLastReportTimestamp = lastReportTimestamp
              lastReportTimestamp = System.currentTimeMillis()
              val previous = new java.util.Date(prevLastReportTimestamp)
              logWarning(s"Dropped $droppedCount events from $name since $previous.")
            }
          }
        }
      }
    

    该方法首先检查队列是否已经停止。如果是运行状态,就试图将事件event入队。若offer()方法返回false,表示队列已满,将丢弃事件的计数器自增,并标记有事件被丢弃。最后,若当前的时间戳与上一次输出droppedEventsCounter值的间隔大于1分钟,就在日志里输出它的值。

    理解了AsyncEventQueue的细节之后,我们就可以进一步来看LiveListenerBus的实现了。

    异步事件总线LiveListenerBus

    AsyncEventQueue已经继承了SparkListenerBus特征,LiveListenerBus内部用到了AsyncEventQueue作为核心。来看它的声明以及属性的定义。

    代码#6.4 - o.a.s.scheduler.LiveListenerBus类声明及其属性

    private[spark] class LiveListenerBus(conf: SparkConf) {
      import LiveListenerBus._
    
      private var sparkContext: SparkContext = _
    
      private[spark] val metrics = new LiveListenerBusMetrics(conf)
    
      private val started = new AtomicBoolean(false)
      private val stopped = new AtomicBoolean(false)
    
      private val droppedEventsCounter = new AtomicLong(0L)
    
      @volatile private var lastReportTimestamp = 0L
    
      private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()
    
      @volatile private[scheduler] var queuedEvents = new mutable.ListBuffer[SparkListenerEvent]()
    
      // ...
    }
    

    这里的属性与AsyncEventQueue大同小异,多出来的主要是queues与queuedEvents两个。

    queues属性

    queues维护一个AsyncEventQueue的列表,也就是说LiveListenerBus中会有多个事件队列。它采用CopyOnWriteArrayList来保证线程安全性。

    queuedEvents属性

    queuedEvents维护一个SparkListenerEvent的列表,它的用途是在LiveListenerBus启动成功之前,缓存可能已经收到的事件。在启动之后,这些缓存的事件会首先投递出去。

    LiveListenerBus作为一个事件总线,也必须提供监听器注册、事件投递等功能,这些都是在AsyncEventQueue基础之上实现的,下面来看一看。

    addToQueue()方法

    代码#6.5 - o.a.s.scheduler.LiveListenerBus.addToQueue()方法

      private[spark] def addToQueue(
          listener: SparkListenerInterface,
          queue: String): Unit = synchronized {
        if (stopped.get()) {
          throw new IllegalStateException("LiveListenerBus is stopped.")
        }
    
        queues.asScala.find(_.name == queue) match {
          case Some(queue) =>
            queue.addListener(listener)
    
          case None =>
            val newQueue = new AsyncEventQueue(queue, conf, metrics, this)
            newQueue.addListener(listener)
            if (started.get()) {
              newQueue.start(sparkContext)
            }
            queues.add(newQueue)
        }
      }
    

    该方法将监听器listener注册到名为queue的队列中。它会在queues列表中寻找符合条件的队列,如果该队列已经存在,就调用父类ListenerBus的addListener()方法直接注册监听器。反之,就先创建一个AsyncEventQueue,注册监听器到新的队列中。

    LiveListenerBus还提供了另外4种直接注册监听器的方法,分别对应内置的4个队列,其名称在伴生对象中有定义。

    代码#6.6 - LiveListenerBus中其他注册监听器的方法

      def addToSharedQueue(listener: SparkListenerInterface): Unit = {
        addToQueue(listener, SHARED_QUEUE)
      }
    
      def addToManagementQueue(listener: SparkListenerInterface): Unit = {
        addToQueue(listener, EXECUTOR_MANAGEMENT_QUEUE)
      }
    
      def addToStatusQueue(listener: SparkListenerInterface): Unit = {
        addToQueue(listener, APP_STATUS_QUEUE)
      }
    
      def addToEventLogQueue(listener: SparkListenerInterface): Unit = {
        addToQueue(listener, EVENT_LOG_QUEUE)
      }
    

    post()、postToQueues()方法

    代码#6.7 - o.a.s.scheduler.LiveListenerBus.post()与postToQueues()方法

      def post(event: SparkListenerEvent): Unit = {
        if (stopped.get()) {
          return
        }
        metrics.numEventsPosted.inc()
    
        if (queuedEvents == null) {
          postToQueues(event)
          return
        }
    
        synchronized {
          if (!started.get()) {
            queuedEvents += event
            return
          }
        }
    
        postToQueues(event)
      }
    
      private def postToQueues(event: SparkListenerEvent): Unit = {
        val it = queues.iterator()
        while (it.hasNext()) {
          it.next().post(event)
        }
      }
    

    post()方法会检查queuedEvents中有无缓存的事件,以及事件总线是否还没有启动。投递时会调用postToQueues()方法,将事件发送给所有队列,由AsyncEventQueue来完成投递到监听器的工作。

    总结

    本文研究了与SparkContext相关的异步事件处理机制的实现,即AsyncEventQueue与LiveListenerBus。它们之间的关系可以用下面的简图来表示。


    图#6.1 - AsyncEventQueue与LiveListenerBus示意

    按照SparkContext初始化的顺序,下面将要介绍的是SparkEnv,内容也相当多,我们采用与研究SparkContext时类似的方式来阅读它的源码。

    相关文章

      网友评论

        本文标题:Spark Core源码精读计划#6:AsyncEventQue

        本文链接:https://www.haomeiwen.com/subject/hvypbqtx.html