Spark Message Bus Implementation

Author: Sunnywade | Published 2018-01-08 23:41

    Message Bus Overview

    The message bus is the framework Spark uses internally to pass messages and trigger events. Its core consists of three abstractions:

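    • Event: defines an event;
    • Listener: defines a listener, the entity that responds to an Event;
    • Bus: the pipeline that routes Events to Listeners. It accepts Events and registers Listeners, so the Bus is effectively the connector between Listeners and Events.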

    ListenerBus

    In Spark, the top-level abstraction of the message bus is ListenerBus. Its source code is as follows:

    package org.apache.spark.util
    
    import java.util.concurrent.CopyOnWriteArrayList
    
    import scala.collection.JavaConverters._
    import scala.reflect.ClassTag
    import scala.util.control.NonFatal
    
    import com.codahale.metrics.Timer
    
    import org.apache.spark.internal.Logging
    
    /**
     * An event bus which posts events to its listeners.
     */
    private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
    
      private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Option[Timer])]
    
      // Marked `private[spark]` for access in tests.
      private[spark] def listeners = listenersPlusTimers.asScala.map(_._1).asJava
    
      /**
       * Returns a CodaHale metrics Timer for measuring the listener's event processing time.
       * This method is intended to be overridden by subclasses.
       */
      protected def getTimer(listener: L): Option[Timer] = None
    
      /**
       * Add a listener to listen events. This method is thread-safe and can be called in any thread.
       */
      final def addListener(listener: L): Unit = {
        listenersPlusTimers.add((listener, getTimer(listener)))
      }
    
      /**
       * Remove a listener and it won't receive any events. This method is thread-safe and can be called
       * in any thread.
       */
      final def removeListener(listener: L): Unit = {
        listenersPlusTimers.asScala.find(_._1 eq listener).foreach { listenerAndTimer =>
          listenersPlusTimers.remove(listenerAndTimer)
        }
      }
    
      /**
       * Post the event to all registered listeners. The `postToAll` caller should guarantee calling
       * `postToAll` in the same thread for all events.
       */
      def postToAll(event: E): Unit = {
        // JavaConverters can create a JIterableWrapper if we use asScala.
        // However, this method will be called frequently. To avoid the wrapper cost, here we use
        // Java Iterator directly.
        val iter = listenersPlusTimers.iterator
        while (iter.hasNext) {
          val listenerAndMaybeTimer = iter.next()
          val listener = listenerAndMaybeTimer._1
          val maybeTimer = listenerAndMaybeTimer._2
          val maybeTimerContext = if (maybeTimer.isDefined) {
            maybeTimer.get.time()
          } else {
            null
          }
          try {
            doPostEvent(listener, event)
          } catch {
            case NonFatal(e) =>
              logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
          } finally {
            if (maybeTimerContext != null) {
              maybeTimerContext.stop()
            }
          }
        }
      }
    
      /**
       * Post an event to the specified listener. `onPostEvent` is guaranteed to be called in the same
       * thread for all listeners.
       */
      protected def doPostEvent(listener: L, event: E): Unit
    
      private[spark] def findListenersByClass[T <: L : ClassTag](): Seq[T] = {
        val c = implicitly[ClassTag[T]].runtimeClass
        listeners.asScala.filter(_.getClass == c).map(_.asInstanceOf[T]).toSeq
      }
    
    }
    
    

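    Here, listenersPlusTimers is a thread-safe list (a CopyOnWriteArrayList) that stores the Listeners registered on the bus, each paired with an optional Timer for measuring its event processing time. The trait also defines addListener for registering a Listener and removeListener for unregistering one.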
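    The choice of CopyOnWriteArrayList matters because postToAll iterates over the list while other threads may call addListener or removeListener. The small demo below is hypothetical (CowListDemo is an illustrative name, not Spark code). It contrasts CopyOnWriteArrayList with a plain java.util.ArrayList. A CopyOnWriteArrayList iterator walks the snapshot taken when it was created. An ArrayList iterator fails fast once the list is modified behind it.

```scala
import java.util.ConcurrentModificationException
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical demo, not Spark code: why ListenerBus can iterate its listener
// list while other threads add or remove listeners.
object CowListDemo {
  // Returns (elements seen by an iterator created before an add, size after the add).
  def snapshotSizes(): (Int, Int) = {
    val list = new CopyOnWriteArrayList[String]()
    list.add("a")
    list.add("b")
    val iter = list.iterator() // iterates over a snapshot of ["a", "b"]
    list.add("c")              // copies the backing array; the snapshot is unchanged
    var seen = 0
    while (iter.hasNext) {
      iter.next()
      seen += 1
    }
    (seen, list.size())
  }

  // Returns true if a plain ArrayList iterator fails fast after a modification.
  def arrayListFailsFast(): Boolean = {
    val list = new java.util.ArrayList[String]()
    list.add("a")
    list.add("b")
    val iter = list.iterator()
    list.add("c") // changes the list behind the iterator's back
    try {
      iter.next()
      false
    } catch {
      case _: ConcurrentModificationException => true
    }
  }
}
```

    snapshotSizes() returns (2, 3): the listener added mid-iteration is simply not visited in that round, which is exactly the behavior a broadcast loop wants.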

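    The most important method in this trait is postToAll. It broadcasts the incoming Event to every Listener registered in listenersPlusTimers, calling doPostEvent to deliver the Event to each one. An exception thrown by one Listener is caught as NonFatal and logged, so the remaining Listeners still receive the Event. The trait gives no implementation of doPostEvent. A custom message bus is built by extending ListenerBus and implementing that method.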
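    The sketch below re-creates this division of labor outside Spark. The base trait owns the listener list and the broadcast loop, and the subclass only decides how one event reaches one listener. MiniListenerBus, Greeting, GreetingListener, GreetingBus and the onListenerError hook are all hypothetical names. The real trait's timers and logging are omitted, and onListenerError stands in for its logError call.

```scala
import java.util.concurrent.CopyOnWriteArrayList
import scala.util.control.NonFatal

// Hypothetical sketch of the ListenerBus pattern (not Spark's class): the base
// trait owns registration and broadcasting; subclasses implement doPostEvent.
trait MiniListenerBus[L <: AnyRef, E] {
  private val listeners = new CopyOnWriteArrayList[L]()

  final def addListener(listener: L): Unit = {
    listeners.add(listener)
    ()
  }

  // Deliver the event to every listener; a NonFatal failure in one listener is
  // reported through onListenerError and does not stop delivery to the rest.
  def postToAll(event: E): Unit = {
    val iter = listeners.iterator()
    while (iter.hasNext) {
      val listener = iter.next()
      try doPostEvent(listener, event)
      catch { case NonFatal(e) => onListenerError(listener, e) }
    }
  }

  // Stand-in for Spark's logError call; does nothing by default.
  protected def onListenerError(listener: L, e: Throwable): Unit = ()

  // How a single event reaches a single listener.
  protected def doPostEvent(listener: L, event: E): Unit
}

// Hypothetical event and listener types for a tiny "greeting" bus.
final case class Greeting(text: String)

trait GreetingListener {
  def onGreeting(greeting: Greeting): Unit
}

class GreetingBus extends MiniListenerBus[GreetingListener, Greeting] {
  // Messages of listener failures, kept so they can be inspected.
  val errors = new CopyOnWriteArrayList[String]()

  override protected def onListenerError(listener: GreetingListener, e: Throwable): Unit = {
    errors.add(e.getMessage)
    ()
  }

  override protected def doPostEvent(listener: GreetingListener, event: Greeting): Unit =
    listener.onGreeting(event)
}
```

    If the first registered listener throws, the exception is recorded and the second listener still receives the Greeting, mirroring the per-listener try/catch in postToAll.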

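    ListenerBus has many implementations in Spark. Direct implementations include SparkListenerBus and ExternalCatalog, and each is further specialized for particular scenarios. SparkListenerBus is the most important message bus in Spark and is the focus of the rest of this article.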

    SparkListenerBus

    The source code of SparkListenerBus is as follows:

    package org.apache.spark.scheduler
    
    import org.apache.spark.util.ListenerBus
    
    /**
     * A [[SparkListenerEvent]] bus that relays [[SparkListenerEvent]]s to its listeners
     */
    private[spark] trait SparkListenerBus
      extends ListenerBus[SparkListenerInterface, SparkListenerEvent] {
    
      protected override def doPostEvent(
          listener: SparkListenerInterface,
          event: SparkListenerEvent): Unit = {
        event match {
          case stageSubmitted: SparkListenerStageSubmitted =>
            listener.onStageSubmitted(stageSubmitted)
          case stageCompleted: SparkListenerStageCompleted =>
            listener.onStageCompleted(stageCompleted)
          case jobStart: SparkListenerJobStart =>
            listener.onJobStart(jobStart)
          case jobEnd: SparkListenerJobEnd =>
            listener.onJobEnd(jobEnd)
          case taskStart: SparkListenerTaskStart =>
            listener.onTaskStart(taskStart)
          case taskGettingResult: SparkListenerTaskGettingResult =>
            listener.onTaskGettingResult(taskGettingResult)
          case taskEnd: SparkListenerTaskEnd =>
            listener.onTaskEnd(taskEnd)
          case environmentUpdate: SparkListenerEnvironmentUpdate =>
            listener.onEnvironmentUpdate(environmentUpdate)
          case blockManagerAdded: SparkListenerBlockManagerAdded =>
            listener.onBlockManagerAdded(blockManagerAdded)
          case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
            listener.onBlockManagerRemoved(blockManagerRemoved)
          case unpersistRDD: SparkListenerUnpersistRDD =>
            listener.onUnpersistRDD(unpersistRDD)
          case applicationStart: SparkListenerApplicationStart =>
            listener.onApplicationStart(applicationStart)
          case applicationEnd: SparkListenerApplicationEnd =>
            listener.onApplicationEnd(applicationEnd)
          case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
            listener.onExecutorMetricsUpdate(metricsUpdate)
          case executorAdded: SparkListenerExecutorAdded =>
            listener.onExecutorAdded(executorAdded)
          case executorRemoved: SparkListenerExecutorRemoved =>
            listener.onExecutorRemoved(executorRemoved)
          case executorBlacklisted: SparkListenerExecutorBlacklisted =>
            listener.onExecutorBlacklisted(executorBlacklisted)
          case executorUnblacklisted: SparkListenerExecutorUnblacklisted =>
            listener.onExecutorUnblacklisted(executorUnblacklisted)
          case nodeBlacklisted: SparkListenerNodeBlacklisted =>
            listener.onNodeBlacklisted(nodeBlacklisted)
          case nodeUnblacklisted: SparkListenerNodeUnblacklisted =>
            listener.onNodeUnblacklisted(nodeUnblacklisted)
          case blockUpdated: SparkListenerBlockUpdated =>
            listener.onBlockUpdated(blockUpdated)
          case speculativeTaskSubmitted: SparkListenerSpeculativeTaskSubmitted =>
            listener.onSpeculativeTaskSubmitted(speculativeTaskSubmitted)
          case _ => listener.onOtherEvent(event)
        }
      }
    
    }
    

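    As the code shows, SparkListenerBus overrides doPostEvent from ListenerBus and fixes its own Listener type, SparkListenerInterface, and its own Event type, SparkListenerEvent. Inside doPostEvent it pattern-matches on the event's runtime type. A matched event is handed to the corresponding method of SparkListenerInterface, and any event that matches none of the specific cases goes to onOtherEvent.
    SparkListenerInterface is a trait that declares a handler method for each kind of SparkListenerEvent. A typical implementation is the HeartbeatReceiver class, which runs in the Spark driver process and maintains the heartbeats between the driver and the executors. A later article will examine this class in detail.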
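    The same dispatch-by-pattern-matching idea can be shown in miniature. In this hypothetical sketch, AppEvent, JobStart, JobEnd and Custom stand in for SparkListenerEvent and its subclasses. AppListener gives every callback a no-op default, similar in spirit to how Spark's SparkListener class implements SparkListenerInterface, so a concrete listener overrides only what it needs. AppEvent is deliberately not sealed, so a user-defined event such as Custom falls through to onOtherEvent.

```scala
// Hypothetical stand-ins for SparkListenerEvent and a few subclasses. The base
// trait is deliberately not sealed, so other code can define new event types.
trait AppEvent
final case class JobStart(jobId: Int) extends AppEvent
final case class JobEnd(jobId: Int, succeeded: Boolean) extends AppEvent
final case class Custom(payload: String) extends AppEvent // a "user-defined" event

// Listener with no-op defaults: a concrete listener overrides only what it needs.
trait AppListener {
  def onJobStart(event: JobStart): Unit = ()
  def onJobEnd(event: JobEnd): Unit = ()
  def onOtherEvent(event: AppEvent): Unit = ()
}

object AppDispatch {
  // Route by runtime type; anything without a dedicated callback goes to
  // onOtherEvent, the same fallback SparkListenerBus.doPostEvent uses.
  def doPostEvent(listener: AppListener, event: AppEvent): Unit = event match {
    case e: JobStart => listener.onJobStart(e)
    case e: JobEnd   => listener.onJobEnd(e)
    case other       => listener.onOtherEvent(other)
  }
}

// Example listener that records job completions and unrecognised events.
class RecordingListener extends AppListener {
  val log = scala.collection.mutable.ArrayBuffer.empty[String]
  override def onJobEnd(event: JobEnd): Unit = { log += s"end:${event.jobId}:${event.succeeded}"; () }
  override def onOtherEvent(event: AppEvent): Unit = { log += s"other:$event"; () }
}
```

    Posting JobStart(1), JobEnd(1, true) and Custom("x") to a RecordingListener leaves JobStart silently ignored by the default no-op. The other two are recorded.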

    AsyncEventQueue: an asynchronous message bus

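    SparkListenerBus's doPostEvent routes each message to a specific Listener method. It does not define in what manner, for example synchronously or asynchronously, a Listener responds to an Event.
    AsyncEventQueue extends SparkListenerBus and defines a framework for responding to Events asynchronously. Its basic mechanism is as follows:
    1. Define a FIFO, thread-safe queue, eventQueue, that holds Events waiting to be dispatched. Its capacity is capped by LISTENER_BUS_EVENT_QUEUE_CAPACITY. A producer that persistently outpaces the consumer therefore leads to explicitly dropped events rather than an OutOfMemoryError: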

      // Cap the capacity of the queue so we get an explicit error (rather than an OOM exception) if
      // it's perpetually being added to more quickly than it's being drained.
      private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](
        conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))
    
    2. Define a thread, dispatchThread, that repeatedly takes Events from eventQueue and dispatches them to the Listeners:

      private val dispatchThread = new Thread(s"spark-listener-group-$name") {
        setDaemon(true)
        override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
          dispatch()
        }
      }
    

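    Here, tryOrStopSparkContext guarantees that if dispatch hits an exception it cannot handle, the SparkContext is stopped automatically. The application is not left running with a dead dispatch thread, which avoids leaking resources.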

    The dispatch() function is implemented as follows:

      private def dispatch(): Unit = LiveListenerBus.withinListenerThread.withValue(true) {
        try {
          var next: SparkListenerEvent = eventQueue.take()
          while (next != POISON_PILL) {
            val ctx = processingTime.time()
            try {
              super.postToAll(next)
            } finally {
              ctx.stop()
            }
            eventCount.decrementAndGet()
            next = eventQueue.take()
          }
          eventCount.decrementAndGet()
        } catch {
          case ie: InterruptedException =>
            logInfo(s"Stopping listener queue $name.", ie)
        }
      }
    

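    The method loops continuously, taking each event from eventQueue and dispatching it to the Listeners registered on the bus. The bus defines a sentinel Event, POISON_PILL, to signal stopping. When stop is called, POISON_PILL is appended to the tail of eventQueue. When it is taken from the queue, the loop ends and the dispatch thread exits. The extra eventCount.decrementAndGet() after the loop accounts for POISON_PILL itself, so by the time the thread exits, eventCount has dropped to 0.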

    3. Define the post method, which adds an Event to eventQueue:

      def post(event: SparkListenerEvent): Unit = {
        if (stopped.get()) {
          return
        }
    
        eventCount.incrementAndGet()
        if (eventQueue.offer(event)) {
          return
        }
    
        eventCount.decrementAndGet()
        droppedEvents.inc()
        droppedEventsCounter.incrementAndGet()
        if (logDroppedEvent.compareAndSet(false, true)) {
          // Only log the following message once to avoid duplicated annoying logs.
          logError(s"Dropping event from queue $name. " +
            "This likely means one of the listeners is too slow and cannot keep up with " +
            "the rate at which tasks are being started by the scheduler.")
        }
        logTrace(s"Dropping event $event")
    
        val droppedCount = droppedEventsCounter.get
        if (droppedCount > 0) {
          // Don't log too frequently
          if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
            // There may be multiple threads trying to decrease droppedEventsCounter.
            // Use "compareAndSet" to make sure only one thread can win.
            // And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
            // then that thread will update it.
            if (droppedEventsCounter.compareAndSet(droppedCount, 0)) {
              val prevLastReportTimestamp = lastReportTimestamp
              lastReportTimestamp = System.currentTimeMillis()
              val previous = new java.util.Date(prevLastReportTimestamp)
              logWarning(s"Dropped $droppedCount events from $name since $previous.")
            }
          }
        }
      }
    

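    The whole implementation is a classic producer-consumer pattern. post is the producer that adds Events to eventQueue, and dispatchThread is the consumer that takes Events from eventQueue and dispatches them to the Listeners. eventCount is an AtomicLong that counts Events separately from eventQueue. The separate counter exists because dispatch decrements it only after super.postToAll has returned, so eventCount reaching 0 guarantees that every Event has actually been processed, not merely removed from eventQueue. If eventQueue.offer fails because the queue is full, post drops the Event and increments the drop counters. It logs an error the first time this happens and a summary warning at most once per minute.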

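    Once stop has been called on an AsyncEventQueue, post no longer accepts new Events: the stopped.get() check at the top of post returns immediately.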
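    The producer-consumer mechanics can be reproduced in a few dozen lines. The following is a simplified, hypothetical model. MiniAsyncQueue, pending, droppedCount and waitUntilEmpty are illustrative names, not Spark's API. It uses a bounded LinkedBlockingQueue, one daemon dispatch thread, None as the poison pill, and an eventCount that is decremented only after an event has been handled. Metrics, listener groups and the SparkContext shutdown path are left out.

```scala
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import scala.util.control.NonFatal

// Simplified, hypothetical model of AsyncEventQueue (not Spark's class).
// None plays the role of POISON_PILL; Some(event) carries a real event.
class MiniAsyncQueue[E](capacity: Int)(handle: E => Unit) {
  private val queue = new LinkedBlockingQueue[Option[E]](capacity) // bounded FIFO
  private val eventCount = new AtomicLong(0) // posted but not yet fully handled
  private val dropped = new AtomicLong(0)
  private val stopped = new AtomicBoolean(false)

  // Consumer: take events in FIFO order until the poison pill arrives.
  private val dispatchThread = new Thread("mini-listener-queue") {
    setDaemon(true)
    override def run(): Unit = {
      var next = queue.take()
      while (next.isDefined) {
        try {
          handle(next.get)
        } catch {
          case NonFatal(_) => () // keep the thread alive on handler errors
        } finally {
          eventCount.decrementAndGet() // counted as done only after handling returns
        }
        next = queue.take()
      }
      eventCount.decrementAndGet() // balances the increment made for the pill
    }
  }
  dispatchThread.start()

  // Producer: never blocks; returns false if stopped or if the event was dropped.
  def post(event: E): Boolean = {
    if (stopped.get()) return false
    eventCount.incrementAndGet()
    if (queue.offer(Some(event))) true
    else {
      eventCount.decrementAndGet()
      dropped.incrementAndGet()
      false
    }
  }

  def pending: Long = eventCount.get()
  def droppedCount: Long = dropped.get()

  // Poll until every posted event has been handled or the timeout expires.
  def waitUntilEmpty(timeoutMillis: Long): Boolean = {
    val deadline = System.currentTimeMillis() + timeoutMillis
    while (eventCount.get() != 0) {
      if (System.currentTimeMillis() > deadline) return false
      Thread.sleep(5)
    }
    true
  }

  // Append the pill behind any pending events, then wait for the thread to exit.
  def stop(): Unit = {
    if (stopped.compareAndSet(false, true)) {
      eventCount.incrementAndGet()
      queue.put(None)
    }
    dispatchThread.join()
  }
}
```

    The waitUntilEmpty check is only meaningful because eventCount is decremented after the handler returns. Polling the queue's isEmpty instead would report success as soon as the last event was dequeued, even while it was still being processed. Note that the pill is enqueued with the blocking put, while ordinary events use the non-blocking offer. This keeps shutdown reliable even when the queue is full.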

    LiveListenerBus: the container of AsyncEventQueues

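    From an object-oriented standpoint, LiveListenerBus is not itself a bus. It holds a CopyOnWriteArrayList named queues that stores AsyncEventQueue instances, and it is responsible for adding AsyncEventQueues to and removing them from queues. Spark predefines four AsyncEventQueues in queues: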

    • SHARED_QUEUE
    • APP_STATUS_QUEUE
    • EXECUTOR_MANAGEMENT_QUEUE
    • EVENT_LOG_QUEUE
    Each of these queues handles a specific category of Spark events.
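    The container role can be sketched as follows. This hypothetical model keeps named queues in a CopyOnWriteArrayList, creates a queue the first time a listener is added under its name, and fans every posted event out to all queues. NamedQueue, MiniLiveBus and addToQueue are illustrative names. To keep it short, each NamedQueue delivers synchronously. In Spark each queue is an AsyncEventQueue with its own dispatch thread, so a slow listener in one queue does not delay the others.

```scala
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical stand-in for one named queue. It delivers synchronously to keep the
// sketch short; in Spark each named queue is an AsyncEventQueue with its own thread.
final class NamedQueue[E](val name: String) {
  private val listeners = new CopyOnWriteArrayList[E => Unit]()

  def addListener(listener: E => Unit): Unit = {
    listeners.add(listener)
    ()
  }

  def post(event: E): Unit = {
    val it = listeners.iterator()
    while (it.hasNext) it.next()(event)
  }
}

// Container modeled loosely on LiveListenerBus: not a bus itself, it manages
// named queues and fans every posted event out to all of them.
final class MiniLiveBus[E] {
  private val queues = new CopyOnWriteArrayList[NamedQueue[E]]()

  // Register a listener on the named queue, creating that queue on first use.
  def addToQueue(listener: E => Unit, queueName: String): Unit = synchronized {
    val it = queues.iterator()
    var target: Option[NamedQueue[E]] = None
    while (target.isEmpty && it.hasNext) {
      val q = it.next()
      if (q.name == queueName) target = Some(q)
    }
    val queue = target.getOrElse {
      val created = new NamedQueue[E](queueName)
      queues.add(created)
      created
    }
    queue.addListener(listener)
  }

  def queueNames: List[String] = {
    val names = List.newBuilder[String]
    val it = queues.iterator()
    while (it.hasNext) names += it.next().name
    names.result()
  }

  // Every queue sees every event; each queue then delivers to its own listeners.
  def post(event: E): Unit = {
    val it = queues.iterator()
    while (it.hasNext) it.next().post(event)
  }
}
```

    Registering two listeners under "shared" and one under "eventLog" creates exactly two queues. A single post then reaches all three listeners, queue by queue.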

    LiveListenerBus is an important component of SparkContext.

    Article link: https://www.haomeiwen.com/subject/mgapnxtx.html