最近需要使用事件驱动,打算使用EventBus管理事件的注册和分发。于是仔细阅读了下Guava的EventBus实现,并在此做了些整理。
EventBus是基于设计模式中的Observer模式的实现。Observer模式是非常常用的设计模式之一,jdk中的EventObject、EventListener、Observable、Observer都是为观察者模式服务的。但随着业务场景复杂度的不断提高,我们希望能在管理事件的同时提供更多的扩展。所以我们通过EventBus来优雅的实现这些。
Observer模式
我们先简单回顾下Observer模式:
观察者模式.png
定义对象之间的一对多依赖关系,以便当一个对象更改状态时,它的所有依赖关系都会被通知并自动更新。
Observer Pattern: Define a one-to-many dependency between objects so that when one object changes state, all its dependents are notified and updated automatically.
一些文中的“发布-订阅(Publish/Subscribe)模式”其实就是Observer模式,他所做的事情和我们将要做的事情一样:丰富Subject的功能。
EventBus
首先,我们先来看一下EventBus模块的类:
EventBus.jpg
EventBus.class
EventBus.class:它对应于Subject类,是整个模块的核心,也是功能扩展的中心点。
首先看下EventBus.class包含的以下几个变量:
private final String identifier;
private final Executor executor;
private final SubscriberExceptionHandler exceptionHandler;
private final SubscriberRegistry subscribers = new SubscriberRegistry(this);
private final Dispatcher dispatcher;
- identifier:定义了EventBus的名称,用来区分项目中的多个EventBus,在之后的Exception等日志输出,和线程命名时都会有使用。
private static Logger logger(SubscriberExceptionContext context) {
return Logger.getLogger(EventBus.class.getName() + "." + context.getEventBus().identifier());
}
- executor:java的异步执行的实现类。用来执行订阅者处理Event的方法。值得注意的是在'EventBus'中对Executor变量赋值的构造器是私有的,也就是说我们只能使用它所指定的Executor:'DirectExecutor.class'。但'AsyncEventBus'的Executor是被允许传入的。这也是这两者的区别所在之一。
public EventBus(String identifier) {
this(identifier, MoreExecutors.directExecutor(),
Dispatcher.perThreadDispatchQueue(), LoggingHandler.INSTANCE);
}
EventBus(String identifier, Executor executor, Dispatcher dispatcher,
SubscriberExceptionHandler exceptionHandler) {...}
- exceptionHandler:SubscriberExceptionHandler的实现类,用于处理过程中产生的异常。
- subscribers:创建了一个SubscriberRegistry用来维护Subscriber与Event的对应关系。
- dispatcher:Dispatcher的实现类,他是一个Event的分发器,所有Event都会经过dispatcher传递给Subscriber。和executor一样,它也不能被从外部传入,在'EventBus'中默认使用了'PerThreadQueuedDispatcher',在'AsyncEventBus'中默认使用'LegacyAsyncDispatcher'。这是两个类的唯二区别了。
然后我们看下EventBus的方法,作为一个核心类,它一共只有三个public方法:
- register:注册event。
- unregister:取消event注册。
- post:发布event。
public void register(Object object) {
subscribers.register(object);
}
public void unregister(Object object) {
subscribers.unregister(object);
}
public void post(Object event) {
...
dispatcher.dispatch(event, eventSubscribers);
...
}
仅有的三个方法也都异常的简单,'register'和'unregister'都调用了SubscriberRegistry类,'post'交给了Dispatcher类。而多线程的控制也通过'executor'交给了Subscriber,异常的处理不在自身管理同样传递给了Subscriber,作为中心的EventBus只做了功能的定义和分配,事件的转发,完美的实现了功能的解耦,做到了职责单一原则。
AsyncEventBus.class
AsyncEventBus.class是EventBus.class的异步多线程的子类,上面也有提到过,二者之间只在构造器中有两处区别:
- Executor:EventBus默认使用'DirectExecutor.class',他是一个线程执行器,简单的直接执行传入的Runnable。AsyncEventBus正好相反,它的Executor必须是传入的。
private enum DirectExecutor implements Executor {
INSTANCE;
@Override public void execute(Runnable command) {
command.run();
}
}
- Dispatcher:在'EventBus'中默认使用了'PerThreadQueuedDispatcher',在'AsyncEventBus'中默认使用'LegacyAsyncDispatcher'。前者是单线程同步,后者是多线程同步。两者的具体区别在下面介绍。
通过上面的描述,两者并不能通过他们类名简单的区别为一个单线程,一个多线程。他们的区别同样可以总结为两点:
- Subscriber中都是多线程调用方法执行event,区别是'EventBus'只简单的run()了线程,而'AsyncEventBus'能过定义线程池。
- Dispatcher中都是同步分发,区别是'EventBus'使用了ThreadLocal实现了单线程同步,而'AsyncEventBus'通过ConcurrentLinkedQueue使多线程同步分发。
Dispatcher.class
Dispatcher是一个抽象类,它本身是default的,因此无法被外部继承,EventBus也没有可以传入Dispatcher的构造器,所以对于Dispatcher我们是无法正常扩展的。
Dispatcher中只有一个抽象方法:来实现消息的分发。
abstract void dispatch(Object event, Iterator<Subscriber> subscribers);
还有三个静态方法来创建它的三个实现类:
- PerThreadQueuedDispatcher:将收到的Event保存在了ThreadLoacl中,意味着多线程中即使使用了同一个Dispatcher实现收到的event都会分开保存互不影响。
private final ThreadLocal<Queue<Event>> queue =
new ThreadLocal<Queue<Event>>() {...};
private final ThreadLocal<Boolean> dispatching =
new ThreadLocal<Boolean>() {...};
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
...
Queue<Event> queueForThread = queue.get();
queueForThread.offer(new Event(event, subscribers));
if (!dispatching.get()) {
dispatching.set(true);
try {
Event nextEvent;
while ((nextEvent = queueForThread.poll()) != null) {
while (nextEvent.subscribers.hasNext()) {
nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
}
}
} finally {
dispatching.remove();
queue.remove();
}
}
}
里面的dispatching用于避免重入事件分派,例如循环发起Event的场景。
- LegacyAsyncDispatcher:创建了一个ConcurrentLinkedQueue来保存收到的Event。多个线程中使用同一个LegacyAsyncDispatcher实现的话,线程收到的Event会保存在一起,并共同完成所有Event的分发。
private final ConcurrentLinkedQueue<EventWithSubscriber> queue =
Queues.newConcurrentLinkedQueue();
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
while (subscribers.hasNext()) {
queue.add(new EventWithSubscriber(event, subscribers.next()));
}
EventWithSubscriber e;
while ((e = queue.poll()) != null) {
e.subscriber.dispatchEvent(e.event);
}
}
值得注意的是由于分发也是多线程共同完成,这使得它将无法保证Event的顺序性。
- ImmediateDispatcher:该dispatcher在事件发布时立即将事件分发给订阅者不使用中间队列。
Subscriber.class
对应于Observer的抽象类,但它更像是一种封装,Subscriber自身提供了静态创建方法,将真正的Observer实现类和执行Event的方法都与EventBus封装在了一起,通过反射实现了对应于不同Observer的抽象。
static Subscriber create(EventBus bus, Object listener, Method method) {
return isDeclaredThreadSafe(method)
? new Subscriber(bus, listener, method)
: new SynchronizedSubscriber(bus, listener, method);
}
final void dispatchEvent(final Object event) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
method.invoke(target, checkNotNull(event));
} catch (InvocationTargetException e) {
bus.handleSubscriberException(e.getCause(), context(event));
}
}
});
}
SynchronizedSubscriber:在dispatchEvent()方法上加了synchronized同步锁,如果正在的Observer方法是线程不安全的话就需要用到此类。他是通过@AllowConcurrentEvents注解来判断的,这里就不多讲了。
SubscriberRegistry.class
维护了Subscriber与Event的对应关系,对EventBus进行了解耦,使EventBus职责单一。
private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers =
Maps.newConcurrentMap();
- register/unregister/getSubscribers:这三个都是维护Subscriber与Event的对应关系的基本方法,这里就不多讲了,他们只会在EventBus中被调用。
- findAllSubscribers:这是一个private方法,他在register/unregister中被调用,之所以单独拿出来,主要是想说明一下,他也是基于Annotation来实现的。在register时,我们只会传入Observer类,但Observer类需要订阅哪个Event,Event到底又需要调用哪个方法,都是在这个方法中能通过对@Subscriber找个Annotation的读取和method的反射。
private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
Class<?> clazz = listener.getClass();
for (Method method : getAnnotatedMethods(clazz)) {
Class<?>[] parameterTypes = method.getParameterTypes();
Class<?> eventType = parameterTypes[0];
methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
}
return methodsInListener;
}
最后多提一句,@Subscriber标注的那些Method都是事先通过'getAnnotatedMethodsNotCached' 方法获取,保存在了一个LoadingCache中的。由于和EventBus的机制没有太大关系,这里就不展开了。
private static final LoadingCache<Class<?>, ImmutableList<Method>> subscriberMethodsCache =
CacheBuilder.newBuilder().weakKeys()
.build(new CacheLoader<Class<?>, ImmutableList<Method>>() {
@Override
public ImmutableList<Method> load(Class<?> concreteClass) throws Exception {
return getAnnotatedMethodsNotCached(concreteClass);
}
});
总结
Guava的EventBus以EventBus类为中心,对于Event的发布、订阅者的管理、异常的处理都提供了专门的实现类,流程非常清楚。而且基于Annotation扫描绑定的方式会使代码非常的简洁。但由于这种方式,在EventBus中对于事件类型和事件参数等等并不能提供很好的支撑,而且由于基本所有的类都是default权限的,这使得扩展异常的艰难T~T
Classfier扩展
待续...
网友评论