一、概述
在上篇文章中,讲解了EventBus的入门,并在文章中提到了同步事件模式和异步事件模型,下面将具体分析两种事件模型的实现机制
二、EventBus架构图
同步事件模型:事件的触发与事件的处理在同一个线程中,因此事件的处理是有序的
同步模型.png
异步事件模型:事件的触发与事件的处理在不同线程中,事件的处理是在单独的线程池中,因此处理的过程是无效的
异步模型.png
三、EventBus
EventBus
public class EventBus {
private static final Logger logger = Logger.getLogger(EventBus.class.getName());
private final String identifier;
private final Executor executor;
private final SubscriberExceptionHandler exceptionHandler;
private final SubscriberRegistry subscribers = new SubscriberRegistry(this);
private final Dispatcher dispatcher;
}
-
identifier:事件总线的标识,在应用中可以创建多个EventBus,通过 identifier 标识上面代码是EventBus的定义,下面对EventBus的属性简单讲解:
-
executor:该类实现了Java的Executor接口,用于对订阅者处理事件方法的执行,同步事件模型和异步事件模型采用了不同的 Executor
-
exceptionHandler:用于处理订阅者在执行事件处理方法时抛出的异常
-
subscribers:订阅者注册表,用于存储所有的事件以及事件处理器、订阅对象的对应关系。在上文中,@Subscribe注解标记的方法会被包装为一个Subscriber
-
dispatcher:事件分发器,用于分发事件给订阅对象的事件处理器,在不同事件模型上采用不同的事件分发器,在下面将详细讲解
四、事件订阅
(一)注册事件处理器
- 注册
EventBus eventBus = new EventBus();
//注册事件处理器
eventBus.register(new MessageListener());
调用register方法想事件总线注册事件处理器,下面查看register方面内部实现。
- register
void register(Object listener) {
//扫描@Subscribe注解,分装成对应的Subscriber
Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
//将Subscriber添加到eventSubscribers
for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
Class<?> eventType = entry.getKey();
Collection<Subscriber> eventMethodsInListener = entry.getValue();
//获取已经注册了某个事件的处理器
CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
if (eventSubscribers == null) {
CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
eventSubscribers =
MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
}
eventSubscribers.addAll(eventMethodsInListener);
}
}
(二) 事件的触发以及处理
- 事件的触发
EventBus eventBus = new EventBus();
//注册事件处理器
eventBus.register(new MessageListener());
ItemEvent itemEvent = new ItemEvent();
itemEvent.setTitle(Thread.currentThread().getName());
for (int i = 0; i < 100; i++) {
//事件的触发
eventBus.post(itemEvent);
}
通过调用 post 方法,触发一个ItemEvent,下面我们看 post 方法的具体实现
- post
public void post(Object event) {
//获取该事件所有的订阅者
Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
if (eventSubscribers.hasNext()) {
//通过dispatcher分发事件
dispatcher.dispatch(event, eventSubscribers);
} else if (!(event instanceof DeadEvent)) {
// 如果事件没有订阅者,那么该事件为DeadEvent
post(new DeadEvent(this, event));
}
}
从上面的代码可以看出,post 方法会获取该事件的所有监听者,如果监听者存在,就通过 dispatcher 分发事件到对应的处理器中,
dispatcher 是一个抽象类,对于不同的事件模型,Dispatcher有不同的实现类,下面从同步事件模型和异步事件模型来分析一下
Dispatcher的实现。
五、事件模型
(一) 同步事件模型
在同步事件模型中,通过 PerThreadQueuedDispatcher 来进行事件的分配,代码具体实现如下:
- dispatcher
private static final class PerThreadQueuedDispatcher extends Dispatcher {
/**每个线程一个事件分发队列 */
private final ThreadLocal<Queue<Event>> queue =
new ThreadLocal<Queue<Event>>() {
@Override
protected Queue<Event> initialValue() {
return Queues.newArrayDeque();
}
};
/** 每个线程一个事件分发状态,通过状态判断减少锁的操作,值得学习 */
private final ThreadLocal<Boolean> dispatching =
new ThreadLocal<Boolean>() {
@Override
protected Boolean initialValue() {
return false;
}
};
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
checkNotNull(subscribers);
//获取该线程的事件分发队列
Queue<Event> queueForThread = queue.get();
//事件压入队列
queueForThread.offer(new Event(event, subscribers));
if (!dispatching.get()) {
//设置分发状态
dispatching.set(true);
try {
Event nextEvent;
while ((nextEvent = queueForThread.poll()) != null) {
while (nextEvent.subscribers.hasNext()) {
nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
}
}
} finally {
//删除
dispatching.remove();
queue.remove();
}
}
}
}
在上面的代码中,通过 ThreadLocal 保证并发情况下的线程安全
(二) 异步事件模型
- 异步
private static final class LegacyAsyncDispatcher extends Dispatcher {
/** 全局的并发队列 */
private final ConcurrentLinkedQueue<EventWithSubscriber> queue =
Queues.newConcurrentLinkedQueue();
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
while (subscribers.hasNext()) {
queue.add(new EventWithSubscriber(event, subscribers.next()));
}
EventWithSubscriber e;
while ((e = queue.poll()) != null) {
e.subscriber.dispatchEvent(e.event);
}
}
从上面的代码可以看出,将事件加入一个全局的并发队列中,在LegacyAsyncDispatcher中采用了ConcurrentLinkedQueue,但是这里有个问题,如果事件的发布比事件的消费快,会造成消息的堆积
五、事件的处理
上一小节分析了事件的分发,我们可以看出事件的处理是调用 Subscriber 的 dispatchEvent 方法进行事件的处理,下面进入到 dispatchEvent 中查看具体的实现。
- dispatchEvent
final void dispatchEvent(final Object event) {
executor.execute(
new Runnable() {
@Override
public void run() {
try {
invokeSubscriberMethod(event);
} catch (InvocationTargetException e) {
bus.handleSubscriberException(e.getCause(), context(event));
}
}
});
}
从上面的代码可以看出,Subscriber 通过调用 Executor 的 execute 方法来执行一个任务,然后通过反射调用调用到对应的事件处理方法,也就是我们使用 @Subscribe 注解标记的方法,在同步事件模型和异步事件模型下采用的不同的 Executor,AsyncEventbus 采用的使我们做完入参的线程池,所以说异步事件模型的事件触发和事件处理在不同的线程中
在之前我们说个同步线程模型的事件的触发和事件的处理都是在同一个线程中 ?
查看了同步线程模型 Executor 的实现就了解了
- executor
private enum DirectExecutor implements Executor {
INSTANCE;
@Override
public void execute(Runnable command) {
command.run();
}
@Override
public String toString() {
return "MoreExecutors.directExecutor()";
}
}
同步事件模型的 Executor 的execute 方法是直接调用 run 方法,因此事件的触发和事件的处理在同一个线程中,保证了事件的有序处理。
上面的代码通过枚举实现了单例模式,以后可以尝试
网友评论