Guava之EventBus源码

作者: 小猫无痕 | 来源:发表于2019-05-23 21:42 被阅读13次

    最近需要使用事件驱动,打算使用EventBus管理事件的注册和分发。于是仔细阅读了下Guava的EventBus实现,并在此做了些整理。
    EventBus是基于设计模式中的Observer模式的实现。Observer模式是非常常用的设计模式之一,jdk中的EventObject、EventListener、Observable、Observer都是为观察者模式服务的。但随着业务场景复杂度的不断提高,我们希望能在管理事件的同时提供更多的扩展。所以我们通过EventBus来优雅的实现这些。


    Observer模式

    我们先简单回顾下Observer模式:


    观察者模式.png

    定义对象之间的一对多依赖关系,以便当一个对象更改状态时,它的所有依赖关系都会被通知并自动更新。

    Observer Pattern: Define a one-to-many dependency between objects so that when one object changes state, all its dependents are notified and updated automatically.

    一些文中的“发布-订阅(Publish/Subscribe)模式”其实就是Observer模式,他所做的事情和我们将要做的事情一样:丰富Subject的功能。


    EventBus

    首先,我们先来看一下EventBus模块的类:


    EventBus.jpg

    EventBus.class

    EventBus.class:它对应于Subject类,是整个模块的核心,也是功能扩展的中心点。
    首先看下EventBus.class包含的以下几个变量:

      private final String identifier;
      private final Executor executor;
      private final SubscriberExceptionHandler exceptionHandler;
      private final SubscriberRegistry subscribers = new SubscriberRegistry(this);
      private final Dispatcher dispatcher;
    
    • identifier:定义了EventBus的名称,用来区分项目中的多个EventBus,在之后的Exception等日志输出,和线程命名时都会有使用。
    private static Logger logger(SubscriberExceptionContext context) {
          return Logger.getLogger(EventBus.class.getName() + "." + context.getEventBus().identifier());
        }
    
    • executor:java的异步执行的实现类。用来执行订阅者处理Event的方法。值得注意的是在'EventBus'中对Executor变量赋值的构造器是私有的,也就是说我们只能使用它所指定的Executor:'DirectExecutor.class'。但'AsyncEventBus'的Executor是被允许传入的。这也是这两者的区别所在之一。
      public EventBus(String identifier) {
        this(identifier, MoreExecutors.directExecutor(),
            Dispatcher.perThreadDispatchQueue(), LoggingHandler.INSTANCE);
      }
      EventBus(String identifier, Executor executor, Dispatcher dispatcher,
          SubscriberExceptionHandler exceptionHandler) {...}
    
    • exceptionHandler:SubscriberExceptionHandler的实现类,用于处理过程中产生的异常。
    • subscribers:创建了一个SubscriberRegistry用来维护Subscriber与Event的对应关系。
    • dispatcher:Dispatcher的实现类,他是一个Event的分发器,所有Event都会经过dispatcher传递给Subscriber。和executor一样,它也不能被从外部传入,在'EventBus'中默认使用了'PerThreadQueuedDispatcher',在'AsyncEventBus'中默认使用'LegacyAsyncDispatcher'。这是两个类的唯二区别了。

    然后我们看下EventBus的方法,作为一个核心类,它一共只有三个public方法:

    • register:注册event。
    • unregister:取消event注册。
    • post:发布event。
     public void register(Object object) {
        subscribers.register(object);
      }
    public void unregister(Object object) {
        subscribers.unregister(object);
      }
    public void post(Object event) {
      ...
          dispatcher.dispatch(event, eventSubscribers);
      ...
      }
    

    仅有的三个方法也都异常的简单,'register'和'unregister'都调用了SubscriberRegistry类,'post'交给了Dispatcher类。而多线程的控制也通过'executor'交给了Subscriber,异常的处理不在自身管理同样传递给了Subscriber,作为中心的EventBus只做了功能的定义和分配,事件的转发,完美的实现了功能的解耦,做到了职责单一原则。

    AsyncEventBus.class

    AsyncEventBus.class是EventBus.class的异步多线程的子类,上面也有提到过,二者之间只在构造器中有两处区别:

    1. Executor:EventBus默认使用'DirectExecutor.class',他是一个线程执行器,简单的直接执行传入的Runnable。AsyncEventBus正好相反,它的Executor必须是传入的。
     private enum DirectExecutor implements Executor {
        INSTANCE;
        @Override public void execute(Runnable command) {
          command.run();
        }
    }
    
    1. Dispatcher:在'EventBus'中默认使用了'PerThreadQueuedDispatcher',在'AsyncEventBus'中默认使用'LegacyAsyncDispatcher'。前者是单线程同步,后者是多线程同步。两者的具体区别在下面介绍。

    通过上面的描述,两者并不能通过他们类名简单的区别为一个单线程,一个多线程。他们的区别同样可以总结为两点:

    • Subscriber中都是多线程调用方法执行event,区别是'EventBus'只简单的run()了线程,而'AsyncEventBus'能过定义线程池。
    • Dispatcher中都是同步分发,区别是'EventBus'使用了ThreadLocal实现了单线程同步,而'AsyncEventBus'通过ConcurrentLinkedQueue使多线程同步分发。

    Dispatcher.class

    Dispatcher是一个抽象类,它本身是default的,因此无法被外部继承,EventBus也没有可以传入Dispatcher的构造器,所以对于Dispatcher我们是无法正常扩展的。
    Dispatcher中只有一个抽象方法:来实现消息的分发。

    abstract void dispatch(Object event, Iterator<Subscriber> subscribers);
    

    还有三个静态方法来创建它的三个实现类:

    • PerThreadQueuedDispatcher:将收到的Event保存在了ThreadLoacl中,意味着多线程中即使使用了同一个Dispatcher实现收到的event都会分开保存互不影响。
    private final ThreadLocal<Queue<Event>> queue =
            new ThreadLocal<Queue<Event>>() {...};
    private final ThreadLocal<Boolean> dispatching =
            new ThreadLocal<Boolean>() {...};
    @Override
    void dispatch(Object event, Iterator<Subscriber> subscribers) {
    ...
    Queue<Event> queueForThread = queue.get();
    queueForThread.offer(new Event(event, subscribers));
    if (!dispatching.get()) {
            dispatching.set(true);
            try {
              Event nextEvent;
              while ((nextEvent = queueForThread.poll()) != null) {
                while (nextEvent.subscribers.hasNext()) {
                  nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
                }
              }
            } finally {
              dispatching.remove();
              queue.remove();
            }
          }
    }
    

    里面的dispatching用于避免重入事件分派,例如循环发起Event的场景。

    • LegacyAsyncDispatcher:创建了一个ConcurrentLinkedQueue来保存收到的Event。多个线程中使用同一个LegacyAsyncDispatcher实现的话,线程收到的Event会保存在一起,并共同完成所有Event的分发。
    private final ConcurrentLinkedQueue<EventWithSubscriber> queue =
            Queues.newConcurrentLinkedQueue();
     @Override
        void dispatch(Object event, Iterator<Subscriber> subscribers) {
          while (subscribers.hasNext()) {
            queue.add(new EventWithSubscriber(event, subscribers.next()));
          }
          EventWithSubscriber e;
          while ((e = queue.poll()) != null) {
            e.subscriber.dispatchEvent(e.event);
          }
        }
    

    值得注意的是由于分发也是多线程共同完成,这使得它将无法保证Event的顺序性。

    • ImmediateDispatcher:该dispatcher在事件发布时立即将事件分发给订阅者不使用中间队列。

    Subscriber.class

    对应于Observer的抽象类,但它更像是一种封装,Subscriber自身提供了静态创建方法,将真正的Observer实现类和执行Event的方法都与EventBus封装在了一起,通过反射实现了对应于不同Observer的抽象。

    static Subscriber create(EventBus bus, Object listener, Method method) {
        return isDeclaredThreadSafe(method)
            ? new Subscriber(bus, listener, method)
            : new SynchronizedSubscriber(bus, listener, method);
      }
    final void dispatchEvent(final Object event) {
        executor.execute(new Runnable() {
          @Override
          public void run() {
            try {
             method.invoke(target, checkNotNull(event));
            } catch (InvocationTargetException e) {
              bus.handleSubscriberException(e.getCause(), context(event));
            }
          }
        });
      }
    

    SynchronizedSubscriber:在dispatchEvent()方法上加了synchronized同步锁,如果正在的Observer方法是线程不安全的话就需要用到此类。他是通过@AllowConcurrentEvents注解来判断的,这里就不多讲了。

    SubscriberRegistry.class

    维护了Subscriber与Event的对应关系,对EventBus进行了解耦,使EventBus职责单一。

    private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers =
          Maps.newConcurrentMap();
    
    • register/unregister/getSubscribers:这三个都是维护Subscriber与Event的对应关系的基本方法,这里就不多讲了,他们只会在EventBus中被调用。
    • findAllSubscribers:这是一个private方法,他在register/unregister中被调用,之所以单独拿出来,主要是想说明一下,他也是基于Annotation来实现的。在register时,我们只会传入Observer类,但Observer类需要订阅哪个Event,Event到底又需要调用哪个方法,都是在这个方法中能通过对@Subscriber找个Annotation的读取和method的反射。
      private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
        Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
        Class<?> clazz = listener.getClass();
        for (Method method : getAnnotatedMethods(clazz)) {
          Class<?>[] parameterTypes = method.getParameterTypes();
          Class<?> eventType = parameterTypes[0];
          methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
        }
        return methodsInListener;
      }
    

    最后多提一句,@Subscriber标注的那些Method都是事先通过'getAnnotatedMethodsNotCached' 方法获取,保存在了一个LoadingCache中的。由于和EventBus的机制没有太大关系,这里就不展开了。

      private static final LoadingCache<Class<?>, ImmutableList<Method>> subscriberMethodsCache =
          CacheBuilder.newBuilder().weakKeys()
              .build(new CacheLoader<Class<?>, ImmutableList<Method>>() {
                @Override
                public ImmutableList<Method> load(Class<?> concreteClass) throws Exception {
                  return getAnnotatedMethodsNotCached(concreteClass);
                }
              });
    

    总结

    Guava的EventBus以EventBus类为中心,对于Event的发布、订阅者的管理、异常的处理都提供了专门的实现类,流程非常清楚。而且基于Annotation扫描绑定的方式会使代码非常的简洁。但由于这种方式,在EventBus中对于事件类型和事件参数等等并不能提供很好的支撑,而且由于基本所有的类都是default权限的,这使得扩展异常的艰难T~T


    Classfier扩展

    待续...

    相关文章

      网友评论

        本文标题:Guava之EventBus源码

        本文链接:https://www.haomeiwen.com/subject/mgimzqtx.html