Event Bus -- ListenerBus

Author: 炮灰向前冲啦 | Published: 2018-03-20 18:38

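    Many Spark components communicate through RPC or through an event-message mechanism. RPC handles remote communication; the event mechanism is a relatively efficient way to communicate within a single process.

    Defining ListenerBus

    Spark defines a trait named ListenerBus, which receives events and delivers them to the registered event listeners.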

    private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
    
      private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Option[Timer])]
    
      // Marked `private[spark]` for access in tests.
      private[spark] def listeners = listenersPlusTimers.asScala.map(_._1).asJava
    
      /**
       * Returns a CodaHale metrics Timer for measuring the listener's event processing time.
       * This method is intended to be overridden by subclasses.
       */
      protected def getTimer(listener: L): Option[Timer] = None
    
      /**
       * Add a listener to listen events. This method is thread-safe and can be called in any thread.
       */
      final def addListener(listener: L): Unit = {
        listenersPlusTimers.add((listener, getTimer(listener)))
      }
    
      /**
       * Remove a listener and it won't receive any events. This method is thread-safe and can be called
       * in any thread.
       */
      final def removeListener(listener: L): Unit = {
        listenersPlusTimers.asScala.find(_._1 eq listener).foreach { listenerAndTimer =>
          listenersPlusTimers.remove(listenerAndTimer)
        }
      }
    
      /**
       * Post the event to all registered listeners. The `postToAll` caller should guarantee calling
       * `postToAll` in the same thread for all events.
       */
      def postToAll(event: E): Unit = {
        // JavaConverters can create a JIterableWrapper if we use asScala.
        // However, this method will be called frequently. To avoid the wrapper cost, here we use
        // Java Iterator directly.
        val iter = listenersPlusTimers.iterator
        while (iter.hasNext) {
          val listenerAndMaybeTimer = iter.next()
          val listener = listenerAndMaybeTimer._1
          val maybeTimer = listenerAndMaybeTimer._2
          val maybeTimerContext = if (maybeTimer.isDefined) {
            maybeTimer.get.time()
          } else {
            null
          }
          try {
            doPostEvent(listener, event)
          } catch {
            case NonFatal(e) =>
              logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
          } finally {
            if (maybeTimerContext != null) {
              maybeTimerContext.stop()
            }
          }
        }
      }
    
      /**
       * Post an event to the specified listener. `onPostEvent` is guaranteed to be called in the same
       * thread for all listeners.
       */
      protected def doPostEvent(listener: L, event: E): Unit
    
      private[spark] def findListenersByClass[T <: L : ClassTag](): Seq[T] = {
        val c = implicitly[ClassTag[T]].runtimeClass
        listeners.asScala.filter(_.getClass == c).map(_.asInstanceOf[T]).toSeq
      }
    }
    

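    ListenerBus is a generic trait with type parameters [L <: AnyRef, E], where L is the listener type and E is the event type. Its main job is to deliver events to the matching listeners.

    • listenersPlusTimers: holds every registered listener together with its Timer. The backing structure is a thread-safe CopyOnWriteArrayList whose elements are pairs (L, Option[Timer]). L is the listener reference; Timer is a CodaHale metrics timer that measures the listener's event processing time
    • getTimer: returns the Timer used to measure a listener. It is protected and returns None by default; subclasses override it
    • addListener: adds a listener to listenersPlusTimers. final and thread-safe
    • removeListener: removes a listener from listenersPlusTimers. Also final and thread-safe
    • postToAll: posts an event to all listeners. Not thread-safe: the caller must invoke it from the same thread for all events. It uses the Java iterator to avoid the asScala wrapper cost, times each listener call when a Timer is present, and catches every NonFatal Throwable so that one failing listener does not stop delivery to the rest
    • doPostEvent: posts an event to one given listener. The concrete implementation is supplied by subclasses
    • findListenersByClass: returns the listeners whose class is exactly T. implicitly[ClassTag[T]].runtimeClass obtains the runtime class of the type parameter T; asInstanceOf casts each object to T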
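
    To make the shape of the trait easier to see, here is a minimal Java sketch of the same idea. It is not Spark code: SimpleListenerBus and SimpleListenerBusDemo are hypothetical names, the Timer part is left out, and catching Exception only approximates Scala's NonFatal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical Java analogue of the ListenerBus trait (not Spark code).
abstract class SimpleListenerBus<L, E> {
    // Copy-on-write list: listeners can be added or removed while another thread iterates.
    private final List<L> listeners = new CopyOnWriteArrayList<>();

    final void addListener(L listener) {
        listeners.add(listener);
    }

    final void removeListener(L listener) {
        // Remove every entry that is the same reference as `listener`.
        listeners.removeIf(l -> l == listener);
    }

    // Deliver the event to each listener; a failing listener is logged and skipped.
    // Catching Exception is only an approximation of Scala's NonFatal.
    void postToAll(E event) {
        for (L listener : listeners) {
            try {
                doPostEvent(listener, event);
            } catch (Exception e) {
                System.err.println("Listener threw an exception: " + e);
            }
        }
    }

    // Subclasses decide how an event is turned into a listener call.
    protected abstract void doPostEvent(L listener, E event);
}

final class SimpleListenerBusDemo {
    static List<String> run() {
        List<String> received = new ArrayList<>();
        SimpleListenerBus<Consumer<String>, String> bus =
            new SimpleListenerBus<Consumer<String>, String>() {
                @Override
                protected void doPostEvent(Consumer<String> listener, String event) {
                    listener.accept(event);
                }
            };
        bus.addListener(e -> { throw new IllegalStateException("boom"); });
        bus.addListener(received::add);
        bus.postToAll("jobStart");
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

    In run(), the first listener throws, yet the second one still receives "jobStart". That isolation is the reason postToAll wraps each call in its own try/catch.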

    ListenerBus class hierarchy

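    [Figure: spark-listenerbus.png]
    • SparkListenerBus: a trait. Its doPostEvent() delivers SparkListenerEvent events to SparkListenerInterface listeners. SparkListenerInterface and SparkListenerEvent are both traits as well
    • AsyncEventQueue: provides start, stop, dispatch and post for events. Events are held in a LinkedBlockingQueue
    • ReplayListenerBus: replays events from a serialized event stream
    • ExternalCatalog: an abstract class. It abstracts the operations on databases, tables and functions, and uses postToAll to publish Pre and After events for those operations
    • HiveExternalCatalog: the concrete implementation that operates on the Hive MetaStore
    • InMemoryCatalog: stores and manipulates database and table information in HashMaps
    • StreamingListenerBus: listens for events from each phase of Spark Streaming
    • StreamingQueryListenerBus: events for Data Stream queries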

    SparkListenerBus in detail

    private[spark] trait SparkListenerBus
      extends ListenerBus[SparkListenerInterface, SparkListenerEvent] {
      protected override def doPostEvent(
          listener: SparkListenerInterface,
          event: SparkListenerEvent): Unit = {
        event match {
          case stageSubmitted: SparkListenerStageSubmitted =>
            listener.onStageSubmitted(stageSubmitted)
          case stageCompleted: SparkListenerStageCompleted =>
            listener.onStageCompleted(stageCompleted)
          ...
          case _ => listener.onOtherEvent(event)
        }
      }
    }
    

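    SparkListenerBus extends ListenerBus and implements doPostEvent(). SparkListenerInterface is the trait abstraction for listeners, and SparkListenerEvent is the trait abstraction for events. SparkListener is an abstract class that acts as the default adapter for SparkListenerInterface, providing empty implementations of its methods.

    In other words, SparkListenerBus binds each concrete event type to the corresponding listener callback.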
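
    The binding between event types and callbacks can be sketched in Java as follows. All names here (DispatchDemo, StageSubmitted, Heartbeat and so on) are hypothetical stand-ins, and an instanceof chain takes the place of Scala's pattern match.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of type-based callback dispatch (not Spark code).
final class DispatchDemo {
    // Stand-in for the SparkListenerEvent hierarchy.
    interface Event {}

    static final class StageSubmitted implements Event {
        final int stageId;
        StageSubmitted(int stageId) { this.stageId = stageId; }
    }

    static final class StageCompleted implements Event {
        final int stageId;
        StageCompleted(int stageId) { this.stageId = stageId; }
    }

    static final class Heartbeat implements Event {}

    // Listener with empty default callbacks, playing the adapter role of SparkListener.
    interface Listener {
        default void onStageSubmitted(StageSubmitted e) {}
        default void onStageCompleted(StageCompleted e) {}
        default void onOtherEvent(Event e) {}
    }

    // Route each concrete event type to its callback; unknown types go to onOtherEvent.
    static void doPostEvent(Listener listener, Event event) {
        if (event instanceof StageSubmitted) {
            listener.onStageSubmitted((StageSubmitted) event);
        } else if (event instanceof StageCompleted) {
            listener.onStageCompleted((StageCompleted) event);
        } else {
            listener.onOtherEvent(event);
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Listener listener = new Listener() {
            @Override public void onStageSubmitted(StageSubmitted e) {
                log.add("submitted:" + e.stageId);
            }
            @Override public void onOtherEvent(Event e) {
                log.add("other");
            }
        };
        doPostEvent(listener, new StageSubmitted(1));
        doPostEvent(listener, new StageCompleted(1)); // not overridden, so a no-op
        doPostEvent(listener, new Heartbeat());       // falls through to onOtherEvent
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

    Because the listener only overrides the callbacks it cares about, the StageCompleted event is silently ignored, which is the convenience the adapter class provides.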

    AsyncEventQueue in detail

    AsyncEventQueue extends SparkListenerBus (and therefore ListenerBus). It holds a CopyOnWriteArrayList of listeners and a LinkedBlockingQueue of events. The listener type is SparkListenerInterface and the event type is SparkListenerEvent.

    private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveListenerBusMetrics)
      extends SparkListenerBus
      with Logging {
    
      import AsyncEventQueue._
    
      // Cap the capacity of the queue so we get an explicit error (rather than an OOM exception) if
      // it's perpetually being added to more quickly than it's being drained.
      // Bounded blocking queue that holds the events (capacity 10000)
      private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](
        conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))
    
      // Keep the event count separately, so that waitUntilEmpty() can be implemented properly;
      // this allows that method to return only when the events in the queue have been fully
      // processed (instead of just dequeued).
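      // Number of events not yet fully processed. This is not the same as eventQueue.size():
      // an event that has been dequeued but is still being handled is counted here.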
      private val eventCount = new AtomicLong()
    
      /** A counter for dropped events. It will be reset every time we log it. */
      // Number of dropped events. Reset when logged: droppedEventsCounter.compareAndSet(droppedCount, 0)
      private val droppedEventsCounter = new AtomicLong(0L)
    
      /** When `droppedEventsCounter` was logged last time in milliseconds. */
      @volatile private var lastReportTimestamp = 0L
    
      private val logDroppedEvent = new AtomicBoolean(false)
    
      private var sc: SparkContext = null
    
      private val started = new AtomicBoolean(false)
      private val stopped = new AtomicBoolean(false)
    
      private val droppedEvents = metrics.metricRegistry.counter(s"queue.$name.numDroppedEvents")
      private val processingTime = metrics.metricRegistry.timer(s"queue.$name.listenerProcessingTime")
    
      // Remove the queue size gauge first, in case it was created by a previous incarnation of
      // this queue that was removed from the listener bus.
      metrics.metricRegistry.remove(s"queue.$name.size")
      metrics.metricRegistry.register(s"queue.$name.size", new Gauge[Int] {
        override def getValue: Int = eventQueue.size()
      })
    
      // Create a daemon dispatchThread that dispatches the events
      private val dispatchThread = new Thread(s"spark-listener-group-$name") {
        setDaemon(true)
        override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
          dispatch()
        }
      }
    
      // Dispatch loop: take an event, check whether it is POISON_PILL, then deliver it to the listeners via postToAll
      private def dispatch(): Unit = LiveListenerBus.withinListenerThread.withValue(true) {
        try {
          var next: SparkListenerEvent = eventQueue.take()
          while (next != POISON_PILL) {
            val ctx = processingTime.time()
            try {
              super.postToAll(next)
            } finally {
              ctx.stop()
            }
            eventCount.decrementAndGet()
            // take() blocks until an event is available, so next always has a value
            // and the loop keeps running until POISON_PILL arrives
            next = eventQueue.take()
          }
          eventCount.decrementAndGet()
        } catch {
          case ie: InterruptedException =>
            logInfo(s"Stopping listener queue $name.", ie)
        }
      }
    
      override protected def getTimer(listener: SparkListenerInterface): Option[Timer] = {
        metrics.getTimerForListenerClass(listener.getClass.asSubclass(classOf[SparkListenerInterface]))
      }
    
      /**
       * Start an asynchronous thread to dispatch events to the underlying listeners.
       *
       * @param sc Used to stop the SparkContext in case the async dispatcher fails.
       */
      // Start the asynchronous dispatch thread
      private[scheduler] def start(sc: SparkContext): Unit = {
        if (started.compareAndSet(false, true)) {
          this.sc = sc
          dispatchThread.start()
        } else {
          throw new IllegalStateException(s"$name already started!")
        }
      }
    
      /**
       * Stop the listener bus. It will wait until the queued events have been processed, but new
       * events will be dropped.
       */
      // Stop dispatching: set stopped, put POISON_PILL into eventQueue, then wait for the
      // thread to finish with dispatchThread.join(). POISON_PILL gives a graceful shutdown.
      private[scheduler] def stop(): Unit = {
        if (!started.get()) {
          throw new IllegalStateException(s"Attempted to stop $name that has not yet started!")
        }
        if (stopped.compareAndSet(false, true)) {
          eventCount.incrementAndGet()
          eventQueue.put(POISON_PILL)
        }
        dispatchThread.join()
      }
    
      // Accept an event
      def post(event: SparkListenerEvent): Unit = {
        // Return immediately once stopped
        if (stopped.get()) {
          return
        }
    
        eventCount.incrementAndGet()
        // offer the event to eventQueue: return on success, drop the event if offer returns false
        if (eventQueue.offer(event)) {
          return
        }
    
        eventCount.decrementAndGet()
        droppedEvents.inc()
        // The queue is full: drop the event and increment droppedEventsCounter
        droppedEventsCounter.incrementAndGet()
        if (logDroppedEvent.compareAndSet(false, true)) {
          // Only log the following message once to avoid duplicated annoying logs.
          logError(s"Dropping event from queue $name. " +
            "This likely means one of the listeners is too slow and cannot keep up with " +
            "the rate at which tasks are being started by the scheduler.")
        }
        logTrace(s"Dropping event $event")
    
        val droppedCount = droppedEventsCounter.get
        if (droppedCount > 0) {
          // Don't log too frequently
          // Log only if at least one minute has passed since the last report
          if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
            // There may be multiple threads trying to decrease droppedEventsCounter.
            // Use "compareAndSet" to make sure only one thread can win.
            // And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
            // then that thread will update it.
            // compareAndSet keeps this thread-safe. Double-checked locking on lastReportTimestamp
            // would also work, but it is heavier than CAS.
            if (droppedEventsCounter.compareAndSet(droppedCount, 0)) {
              val prevLastReportTimestamp = lastReportTimestamp
              lastReportTimestamp = System.currentTimeMillis()
              val previous = new java.util.Date(prevLastReportTimestamp)
              logWarning(s"Dropped $droppedCount events from $name since $previous.")
            }
          }
        }
      }
    }
    
    // A "poison pill" event that marks the end of the events in the queue
    private object AsyncEventQueue {
      val POISON_PILL = new SparkListenerEvent() { }
    }
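
    The core of AsyncEventQueue (a bounded queue, a non-blocking post that drops on overflow, and a poison pill for shutdown) can be reduced to the Java sketch below. PoisonPillQueue is a hypothetical class written for this article, not Spark code; metrics, throttled logging and the eventCount bookkeeping are left out.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified analogue of AsyncEventQueue (not Spark code).
final class PoisonPillQueue {
    // Sentinel compared by identity; it is never handed to a consumer.
    private static final Object POISON_PILL = new Object();

    private final LinkedBlockingQueue<Object> queue;
    private final AtomicLong dropped = new AtomicLong();
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private final List<Object> handled = new CopyOnWriteArrayList<>();
    private final Thread dispatcher;

    PoisonPillQueue(int capacity) {
        queue = new LinkedBlockingQueue<>(capacity);
        dispatcher = new Thread(this::dispatch, "demo-dispatcher");
        dispatcher.setDaemon(true);
    }

    // Consume events until the poison pill is taken from the queue.
    private void dispatch() {
        try {
            Object next = queue.take();
            while (next != POISON_PILL) {
                handled.add(next); // stand-in for postToAll(next)
                next = queue.take();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void start() {
        dispatcher.start();
    }

    // Non-blocking: a full queue drops the event instead of stalling the producer.
    void post(Object event) {
        if (stopped.get()) {
            return;
        }
        if (!queue.offer(event)) {
            dropped.incrementAndGet();
        }
    }

    // put() blocks until there is room, so the pill itself is never dropped.
    void stop() {
        try {
            if (stopped.compareAndSet(false, true)) {
                queue.put(POISON_PILL);
            }
            dispatcher.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    long droppedCount() { return dropped.get(); }

    List<Object> handled() { return handled; }

    public static void main(String[] args) {
        PoisonPillQueue q = new PoisonPillQueue(2);
        q.post("a");
        q.post("b");
        q.post("c"); // queue is full and nothing consumes yet, so this one is dropped
        q.start();
        q.stop();
        System.out.println(q.handled() + " dropped=" + q.droppedCount());
    }
}
```

    In main, the events are posted before the dispatcher starts, so the third offer fails deterministically. stop() then waits until "a" and "b" have been handled, because the pill is queued behind them.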
    

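    LiveListenerBus in detail

    LiveListenerBus is the manager of the AsyncEventQueue instances. It defines four AsyncEventQueue dispatchers, named shared, appStatus, executorManagement and eventLog, and keeps them in a CopyOnWriteArrayList. Running several AsyncEventQueues shortens event response time.

    The post method delivers each event to every queue, while each queue has its own set of listeners and is identified by its name. On start, every AsyncEventQueue in the CopyOnWriteArrayList is started, that is, AsyncEventQueue.start() is called on each one in turn, so events are dispatched concurrently by multiple threads.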

    private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()
    
    /** Add a listener to queue shared by all non-internal listeners. */
      def addToSharedQueue(listener: SparkListenerInterface): Unit = {
        addToQueue(listener, SHARED_QUEUE)
      }
    
      /** Add a listener to the executor management queue. */
      def addToManagementQueue(listener: SparkListenerInterface): Unit = {
        addToQueue(listener, EXECUTOR_MANAGEMENT_QUEUE)
      }
    
      /** Add a listener to the application status queue. */
      def addToStatusQueue(listener: SparkListenerInterface): Unit = {
        addToQueue(listener, APP_STATUS_QUEUE)
      }
    
      /** Add a listener to the event log queue. */
      def addToEventLogQueue(listener: SparkListenerInterface): Unit = {
        addToQueue(listener, EVENT_LOG_QUEUE)
      }
      
      private[spark] def addToQueue(
          listener: SparkListenerInterface,
          queue: String): Unit = synchronized {
        if (stopped.get()) {
          throw new IllegalStateException("LiveListenerBus is stopped.")
        }
    
        queues.asScala.find(_.name == queue) match {
          case Some(queue) =>
            queue.addListener(listener)
    
          case None =>
            val newQueue = new AsyncEventQueue(queue, conf, metrics)
            newQueue.addListener(listener)
            if (started.get()) {
              newQueue.start(sparkContext)
            }
            queues.add(newQueue)
        }
      }
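
    The find-or-create logic of addToQueue and the fan-out in post can be sketched in Java as follows. NamedQueueBus is a hypothetical class, not Spark code. To keep the example deterministic, drainAll() stands in for the per-queue dispatch threads and runs on the caller's thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical, simplified analogue of LiveListenerBus (not Spark code).
final class NamedQueueBus {
    // One named queue with its own event buffer and its own listeners.
    static final class NamedQueue {
        final String name;
        final LinkedBlockingQueue<String> events = new LinkedBlockingQueue<>();
        final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();
        NamedQueue(String name) { this.name = name; }
    }

    private final List<NamedQueue> queues = new CopyOnWriteArrayList<>();

    // Find the queue by name or create it. synchronized, so two threads cannot
    // both create a queue with the same name.
    synchronized void addToQueue(Consumer<String> listener, String queueName) {
        for (NamedQueue q : queues) {
            if (q.name.equals(queueName)) {
                q.listeners.add(listener);
                return;
            }
        }
        NamedQueue created = new NamedQueue(queueName);
        created.listeners.add(listener);
        queues.add(created);
    }

    // Fan-out: every queue gets the event in its own buffer.
    void post(String event) {
        for (NamedQueue q : queues) {
            q.events.offer(event);
        }
    }

    // Stand-in for the per-queue dispatch threads: drain each queue on the caller's thread.
    void drainAll() {
        for (NamedQueue q : queues) {
            String e;
            while ((e = q.events.poll()) != null) {
                for (Consumer<String> l : q.listeners) {
                    l.accept(e);
                }
            }
        }
    }

    int queueCount() { return queues.size(); }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        NamedQueueBus bus = new NamedQueueBus();
        bus.addToQueue(e -> log.add("shared-1:" + e), "shared");
        bus.addToQueue(e -> log.add("shared-2:" + e), "shared");
        bus.addToQueue(e -> log.add("eventLog:" + e), "eventLog");
        bus.post("jobStart");
        bus.drainAll();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

    Two listeners registered under "shared" end up in the same queue, while "eventLog" gets a queue of its own. In the real class each queue has its own thread, so a slow listener in one queue does not hold back the others.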
    

    Hadoop's AsyncDispatcher in detail

    Hadoop's AsyncDispatcher serves a similar purpose to Spark's AsyncEventQueue, but some implementation details differ.

    public interface Event<TYPE extends Enum<TYPE>> {
      TYPE getType();
      long getTimestamp();
      String toString();
    }
    
    public interface EventHandler<T extends Event> {
      void handle(T event);
    }
    
    public interface Dispatcher {
      // Configuration to make sure dispatcher crashes but doesn't do system-exit in
      // case of errors. By default, it should be false, so that tests are not
      // affected. For all daemons it should be explicitly set to true so that
      // daemons can crash instead of hanging around.
      public static final String DISPATCHER_EXIT_ON_ERROR_KEY =
          "yarn.dispatcher.exit-on-error";
    
      public static final boolean DEFAULT_DISPATCHER_EXIT_ON_ERROR = false;
    
      EventHandler getEventHandler();
    
      void register(Class<? extends Enum> eventType, EventHandler handler);
    }
    

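    EventHandler.handle(Event) is the complete method signature: EventHandler is the abstract event-handler interface, handle() is its concrete handling method, and Event is the abstract event.

    So how does one EventHandler handle several kinds of Event? By checking instanceof or a type field, a single handle method can process multiple kinds of Event. Hadoop introduces an Enum-valued type for this: TYPE extends Enum<TYPE>.

    The Dispatcher interface binds an event type to an EventHandler. Note that eventType here is a Class, not an Enum constant. For example, JobEventHandler can be bound directly to the JobEventType class. Binding individual Enum constants instead would require repeated registrations, such as JobEventType.JOB_INIT to JobEventHandler, then JobEventType.JOB_START to JobEventHandler, and so on.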
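
    A minimal Java sketch of registering a handler per Enum class and looking it up through getDeclaringClass() is shown here. EnumClassDispatcher, SimpleEvent and the two enums are hypothetical names made up for this illustration; they only mimic the shape of the Hadoop interfaces above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified analogue of Dispatcher.register/dispatch (not Hadoop code).
final class EnumClassDispatcher {
    enum JobEventType { JOB_INIT, JOB_START }
    enum TaskEventType { TASK_LAUNCH }

    interface Event<T extends Enum<T>> {
        T getType();
    }

    interface EventHandler {
        void handle(Event<?> event);
    }

    static final class SimpleEvent<T extends Enum<T>> implements Event<T> {
        private final T type;
        SimpleEvent(T type) { this.type = type; }
        @Override public T getType() { return type; }
    }

    // Keyed by the Enum class, so one registration covers all constants of that enum.
    private final Map<Class<?>, EventHandler> handlers = new HashMap<>();

    void register(Class<? extends Enum<?>> eventType, EventHandler handler) {
        handlers.put(eventType, handler);
    }

    void dispatch(Event<?> event) {
        // getDeclaringClass() returns the enum type even for constants that have
        // their own class body, where getClass() would return an anonymous subclass.
        Class<?> type = event.getType().getDeclaringClass();
        EventHandler handler = handlers.get(type);
        if (handler == null) {
            throw new IllegalStateException("No handler registered for " + type);
        }
        handler.handle(event);
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        EnumClassDispatcher dispatcher = new EnumClassDispatcher();
        dispatcher.register(JobEventType.class, e -> log.add("job:" + e.getType()));
        dispatcher.register(TaskEventType.class, e -> log.add("task:" + e.getType()));
        dispatcher.dispatch(new SimpleEvent<>(JobEventType.JOB_INIT));
        dispatcher.dispatch(new SimpleEvent<>(JobEventType.JOB_START));
        dispatcher.dispatch(new SimpleEvent<>(TaskEventType.TASK_LAUNCH));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

    Both JOB_INIT and JOB_START reach the same handler after a single register call. The handler itself can then switch on getType() if it needs to treat the constants differently.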

    public abstract class AbstractEvent<TYPE extends Enum<TYPE>> 
        implements Event<TYPE> {
    
      private final TYPE type;
      private final long timestamp;
    
      // use this if you DON'T care about the timestamp
      public AbstractEvent(TYPE type) {
        this.type = type;
        // We're not generating a real timestamp here.  It's too expensive.
        timestamp = -1L;
      }
    
      // use this if you care about the timestamp
      public AbstractEvent(TYPE type, long timestamp) {
        this.type = type;
        this.timestamp = timestamp;
      }
    
      @Override
      public long getTimestamp() {
        return timestamp;
      }
    
      @Override
      public TYPE getType() {
        return type;
      }
    
      @Override
      public String toString() {
        return "EventType: " + getType();
      }
    }
    

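    Since every Event ends up as a concrete instance, an abstract class AbstractEvent is defined and the shared type and timestamp fields are set in its constructors. If AbstractEvent were an ordinary class it could be instantiated, but an instance of AbstractEvent itself has no practical meaning, so it is declared abstract.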

    public class AsyncDispatcher extends AbstractService implements Dispatcher {
    
      private static final Log LOG = LogFactory.getLog(AsyncDispatcher.class);
    
      private final BlockingQueue<Event> eventQueue;
      private volatile boolean stopped = false;
    
      // Configuration flag for enabling/disabling draining dispatcher's events on
      // stop functionality.
      private volatile boolean drainEventsOnStop = false;
    
      // Indicates all the remaining dispatcher's events on stop have been drained
      // and processed.
      private volatile boolean drained = true;
      private Object waitForDrained = new Object();
    
      // For drainEventsOnStop enabled only, block newly coming events into the
      // queue while stopping.
      private volatile boolean blockNewEvents = false;
      private EventHandler handlerInstance = null;
    
      private Thread eventHandlingThread;
      protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
      private boolean exitOnDispatchException;
    
      // LinkedBlockingQueue with no capacity limit
      public AsyncDispatcher() {
        this(new LinkedBlockingQueue<Event>());
      }
    
      public AsyncDispatcher(BlockingQueue<Event> eventQueue) {
        super("Dispatcher");
        this.eventQueue = eventQueue;
        this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>();
      }
    
      Runnable createThread() {
        return new Runnable() {
          @Override
          public void run() {
            while (!stopped && !Thread.currentThread().isInterrupted()) {
              drained = eventQueue.isEmpty();
              // blockNewEvents is only set when dispatcher is draining to stop,
              // adding this check is to avoid the overhead of acquiring the lock
              // and calling notify every time in the normal run of the loop.
              if (blockNewEvents) {
                synchronized (waitForDrained) {
                  if (drained) {
                    waitForDrained.notify();
                  }
                }
              }
              Event event;
              try {
                event = eventQueue.take();
              } catch(InterruptedException ie) {
                if (!stopped) {
                  LOG.warn("AsyncDispatcher thread interrupted", ie);
                }
                return;
              }
              if (event != null) {
                dispatch(event);
              }
            }
          }
        };
      }
    
      @Override
      protected void serviceInit(Configuration conf) throws Exception {
        this.exitOnDispatchException =
            conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
              Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
        super.serviceInit(conf);
      }
    
      @Override
      protected void serviceStart() throws Exception {
        //start all the components
        super.serviceStart();
        eventHandlingThread = new Thread(createThread());
        eventHandlingThread.setName("AsyncDispatcher event handler");
        eventHandlingThread.start();
      }
    
      public void setDrainEventsOnStop() {
        drainEventsOnStop = true;
      }
    
      @Override
      protected void serviceStop() throws Exception {
        if (drainEventsOnStop) {
          blockNewEvents = true;
          LOG.info("AsyncDispatcher is draining to stop, igonring any new events.");
          synchronized (waitForDrained) {
            while (!drained && eventHandlingThread != null
                && eventHandlingThread.isAlive()) {
              waitForDrained.wait(1000);
              LOG.info("Waiting for AsyncDispatcher to drain. Thread state is :" +
                  eventHandlingThread.getState());
            }
          }
        }
        stopped = true;
        if (eventHandlingThread != null) {
          eventHandlingThread.interrupt();
          try {
            eventHandlingThread.join();
          } catch (InterruptedException ie) {
            LOG.warn("Interrupted Exception while stopping", ie);
          }
        }
    
        // stop all the components
        super.serviceStop();
      }
    
      @SuppressWarnings("unchecked")
      protected void dispatch(Event event) {
        //all events go thru this loop
        if (LOG.isDebugEnabled()) {
          LOG.debug("Dispatching the event " + event.getClass().getName() + "."
              + event.toString());
        }
    
        Class<? extends Enum> type = event.getType().getDeclaringClass();
    
        try{
          EventHandler handler = eventDispatchers.get(type);
          if(handler != null) {
            handler.handle(event);
          } else {
            throw new Exception("No handler for registered for " + type);
          }
        } catch (Throwable t) {
          //TODO Maybe log the state of the queue
          LOG.fatal("Error in dispatcher thread", t);
          // If serviceStop is called, we should exit this thread gracefully.
          if (exitOnDispatchException
              && (ShutdownHookManager.get().isShutdownInProgress()) == false
              && stopped == false) {
            Thread shutDownThread = new Thread(createShutDownThread());
            shutDownThread.setName("AsyncDispatcher ShutDown handler");
            shutDownThread.start();
          }
        }
      }
    
      @SuppressWarnings("unchecked")
      @Override
      public void register(Class<? extends Enum> eventType,
          EventHandler handler) {
        /* check to see if we have a listener registered */
        EventHandler<Event> registeredHandler = (EventHandler<Event>)
        eventDispatchers.get(eventType);
        LOG.info("Registering " + eventType + " for " + handler.getClass());
        if (registeredHandler == null) {
          eventDispatchers.put(eventType, handler);
        } else if (!(registeredHandler instanceof MultiListenerHandler)){
          /* for multiple listeners of an event add the multiple listener handler */
          MultiListenerHandler multiHandler = new MultiListenerHandler();
          multiHandler.addHandler(registeredHandler);
          multiHandler.addHandler(handler);
          eventDispatchers.put(eventType, multiHandler);
        } else {
          /* already a multilistener, just add to it */
          MultiListenerHandler multiHandler
          = (MultiListenerHandler) registeredHandler;
          multiHandler.addHandler(handler);
        }
      }
    
      @Override
      public EventHandler getEventHandler() {
        if (handlerInstance == null) {
          handlerInstance = new GenericEventHandler();
        }
        return handlerInstance;
      }
    
      class GenericEventHandler implements EventHandler<Event> {
        public void handle(Event event) {
          if (blockNewEvents) {
            return;
          }
          drained = false;
    
          /* all this method does is enqueue all the events onto the queue */
          int qSize = eventQueue.size();
          if (qSize !=0 && qSize %1000 == 0) {
            LOG.info("Size of event-queue is " + qSize);
          }
          int remCapacity = eventQueue.remainingCapacity();
          if (remCapacity < 1000) {
            LOG.warn("Very low remaining capacity in the event-queue: "
                + remCapacity);
          }
          try {
            eventQueue.put(event);
          } catch (InterruptedException e) {
            if (!stopped) {
              LOG.warn("AsyncDispatcher thread interrupted", e);
            }
            // Need to reset drained flag to true if event queue is empty,
            // otherwise dispatcher will hang on stop.
            drained = eventQueue.isEmpty();
            throw new YarnRuntimeException(e);
          }
        };
      }
    
      /**
       * Multiplexing an event. Sending it to different handlers that
       * are interested in the event.
       * @param <T> the type of event these multiple handlers are interested in.
       */
       // One Event mapped to multiple EventHandlers
      static class MultiListenerHandler implements EventHandler<Event> {
        List<EventHandler<Event>> listofHandlers;
    
        public MultiListenerHandler() {
          listofHandlers = new ArrayList<EventHandler<Event>>();
        }
    
        @Override
        public void handle(Event event) {
          for (EventHandler<Event> handler: listofHandlers) {
            handler.handle(event);
          }
        }
    
        void addHandler(EventHandler<Event> handler) {
          listofHandlers.add(handler);
        }
      }
    
      Runnable createShutDownThread() {
        return new Runnable() {
          @Override
          public void run() {
            LOG.info("Exiting, bbye..");
            System.exit(-1);
          }
        };
      }
    
      @VisibleForTesting
      protected boolean isEventThreadWaiting() {
        return eventHandlingThread.getState() == Thread.State.WAITING;
      }
    
      @VisibleForTesting
      protected boolean isDrained() {
        return this.drained;
      }
    }
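
    The register logic above wraps handlers in a MultiListenerHandler as soon as a second handler is registered for the same event type. A stripped-down Java sketch of that composite pattern follows. MultiHandlerDemo and its members are hypothetical names, and String keys take the place of Enum classes to keep it short. Like the original, which uses a plain HashMap, it is not safe for concurrent registration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the multi-handler registration pattern (not Hadoop code).
final class MultiHandlerDemo {
    interface Handler {
        void handle(String event);
    }

    // Composite handler: forwards one event to every wrapped handler, in registration order.
    static final class MultiHandler implements Handler {
        private final List<Handler> delegates = new ArrayList<>();

        void add(Handler handler) {
            delegates.add(handler);
        }

        @Override
        public void handle(String event) {
            for (Handler handler : delegates) {
                handler.handle(event);
            }
        }
    }

    private final Map<String, Handler> registry = new HashMap<>();

    void register(String eventType, Handler handler) {
        Handler existing = registry.get(eventType);
        if (existing == null) {
            // First handler for this type: store it directly.
            registry.put(eventType, handler);
        } else if (existing instanceof MultiHandler) {
            // Already a composite: just append.
            ((MultiHandler) existing).add(handler);
        } else {
            // Second handler: wrap the old and the new one in a composite.
            MultiHandler multi = new MultiHandler();
            multi.add(existing);
            multi.add(handler);
            registry.put(eventType, multi);
        }
    }

    void dispatch(String eventType, String event) {
        Handler handler = registry.get(eventType);
        if (handler == null) {
            throw new IllegalStateException("No handler registered for " + eventType);
        }
        handler.handle(event);
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        MultiHandlerDemo dispatcher = new MultiHandlerDemo();
        dispatcher.register("APP", e -> log.add("h1:" + e));
        dispatcher.register("APP", e -> log.add("h2:" + e));
        dispatcher.register("APP", e -> log.add("h3:" + e));
        dispatcher.dispatch("APP", "start");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

    The map always holds exactly one Handler per type, so dispatch stays a single lookup no matter how many handlers were registered.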
    

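    AsyncDispatcher vs. AsyncEventQueue

    Similarities and differences

    Similarities

    • Both keep their Events in a LinkedBlockingQueue
    • Both are started and stopped through start and stop methods
    • Both run a separate thread that loops, consuming the queue and dispatching events
    • Both expose a method for producing events: for AsyncDispatcher it is handle of the inner class GenericEventHandler; for AsyncEventQueue it is post

    Differences

    • AsyncDispatcher offers a register method that binds an Event to an EventHandler, and the EventType decides which handler is invoked; AsyncEventQueue relies on doPostEvent to map different Events to different methods of the Listener interface (SparkListenerBus.doPostEvent)

    When AsyncDispatcher consumes an event in dispatch, it looks up the registered EventHandler in a HashMap by the Event's type and then calls its handle method. When AsyncEventQueue consumes an event in dispatch, it can only iterate over the listeners and call doPostEvent on each, which pattern-matches on the event and invokes the corresponding listener callback.

    In Hadoop the event is Event, the handler is EventHandler, and the method is handle(). Spark abstracts one step further with ListenerBus[L, E]: SparkListenerInterface, StreamingListener and ExternalCatalogEventListener are the traits used for the type parameter L; SparkListenerEvent, StreamingListenerEvent and ExternalCatalogEvent are the traits used for E.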

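    Usage scenarios

    AsyncDispatcher
    Classes such as ResourceManager, NodeManager and MRAppMaster call createDispatcher() in their serviceInit() method to create an AsyncDispatcher object. Each of these classes handles a whole series of Enum event classes, not just one.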

    rmDispatcher.register(NodesListManagerEventType.class, nodesListManager);
    rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);
    rmDispatcher.register(RMAppManagerEventType.class, rmAppManager);
    

    AsyncDispatcher does not act as a global event dispatcher. Instead, each major class has its own AsyncDispatcher, which shortens event response time.

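    AsyncEventQueue
    LiveListenerBus is a single manager for all AsyncEventQueues. Compared with Hadoop, where each component runs its own dispatcher, this is the cleaner architecture. SparkContext creates the LiveListenerBus during initialization and passes the reference to JobScheduler, DAGScheduler and others, so LiveListenerBus must be thread-safe.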

    Guava's EventBus

    EventBus publish/subscribe source code analysis

    EventBus source code walkthrough

    Android EventBus source code analysis

    Article link: https://www.haomeiwen.com/subject/xgnbqftx.html