Event Bus: ListenerBus

Author: 炮灰向前冲啦 | Published: 2018-03-20 18:38

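Many Spark components communicate through either RPC or an event-message mechanism. RPC handles remote communication, while event messages are a relatively efficient way to communicate within a local process.

Defining ListenerBus

Spark defines a trait, ListenerBus, which receives events and delivers them to the corresponding event listeners.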

private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {

  private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Option[Timer])]

  // Marked `private[spark]` for access in tests.
  private[spark] def listeners = listenersPlusTimers.asScala.map(_._1).asJava

  /**
   * Returns a CodaHale metrics Timer for measuring the listener's event processing time.
   * This method is intended to be overridden by subclasses.
   */
  protected def getTimer(listener: L): Option[Timer] = None

  /**
   * Add a listener to listen events. This method is thread-safe and can be called in any thread.
   */
  final def addListener(listener: L): Unit = {
    listenersPlusTimers.add((listener, getTimer(listener)))
  }

  /**
   * Remove a listener and it won't receive any events. This method is thread-safe and can be called
   * in any thread.
   */
  final def removeListener(listener: L): Unit = {
    listenersPlusTimers.asScala.find(_._1 eq listener).foreach { listenerAndTimer =>
      listenersPlusTimers.remove(listenerAndTimer)
    }
  }

  /**
   * Post the event to all registered listeners. The `postToAll` caller should guarantee calling
   * `postToAll` in the same thread for all events.
   */
  def postToAll(event: E): Unit = {
    // JavaConverters can create a JIterableWrapper if we use asScala.
    // However, this method will be called frequently. To avoid the wrapper cost, here we use
    // Java Iterator directly.
    val iter = listenersPlusTimers.iterator
    while (iter.hasNext) {
      val listenerAndMaybeTimer = iter.next()
      val listener = listenerAndMaybeTimer._1
      val maybeTimer = listenerAndMaybeTimer._2
      val maybeTimerContext = if (maybeTimer.isDefined) {
        maybeTimer.get.time()
      } else {
        null
      }
      try {
        doPostEvent(listener, event)
      } catch {
        case NonFatal(e) =>
          logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
      } finally {
        if (maybeTimerContext != null) {
          maybeTimerContext.stop()
        }
      }
    }
  }

  /**
   * Post an event to the specified listener. `onPostEvent` is guaranteed to be called in the same
   * thread for all listeners.
   */
  protected def doPostEvent(listener: L, event: E): Unit

  private[spark] def findListenersByClass[T <: L : ClassTag](): Seq[T] = {
    val c = implicitly[ClassTag[T]].runtimeClass
    listeners.asScala.filter(_.getClass == c).map(_.asInstanceOf[T]).toSeq
  }
}

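ListenerBus is a generic trait with type parameters [L <: AnyRef, E], where L is the listener type and E is the event type. Its main job is to deliver events to the corresponding listeners.

  • listenersPlusTimers: holds every registered listener together with its Timer. The data structure is a thread-safe CopyOnWriteArrayList whose elements are pairs (L, Option[Timer]). L is the listener, and Timer is a Codahale metrics timer that records how often and how long the listener processes events.
  • getTimer: returns the Timer used to time a listener. It is protected and returns None by default; subclasses override it.
  • addListener: adds a listener to listenersPlusTimers. It is final and thread-safe.
  • removeListener: removes a listener from listenersPlusTimers, matching by reference (eq). It is also final and thread-safe.
  • postToAll: posts an event to every listener. It is not thread-safe, so the caller must invoke it from a single thread. It uses the Java iterator directly to avoid the wrapper cost of asScala, times each call when a Timer is present, and catches and logs NonFatal exceptions so that one failing listener does not block the rest.
  • doPostEvent: posts an event to one specific listener. It is abstract; subclasses provide the implementation.
  • findListenersByClass: returns the listeners whose runtime class is exactly T. implicitly[ClassTag[T]].runtimeClass recovers the runtime class of the type parameter T, and asInstanceOf casts each match to T.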
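
The responsibilities listed above can be sketched in plain Java. This is only an illustrative analogue, not Spark code: SimpleListenerBus, StringListener, StringBus, and ListenerBusDemo are hypothetical names, the Timer bookkeeping is left out, and catching Exception is a rough stand-in for Scala's NonFatal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical Java analogue of the ListenerBus trait (names are illustrative).
abstract class SimpleListenerBus<L, E> {
    // Copy-on-write list: add/remove are safe from any thread, and iteration
    // works on a snapshot, so postToAll never sees a concurrent modification.
    private final CopyOnWriteArrayList<L> listeners = new CopyOnWriteArrayList<>();

    public final void addListener(L listener) {
        listeners.add(listener);
    }

    public final void removeListener(L listener) {
        // Match by reference identity, like `eq` in the Scala code.
        // Unlike the Scala version, this removes every entry that is the same object.
        listeners.removeIf(l -> l == listener);
    }

    public void postToAll(E event) {
        for (L listener : listeners) {
            try {
                doPostEvent(listener, event);
            } catch (Exception e) {
                // One failing listener must not stop delivery to the others.
                System.err.println("Listener " + listener.getClass().getName() + " threw " + e);
            }
        }
    }

    // Subclasses decide how an event maps onto a listener call.
    protected abstract void doPostEvent(L listener, E event);
}

interface StringListener {
    void onMessage(String message);
}

class StringBus extends SimpleListenerBus<StringListener, String> {
    @Override
    protected void doPostEvent(StringListener listener, String event) {
        listener.onMessage(event);
    }
}

class ListenerBusDemo {
    static List<String> run() {
        List<String> received = new ArrayList<>();
        StringBus bus = new StringBus();
        StringListener failing = m -> { throw new IllegalStateException("boom"); };
        StringListener recording = received::add;
        bus.addListener(failing);
        bus.addListener(recording);
        bus.postToAll("first");        // failing throws, recording still gets it
        bus.removeListener(recording);
        bus.postToAll("second");       // recording no longer registered
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this sketch the recording listener still receives "first" even though the listener registered before it throws, which is the isolation that the try/catch inside postToAll is meant to provide.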

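ListenerBus hierarchy

[Figure: spark-listenerbus.png]
  • SparkListenerBus: a trait. Its doPostEvent() delivers SparkListenerEvent events to SparkListenerInterface listeners. SparkListenerInterface and SparkListenerEvent are traits as well.
  • AsyncEventQueue: implements start, stop, dispatch, and post for events. Events are held in a LinkedBlockingQueue.
  • ReplayListenerBus: replays events from a serialized event stream.
  • ExternalCatalog: an abstract class. It abstracts operations on databases, tables, and functions, and calls postToAll to publish a Pre event before and an After event after each operation.
  • HiveExternalCatalog: the concrete implementation that operates on the Hive MetaStore.
  • InMemoryCatalog: stores and manipulates database and table metadata in HashMaps.
  • StreamingListenerBus: listens for events from the various stages of Spark Streaming.
  • StreamingQueryListenerBus: carries Structured Streaming query events.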

SparkListenerBus in detail

private[spark] trait SparkListenerBus
  extends ListenerBus[SparkListenerInterface, SparkListenerEvent] {
  protected override def doPostEvent(
      listener: SparkListenerInterface,
      event: SparkListenerEvent): Unit = {
    event match {
      case stageSubmitted: SparkListenerStageSubmitted =>
        listener.onStageSubmitted(stageSubmitted)
      case stageCompleted: SparkListenerStageCompleted =>
        listener.onStageCompleted(stageCompleted)
      ...
      case _ => listener.onOtherEvent(event)
    }
  }
}

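SparkListenerBus extends ListenerBus and implements doPostEvent(). SparkListenerInterface is the trait that abstracts listeners, and SparkListenerEvent is the trait that abstracts events. SparkListener is an abstract class that serves as a default adapter for SparkListenerInterface, providing empty implementations of its methods.

SparkListenerBus thus binds each concrete event type to the matching listener callback.

AsyncEventQueue in detail

AsyncEventQueue extends SparkListenerBus (and therefore ListenerBus). It inherits the CopyOnWriteArrayList of listeners and adds a LinkedBlockingQueue of events. The listener type is SparkListenerInterface and the event type is SparkListenerEvent.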
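
The same binding can be approximated in Java with instanceof checks in place of Scala pattern matching. The sketch below is an assumption-laden illustration: JobEvent, StageSubmitted, StageCompleted, CustomEvent, JobListener, JobListenerAdapter, and EventRouter are hypothetical names that only loosely mirror SparkListenerEvent, SparkListenerInterface, and SparkListener.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical event hierarchy (not Spark classes).
interface JobEvent {}

final class StageSubmitted implements JobEvent {
    final int stageId;
    StageSubmitted(int stageId) { this.stageId = stageId; }
}

final class StageCompleted implements JobEvent {
    final int stageId;
    StageCompleted(int stageId) { this.stageId = stageId; }
}

final class CustomEvent implements JobEvent {}

// Hypothetical listener interface with one callback per event type.
interface JobListener {
    void onStageSubmitted(StageSubmitted event);
    void onStageCompleted(StageCompleted event);
    void onOtherEvent(JobEvent event);
}

// Default adapter with empty implementations, in the spirit of SparkListener.
abstract class JobListenerAdapter implements JobListener {
    @Override public void onStageSubmitted(StageSubmitted event) {}
    @Override public void onStageCompleted(StageCompleted event) {}
    @Override public void onOtherEvent(JobEvent event) {}
}

final class EventRouter {
    // Routes each concrete event type to the matching listener callback;
    // anything unrecognized falls through to onOtherEvent.
    static void doPostEvent(JobListener listener, JobEvent event) {
        if (event instanceof StageSubmitted) {
            listener.onStageSubmitted((StageSubmitted) event);
        } else if (event instanceof StageCompleted) {
            listener.onStageCompleted((StageCompleted) event);
        } else {
            listener.onOtherEvent(event);
        }
    }
}

class RoutingDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        JobListener listener = new JobListenerAdapter() {
            @Override public void onStageCompleted(StageCompleted event) {
                log.add("completed:" + event.stageId);
            }
            @Override public void onOtherEvent(JobEvent event) {
                log.add("other");
            }
        };
        EventRouter.doPostEvent(listener, new StageSubmitted(1)); // adapter ignores it
        EventRouter.doPostEvent(listener, new StageCompleted(1));
        EventRouter.doPostEvent(listener, new CustomEvent());
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because the adapter supplies empty defaults, a listener only overrides the callbacks it cares about, which is the convenience SparkListener offers over implementing SparkListenerInterface directly.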

private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveListenerBusMetrics)
  extends SparkListenerBus
  with Logging {

  import AsyncEventQueue._

  // Cap the capacity of the queue so we get an explicit error (rather than an OOM exception) if
  // it's perpetually being added to more quickly than it's being drained.
  // Bounded blocking queue that holds events; the capacity comes from the configuration (10000 by default)
  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](
    conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))

  // Keep the event count separately, so that waitUntilEmpty() can be implemented properly;
  // this allows that method to return only when the events in the queue have been fully
  // processed (instead of just dequeued).
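  // Number of events not yet fully processed. Unlike eventQueue.size(), it still counts an event that has been dequeued but is still being dispatched.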
  private val eventCount = new AtomicLong()

  /** A counter for dropped events. It will be reset every time we log it. */
  // Number of dropped events. Reset to zero when logged: droppedEventsCounter.compareAndSet(droppedCount, 0)
  private val droppedEventsCounter = new AtomicLong(0L)

  /** When `droppedEventsCounter` was logged last time in milliseconds. */
  @volatile private var lastReportTimestamp = 0L

  private val logDroppedEvent = new AtomicBoolean(false)

  private var sc: SparkContext = null

  private val started = new AtomicBoolean(false)
  private val stopped = new AtomicBoolean(false)

  private val droppedEvents = metrics.metricRegistry.counter(s"queue.$name.numDroppedEvents")
  private val processingTime = metrics.metricRegistry.timer(s"queue.$name.listenerProcessingTime")

  // Remove the queue size gauge first, in case it was created by a previous incarnation of
  // this queue that was removed from the listener bus.
  metrics.metricRegistry.remove(s"queue.$name.size")
  metrics.metricRegistry.register(s"queue.$name.size", new Gauge[Int] {
    override def getValue: Int = eventQueue.size()
  })

  // Create a daemon dispatchThread that dispatches events
  private val dispatchThread = new Thread(s"spark-listener-group-$name") {
    setDaemon(true)
    override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
      dispatch()
    }
  }

  // Dispatch loop: take an event, stop if it is POISON_PILL, otherwise deliver it to the listeners via postToAll
  private def dispatch(): Unit = LiveListenerBus.withinListenerThread.withValue(true) {
    try {
      var next: SparkListenerEvent = eventQueue.take()
      while (next != POISON_PILL) {
        val ctx = processingTime.time()
        try {
          super.postToAll(next)
        } finally {
          ctx.stop()
        }
        eventCount.decrementAndGet()
        // take() blocks until an event is available, so next is never null and the loop runs until POISON_PILL arrives
        next = eventQueue.take()
      }
      eventCount.decrementAndGet()
    } catch {
      case ie: InterruptedException =>
        logInfo(s"Stopping listener queue $name.", ie)
    }
  }

  override protected def getTimer(listener: SparkListenerInterface): Option[Timer] = {
    metrics.getTimerForListenerClass(listener.getClass.asSubclass(classOf[SparkListenerInterface]))
  }

  /**
   * Start an asynchronous thread to dispatch events to the underlying listeners.
   *
   * @param sc Used to stop the SparkContext in case the async dispatcher fails.
   */
  // Start the asynchronous dispatch thread
  private[scheduler] def start(sc: SparkContext): Unit = {
    if (started.compareAndSet(false, true)) {
      this.sc = sc
      dispatchThread.start()
    } else {
      throw new IllegalStateException(s"$name already started!")
    }
  }

  /**
   * Stop the listener bus. It will wait until the queued events have been processed, but new
   * events will be dropped.
   */
  // Stop dispatching: set the stopped flag, put POISON_PILL on eventQueue, then wait for the thread to finish with dispatchThread.join(). POISON_PILL gives a graceful shutdown.
  private[scheduler] def stop(): Unit = {
    if (!started.get()) {
      throw new IllegalStateException(s"Attempted to stop $name that has not yet started!")
    }
    if (stopped.compareAndSet(false, true)) {
      eventCount.incrementAndGet()
      eventQueue.put(POISON_PILL)
    }
    dispatchThread.join()
  }

  // Accept an event
  def post(event: SparkListenerEvent): Unit = {
    // Return immediately once stopped
    if (stopped.get()) {
      return
    }

    eventCount.incrementAndGet()
    // Offer the event to eventQueue: return on success, otherwise drop the event
    if (eventQueue.offer(event)) {
      return
    }

    eventCount.decrementAndGet()
    droppedEvents.inc()
    // The queue is full: drop the event and increment droppedEventsCounter
    droppedEventsCounter.incrementAndGet()
    if (logDroppedEvent.compareAndSet(false, true)) {
      // Only log the following message once to avoid duplicated annoying logs.
      logError(s"Dropping event from queue $name. " +
        "This likely means one of the listeners is too slow and cannot keep up with " +
        "the rate at which tasks are being started by the scheduler.")
    }
    logTrace(s"Dropping event $event")

    val droppedCount = droppedEventsCounter.get
    if (droppedCount > 0) {
      // Don't log too frequently
      // Log at most once per minute
      if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
        // There may be multiple threads trying to decrease droppedEventsCounter.
        // Use "compareAndSet" to make sure only one thread can win.
        // And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
        // then that thread will update it.
        // compareAndSet keeps this thread-safe. Double-checked locking on lastReportTimestamp would also work, but CAS is lighter weight.
        if (droppedEventsCounter.compareAndSet(droppedCount, 0)) {
          val prevLastReportTimestamp = lastReportTimestamp
          lastReportTimestamp = System.currentTimeMillis()
          val previous = new java.util.Date(prevLastReportTimestamp)
          logWarning(s"Dropped $droppedCount events from $name since $previous.")
        }
      }
    }
  }
}

// A "poison pill" event that marks the end of the events in the queue
private object AsyncEventQueue {
  val POISON_PILL = new SparkListenerEvent() { }
}
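
The two ideas that carry most of AsyncEventQueue, a bounded queue that drops events instead of blocking the producer and a poison pill that ends the dispatch loop, can be reduced to a small Java sketch. PoisonPillQueue and PoisonPillDemo are hypothetical names, events are plain strings, and metrics and rate-limited logging are omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified analogue of AsyncEventQueue for String events.
class PoisonPillQueue {
    // Unique sentinel compared by reference, so it cannot collide with a real event.
    private static final String POISON_PILL = new String("POISON_PILL");

    private final LinkedBlockingQueue<String> queue;
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private final AtomicLong dropped = new AtomicLong();
    private final List<String> processed = new ArrayList<>();
    private final Thread dispatchThread;

    PoisonPillQueue(int capacity) {
        queue = new LinkedBlockingQueue<>(capacity);
        dispatchThread = new Thread(this::dispatch, "poison-pill-dispatcher");
        dispatchThread.setDaemon(true);
    }

    void start() {
        dispatchThread.start();
    }

    private void dispatch() {
        try {
            String next = queue.take();          // blocks until an event arrives
            while (next != POISON_PILL) {
                processed.add(next);             // stand-in for postToAll(next)
                next = queue.take();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Non-blocking: returns false when the event is ignored or dropped.
    boolean post(String event) {
        if (stopped.get()) {
            return false;                        // ignored after stop()
        }
        if (queue.offer(event)) {
            return true;
        }
        dropped.incrementAndGet();               // queue full: drop and count
        return false;
    }

    // Lets queued events finish, then terminates the dispatch thread.
    void stop() {
        if (stopped.compareAndSet(false, true)) {
            try {
                queue.put(POISON_PILL);          // blocking put, waits for free space
                dispatchThread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    long droppedCount() {
        return dropped.get();
    }

    // Read only after stop(): join() makes the dispatcher's writes visible.
    List<String> processed() {
        return processed;
    }
}

class PoisonPillDemo {
    static String run() {
        PoisonPillQueue q = new PoisonPillQueue(2);
        q.post("a");
        q.post("b");
        boolean accepted = q.post("c");          // capacity is 2, so this one is dropped
        q.start();
        q.stop();
        return q.processed() + " dropped=" + q.droppedCount() + " accepted=" + accepted;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Posting before start() makes the overflow deterministic in the demo. The poison pill is enqueued behind the real events, so everything accepted before stop() is still processed before the thread exits.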

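LiveListenerBus in detail

LiveListenerBus manages the AsyncEventQueue instances. It uses four named queues (shared, appStatus, executorManagement, and eventLog), created on demand by addToQueue and kept in a CopyOnWriteArrayList. Splitting listeners across several AsyncEventQueues means a slow listener in one queue does not delay the others, which shortens event response time.

The post method delivers each event to every queue; the queues differ in which listeners they hold and are told apart by name. On start, every AsyncEventQueue in the list is started by calling AsyncEventQueue.start() in turn, so each queue dispatches events on its own thread.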

private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()

  /** Add a listener to queue shared by all non-internal listeners. */
  def addToSharedQueue(listener: SparkListenerInterface): Unit = {
    addToQueue(listener, SHARED_QUEUE)
  }

  /** Add a listener to the executor management queue. */
  def addToManagementQueue(listener: SparkListenerInterface): Unit = {
    addToQueue(listener, EXECUTOR_MANAGEMENT_QUEUE)
  }

  /** Add a listener to the application status queue. */
  def addToStatusQueue(listener: SparkListenerInterface): Unit = {
    addToQueue(listener, APP_STATUS_QUEUE)
  }

  /** Add a listener to the event log queue. */
  def addToEventLogQueue(listener: SparkListenerInterface): Unit = {
    addToQueue(listener, EVENT_LOG_QUEUE)
  }
  
  private[spark] def addToQueue(
      listener: SparkListenerInterface,
      queue: String): Unit = synchronized {
    if (stopped.get()) {
      throw new IllegalStateException("LiveListenerBus is stopped.")
    }

    queues.asScala.find(_.name == queue) match {
      case Some(queue) =>
        queue.addListener(listener)

      case None =>
        val newQueue = new AsyncEventQueue(queue, conf, metrics)
        newQueue.addListener(listener)
        if (started.get()) {
          newQueue.start(sparkContext)
        }
        queues.add(newQueue)
    }
  }
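
The find-or-create logic of addToQueue and the fan-out in post can be illustrated with a short Java sketch. NamedQueue and QueueManager are hypothetical names; to keep the example deterministic, each queue here delivers synchronously instead of running its own dispatch thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for AsyncEventQueue: a named group of listeners.
// Delivery is synchronous here; the real queue dispatches on its own thread.
class NamedQueue {
    final String name;
    private final CopyOnWriteArrayList<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

    NamedQueue(String name) {
        this.name = name;
    }

    void addListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    void post(String event) {
        for (Consumer<String> listener : listeners) {
            listener.accept(event);
        }
    }
}

// Hypothetical stand-in for LiveListenerBus.
class QueueManager {
    private final CopyOnWriteArrayList<NamedQueue> queues = new CopyOnWriteArrayList<>();

    // synchronized so that two threads cannot both create a queue with the same name.
    synchronized void addToQueue(Consumer<String> listener, String queueName) {
        for (NamedQueue queue : queues) {
            if (queue.name.equals(queueName)) {
                queue.addListener(listener);     // queue already exists: reuse it
                return;
            }
        }
        NamedQueue created = new NamedQueue(queueName);
        created.addListener(listener);
        queues.add(created);
    }

    // Every event goes to every queue; each queue forwards it to its own listeners.
    void post(String event) {
        for (NamedQueue queue : queues) {
            queue.post(event);
        }
    }

    int queueCount() {
        return queues.size();
    }
}

class QueueManagerDemo {
    static String run() {
        List<String> log = new ArrayList<>();
        QueueManager manager = new QueueManager();
        manager.addToQueue(e -> log.add("shared-1:" + e), "shared");
        manager.addToQueue(e -> log.add("eventLog:" + e), "eventLog");
        manager.addToQueue(e -> log.add("shared-2:" + e), "shared");
        manager.post("e1");
        return "queues=" + manager.queueCount() + " log=" + log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Three listeners end up in two queues, and a single post reaches all of them, grouped by queue.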

Hadoop's AsyncDispatcher in detail

Hadoop's AsyncDispatcher plays a role similar to Spark's AsyncEventQueue, though some implementation details differ.

public interface Event<TYPE extends Enum<TYPE>> {
  TYPE getType();
  long getTimestamp();
  String toString();
}

public interface EventHandler<T extends Event> {
  void handle(T event);
}

public interface Dispatcher {
  // Configuration to make sure dispatcher crashes but doesn't do system-exit in
  // case of errors. By default, it should be false, so that tests are not
  // affected. For all daemons it should be explicitly set to true so that
  // daemons can crash instead of hanging around.
  public static final String DISPATCHER_EXIT_ON_ERROR_KEY =
      "yarn.dispatcher.exit-on-error";

  public static final boolean DEFAULT_DISPATCHER_EXIT_ON_ERROR = false;

  EventHandler getEventHandler();

  void register(Class<? extends Enum> eventType, EventHandler handler);
}

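EventHandler.handle(Event) is the complete method signature: EventHandler is the abstract handler interface, handle() is its single handler method, and Event is the abstract event.

How, then, can one EventHandler process several kinds of Event? By checking instanceof or a type field, a single handle method can serve many events. Hadoop takes the type-field route with an Enum-typed type: TYPE extends Enum<TYPE>.

The Dispatcher interface binds an event type to an EventHandler. Note that eventType here is a Class, not an Enum constant. For example, a JobEventHandler is registered once against the JobEventType class. If registration were per constant, it would have to be repeated for each one: JobEventType.JOB_INIT to JobEventHandler, JobEventType.JOB_START to JobEventHandler, and so on.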
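
A minimal Java sketch of class-keyed registration follows. SampleJobEventType, TypedEvent, TypedHandler, and EnumDispatchSketch are hypothetical names, not Hadoop classes. The lookup uses Enum.getDeclaringClass(), which returns the enum class itself even for a constant declared with a body, whereas getClass() would return that constant's anonymous subclass.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical event type. JOB_START has a constant-specific body, so its
// getClass() is an anonymous subclass while getDeclaringClass() is the enum.
enum SampleJobEventType {
    JOB_INIT,
    JOB_START {
        @Override
        public String toString() {
            return "job-start";
        }
    }
}

// Hypothetical, simplified counterparts of Event and EventHandler.
interface TypedEvent {
    Enum<?> getType();
}

interface TypedHandler {
    String handle(TypedEvent event);
}

class EnumDispatchSketch {
    // One handler per enum class, not per enum constant.
    private final Map<Class<?>, TypedHandler> handlers = new HashMap<>();

    void register(Class<? extends Enum<?>> eventType, TypedHandler handler) {
        handlers.put(eventType, handler);
    }

    String dispatch(TypedEvent event) {
        // getDeclaringClass() maps every constant back to its enum class.
        Class<?> type = event.getType().getDeclaringClass();
        TypedHandler handler = handlers.get(type);
        if (handler == null) {
            throw new IllegalStateException("No handler registered for " + type);
        }
        return handler.handle(event);
    }

    static String run() {
        EnumDispatchSketch dispatcher = new EnumDispatchSketch();
        // A single registration covers JOB_INIT and JOB_START alike.
        dispatcher.register(SampleJobEventType.class,
            event -> "job handler got " + event.getType().name());
        String first = dispatcher.dispatch(() -> SampleJobEventType.JOB_INIT);
        String second = dispatcher.dispatch(() -> SampleJobEventType.JOB_START);
        return first + "; " + second;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Inside the handler, a switch on the constant would then select the per-event behavior, which is the role the type field plays in Hadoop's design.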

public abstract class AbstractEvent<TYPE extends Enum<TYPE>> 
    implements Event<TYPE> {

  private final TYPE type;
  private final long timestamp;

  // use this if you DON'T care about the timestamp
  public AbstractEvent(TYPE type) {
    this.type = type;
    // We're not generating a real timestamp here.  It's too expensive.
    timestamp = -1L;
  }

  // use this if you care about the timestamp
  public AbstractEvent(TYPE type, long timestamp) {
    this.type = type;
    this.timestamp = timestamp;
  }

  @Override
  public long getTimestamp() {
    return timestamp;
  }

  @Override
  public TYPE getType() {
    return type;
  }

  @Override
  public String toString() {
    return "EventType: " + getType();
  }
}

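Every event is ultimately instantiated as a concrete subclass, so the shared fields type and timestamp are pulled into the constructors of an abstract class, AbstractEvent. Were AbstractEvent an ordinary class it could be instantiated directly, but a bare AbstractEvent instance has no practical meaning, so it is declared abstract.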

public class AsyncDispatcher extends AbstractService implements Dispatcher {

  private static final Log LOG = LogFactory.getLog(AsyncDispatcher.class);

  private final BlockingQueue<Event> eventQueue;
  private volatile boolean stopped = false;

  // Configuration flag for enabling/disabling draining dispatcher's events on
  // stop functionality.
  private volatile boolean drainEventsOnStop = false;

  // Indicates all the remaining dispatcher's events on stop have been drained
  // and processed.
  private volatile boolean drained = true;
  private Object waitForDrained = new Object();

  // For drainEventsOnStop enabled only, block newly coming events into the
  // queue while stopping.
  private volatile boolean blockNewEvents = false;
  private EventHandler handlerInstance = null;

  private Thread eventHandlingThread;
  protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
  private boolean exitOnDispatchException;

  // Default constructor: an unbounded LinkedBlockingQueue
  public AsyncDispatcher() {
    this(new LinkedBlockingQueue<Event>());
  }

  public AsyncDispatcher(BlockingQueue<Event> eventQueue) {
    super("Dispatcher");
    this.eventQueue = eventQueue;
    this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>();
  }

  Runnable createThread() {
    return new Runnable() {
      @Override
      public void run() {
        while (!stopped && !Thread.currentThread().isInterrupted()) {
          drained = eventQueue.isEmpty();
          // blockNewEvents is only set when dispatcher is draining to stop,
          // adding this check is to avoid the overhead of acquiring the lock
          // and calling notify every time in the normal run of the loop.
          if (blockNewEvents) {
            synchronized (waitForDrained) {
              if (drained) {
                waitForDrained.notify();
              }
            }
          }
          Event event;
          try {
            event = eventQueue.take();
          } catch(InterruptedException ie) {
            if (!stopped) {
              LOG.warn("AsyncDispatcher thread interrupted", ie);
            }
            return;
          }
          if (event != null) {
            dispatch(event);
          }
        }
      }
    };
  }

  @Override
  protected void serviceInit(Configuration conf) throws Exception {
    this.exitOnDispatchException =
        conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
          Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
    super.serviceInit(conf);
  }

  @Override
  protected void serviceStart() throws Exception {
    //start all the components
    super.serviceStart();
    eventHandlingThread = new Thread(createThread());
    eventHandlingThread.setName("AsyncDispatcher event handler");
    eventHandlingThread.start();
  }

  public void setDrainEventsOnStop() {
    drainEventsOnStop = true;
  }

  @Override
  protected void serviceStop() throws Exception {
    if (drainEventsOnStop) {
      blockNewEvents = true;
      LOG.info("AsyncDispatcher is draining to stop, igonring any new events.");
      synchronized (waitForDrained) {
        while (!drained && eventHandlingThread != null
            && eventHandlingThread.isAlive()) {
          waitForDrained.wait(1000);
          LOG.info("Waiting for AsyncDispatcher to drain. Thread state is :" +
              eventHandlingThread.getState());
        }
      }
    }
    stopped = true;
    if (eventHandlingThread != null) {
      eventHandlingThread.interrupt();
      try {
        eventHandlingThread.join();
      } catch (InterruptedException ie) {
        LOG.warn("Interrupted Exception while stopping", ie);
      }
    }

    // stop all the components
    super.serviceStop();
  }

  @SuppressWarnings("unchecked")
  protected void dispatch(Event event) {
    //all events go thru this loop
    if (LOG.isDebugEnabled()) {
      LOG.debug("Dispatching the event " + event.getClass().getName() + "."
          + event.toString());
    }

    Class<? extends Enum> type = event.getType().getDeclaringClass();

    try{
      EventHandler handler = eventDispatchers.get(type);
      if(handler != null) {
        handler.handle(event);
      } else {
        throw new Exception("No handler for registered for " + type);
      }
    } catch (Throwable t) {
      //TODO Maybe log the state of the queue
      LOG.fatal("Error in dispatcher thread", t);
      // If serviceStop is called, we should exit this thread gracefully.
      if (exitOnDispatchException
          && (ShutdownHookManager.get().isShutdownInProgress()) == false
          && stopped == false) {
        Thread shutDownThread = new Thread(createShutDownThread());
        shutDownThread.setName("AsyncDispatcher ShutDown handler");
        shutDownThread.start();
      }
    }
  }

  @SuppressWarnings("unchecked")
  @Override
  public void register(Class<? extends Enum> eventType,
      EventHandler handler) {
    /* check to see if we have a listener registered */
    EventHandler<Event> registeredHandler = (EventHandler<Event>)
    eventDispatchers.get(eventType);
    LOG.info("Registering " + eventType + " for " + handler.getClass());
    if (registeredHandler == null) {
      eventDispatchers.put(eventType, handler);
    } else if (!(registeredHandler instanceof MultiListenerHandler)){
      /* for multiple listeners of an event add the multiple listener handler */
      MultiListenerHandler multiHandler = new MultiListenerHandler();
      multiHandler.addHandler(registeredHandler);
      multiHandler.addHandler(handler);
      eventDispatchers.put(eventType, multiHandler);
    } else {
      /* already a multilistener, just add to it */
      MultiListenerHandler multiHandler
      = (MultiListenerHandler) registeredHandler;
      multiHandler.addHandler(handler);
    }
  }

  @Override
  public EventHandler getEventHandler() {
    if (handlerInstance == null) {
      handlerInstance = new GenericEventHandler();
    }
    return handlerInstance;
  }

  class GenericEventHandler implements EventHandler<Event> {
    public void handle(Event event) {
      if (blockNewEvents) {
        return;
      }
      drained = false;

      /* all this method does is enqueue all the events onto the queue */
      int qSize = eventQueue.size();
      if (qSize !=0 && qSize %1000 == 0) {
        LOG.info("Size of event-queue is " + qSize);
      }
      int remCapacity = eventQueue.remainingCapacity();
      if (remCapacity < 1000) {
        LOG.warn("Very low remaining capacity in the event-queue: "
            + remCapacity);
      }
      try {
        eventQueue.put(event);
      } catch (InterruptedException e) {
        if (!stopped) {
          LOG.warn("AsyncDispatcher thread interrupted", e);
        }
        // Need to reset drained flag to true if event queue is empty,
        // otherwise dispatcher will hang on stop.
        drained = eventQueue.isEmpty();
        throw new YarnRuntimeException(e);
      }
    };
  }

  /**
   * Multiplexing an event. Sending it to different handlers that
   * are interested in the event.
   * @param <T> the type of event these multiple handlers are interested in.
   */
   // One Event can be handled by several EventHandlers
  static class MultiListenerHandler implements EventHandler<Event> {
    List<EventHandler<Event>> listofHandlers;

    public MultiListenerHandler() {
      listofHandlers = new ArrayList<EventHandler<Event>>();
    }

    @Override
    public void handle(Event event) {
      for (EventHandler<Event> handler: listofHandlers) {
        handler.handle(event);
      }
    }

    void addHandler(EventHandler<Event> handler) {
      listofHandlers.add(handler);
    }
  }

  Runnable createShutDownThread() {
    return new Runnable() {
      @Override
      public void run() {
        LOG.info("Exiting, bbye..");
        System.exit(-1);
      }
    };
  }

  @VisibleForTesting
  protected boolean isEventThreadWaiting() {
    return eventHandlingThread.getState() == Thread.State.WAITING;
  }

  @VisibleForTesting
  protected boolean isDrained() {
    return this.drained;
  }
}
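
The register method above upgrades a single handler to a MultiListenerHandler once a second handler arrives for the same event type. The Java sketch below isolates that composite pattern. MultiHandlerRegistry and its members are hypothetical names, event types are plain strings, and, like the HashMap-based original, it assumes registration happens before events are dispatched concurrently.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch of the single-handler-to-composite upgrade.
class MultiHandlerRegistry {

    // Composite handler: forwards each event to all of its delegates in order.
    static final class MultiHandler implements Consumer<String> {
        private final List<Consumer<String>> delegates = new ArrayList<>();

        void add(Consumer<String> handler) {
            delegates.add(handler);
        }

        @Override
        public void accept(String event) {
            for (Consumer<String> delegate : delegates) {
                delegate.accept(event);
            }
        }
    }

    private final Map<String, Consumer<String>> handlersByType = new HashMap<>();

    void register(String eventType, Consumer<String> handler) {
        Consumer<String> existing = handlersByType.get(eventType);
        if (existing == null) {
            // First handler for this type: store it directly.
            handlersByType.put(eventType, handler);
        } else if (existing instanceof MultiHandler) {
            // Already a composite: just append.
            ((MultiHandler) existing).add(handler);
        } else {
            // Second handler: wrap the old one and the new one in a composite.
            MultiHandler multi = new MultiHandler();
            multi.add(existing);
            multi.add(handler);
            handlersByType.put(eventType, multi);
        }
    }

    void dispatch(String eventType, String payload) {
        Consumer<String> handler = handlersByType.get(eventType);
        if (handler == null) {
            throw new IllegalStateException("No handler registered for " + eventType);
        }
        handler.accept(payload);
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        MultiHandlerRegistry registry = new MultiHandlerRegistry();
        registry.register("job", p -> log.add("h1:" + p));
        registry.register("job", p -> log.add("h2:" + p));
        registry.register("job", p -> log.add("h3:" + p));
        registry.dispatch("job", "x");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The dispatch path never needs to know whether one or many handlers are registered, because the composite implements the same interface as a single handler.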

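AsyncDispatcher vs. AsyncEventQueue

Similarities and differences

Similarities

  • Both keep events in a LinkedBlockingQueue.
  • Both have start and stop methods.
  • Both run a separate thread that loops over the queue and dispatches events.
  • Both expose a method for producing events. In AsyncDispatcher it is handle on the inner class GenericEventHandler; in AsyncEventQueue it is post.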

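Differences

  • AsyncDispatcher provides a register method that binds an event type to an EventHandler, and the handler then branches on the event's type inside handle. AsyncEventQueue instead relies on doPostEvent (SparkListenerBus.doPostEvent) to route each event to a different method on the listener interface.

When AsyncDispatcher consumes an event, it looks up the registered EventHandler in a HashMap by the event's type class and calls handle. When AsyncEventQueue consumes an event, it can only iterate over all listeners and call doPostEvent on each, which pattern-matches the event and invokes the corresponding listener method.

In Hadoop the event is Event, the handler is EventHandler, and the method is handle(). Spark abstracts one step further with ListenerBus[L, E]: SparkListenerInterface, StreamingListener, and ExternalCatalogEventListener are the types bound to L, while SparkListenerEvent, StreamingListenerEvent, and ExternalCatalogEvent are the types bound to E.

Usage scenarios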

AsyncDispatcher
Classes such as ResourceManager, NodeManager, and MRAppMaster call createDispatcher() in serviceInit() to create an AsyncDispatcher. Each of them handles a whole family of Enum event types rather than a single one.

rmDispatcher.register(NodesListManagerEventType.class, nodesListManager);
rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);
rmDispatcher.register(RMAppManagerEventType.class, rmAppManager);

AsyncDispatcher is not a global event dispatcher. Each major class owns its own AsyncDispatcher instead, which shortens event response time.

AsyncEventQueue
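LiveListenerBus is the single manager for all AsyncEventQueue instances, which is arguably a cleaner architecture than Hadoop's one-dispatcher-per-component approach. SparkContext creates the LiveListenerBus during initialization and passes the reference to JobScheduler, DAGScheduler, and others, so LiveListenerBus must be thread-safe.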

Guava's EventBus

EventBus publish/subscribe source code analysis

EventBus source code walkthrough

Android EventBus source code analysis

Original article: https://www.haomeiwen.com/subject/xgnbqftx.html