一、背景
上一篇https://www.jianshu.com/writer#/notebooks/9031136/notes/51194824中讲解了一般情况下怎么使用guava EventBus,具体使用的是EventBus eventBus = new EventBus();
有实际对接过的同学会发现,它并没有按照预期的并行执行事件,而是串行等待事件执行完成。这是什么情况?
二、源码分析
1、EventBus类的属性和构造方法
//名称
private final String identifier;
//执行器
private final Executor executor;
private final SubscriberExceptionHandler exceptionHandler;
private final SubscriberRegistry subscribers;
//事件分发器,有三类,默认选择的是Dispatcher.perThreadDispatchQueue()
private final Dispatcher dispatcher;
public EventBus() {
this("default");
}
public EventBus(String identifier) {
this(identifier, MoreExecutors.directExecutor(), Dispatcher.perThreadDispatchQueue(), EventBus.LoggingHandler.INSTANCE);
}
public EventBus(SubscriberExceptionHandler exceptionHandler) {
this("default", MoreExecutors.directExecutor(), Dispatcher.perThreadDispatchQueue(), exceptionHandler);
}
EventBus(String identifier, Executor executor, Dispatcher dispatcher, SubscriberExceptionHandler exceptionHandler) {
this.subscribers = new SubscriberRegistry(this);
this.identifier = (String)Preconditions.checkNotNull(identifier);
this.executor = (Executor)Preconditions.checkNotNull(executor);
this.dispatcher = (Dispatcher)Preconditions.checkNotNull(dispatcher);
this.exceptionHandler = (SubscriberExceptionHandler)Preconditions.checkNotNull(exceptionHandler);
}
可以看出EventBus使用的事件分发器是Dispatcher.perThreadDispatchQueue(),重点executor执行器是MoreExecutors.directExecutor()。下面我们看下guava的包com.google.common.util.concurrent下的类MoreExecutors的该方法源码。
public static Executor directExecutor() {
return MoreExecutors.DirectExecutor.INSTANCE;
}
// guava实现了java.util.concurrent.Executor接口,重点看execute()方法
// 利用枚举达到天然的单例
private static enum DirectExecutor implements Executor {
INSTANCE;
private DirectExecutor() {
}
//直接运行,并没有新开线程
public void execute(Runnable command) {
command.run();
}
public String toString() {
return "MoreExecutors.directExecutor()";
}
}
2、Dispatcher事件分发器的三种实现
//
package com.google.common.eventbus;
import com.google.common.base.Preconditions;
import com.google.common.collect.Queues;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
abstract class Dispatcher {
Dispatcher() {
}
//静态方法实现单例
static Dispatcher perThreadDispatchQueue() {
return new Dispatcher.PerThreadQueuedDispatcher();
}
static Dispatcher legacyAsync() {
return new Dispatcher.LegacyAsyncDispatcher();
}
static Dispatcher immediate() {
return Dispatcher.ImmediateDispatcher.INSTANCE;
}
//抽象方法,待具体分发器自己实现
abstract void dispatch(Object var1, Iterator<Subscriber> var2);
//同步模式
private static final class ImmediateDispatcher extends Dispatcher {
private static final Dispatcher.ImmediateDispatcher INSTANCE = new Dispatcher.ImmediateDispatcher();
private ImmediateDispatcher() {
}
void dispatch(Object event, Iterator<Subscriber> subscribers) {
Preconditions.checkNotNull(event);
while(subscribers.hasNext()) {
((Subscriber)subscribers.next()).dispatchEvent(event);
}
}
}
//异步模式
private static final class LegacyAsyncDispatcher extends Dispatcher {
private final ConcurrentLinkedQueue<Dispatcher.LegacyAsyncDispatcher.EventWithSubscriber> queue;
private LegacyAsyncDispatcher() {
this.queue = Queues.newConcurrentLinkedQueue();
}
void dispatch(Object event, Iterator<Subscriber> subscribers) {
Preconditions.checkNotNull(event);
while(subscribers.hasNext()) {
this.queue.add(new Dispatcher.LegacyAsyncDispatcher.EventWithSubscriber(event, (Subscriber)subscribers.next()));
}
Dispatcher.LegacyAsyncDispatcher.EventWithSubscriber e;
while((e = (Dispatcher.LegacyAsyncDispatcher.EventWithSubscriber)this.queue.poll()) != null) {
e.subscriber.dispatchEvent(e.event);
}
}
private static final class EventWithSubscriber {
private final Object event;
private final Subscriber subscriber;
private EventWithSubscriber(Object event, Subscriber subscriber) {
this.event = event;
this.subscriber = subscriber;
}
}
}
//异步模式,EventBus默认选择的事件分发器
private static final class PerThreadQueuedDispatcher extends Dispatcher {
private final ThreadLocal<Queue<Dispatcher.PerThreadQueuedDispatcher.Event>> queue;
private final ThreadLocal<Boolean> dispatching;
private PerThreadQueuedDispatcher() {
this.queue = new ThreadLocal<Queue<Dispatcher.PerThreadQueuedDispatcher.Event>>() {
protected Queue<Dispatcher.PerThreadQueuedDispatcher.Event> initialValue() {
return Queues.newArrayDeque();
}
};
this.dispatching = new ThreadLocal<Boolean>() {
protected Boolean initialValue() {
return false;
}
};
}
void dispatch(Object event, Iterator<Subscriber> subscribers) {
Preconditions.checkNotNull(event);
Preconditions.checkNotNull(subscribers);
Queue<Dispatcher.PerThreadQueuedDispatcher.Event> queueForThread = (Queue)this.queue.get();
queueForThread.offer(new Dispatcher.PerThreadQueuedDispatcher.Event(event, subscribers));
if (!((Boolean)this.dispatching.get()).booleanValue()) {
this.dispatching.set(true);
Dispatcher.PerThreadQueuedDispatcher.Event nextEvent;
try {
while((nextEvent = (Dispatcher.PerThreadQueuedDispatcher.Event)queueForThread.poll()) != null) {
while(nextEvent.subscribers.hasNext()) {
((Subscriber)nextEvent.subscribers.next()).dispatchEvent(nextEvent.event);
}
}
} finally {
this.dispatching.remove();
this.queue.remove();
}
}
}
private static final class Event {
private final Object event;
private final Iterator<Subscriber> subscribers;
private Event(Object event, Iterator<Subscriber> subscribers) {
this.event = event;
this.subscribers = subscribers;
}
}
}
}
3、通过上面可以看出,默认的EventBus是同步执行的,根本原因是Executor选择的是MoreExecutors的内部枚举类DirectExecutor。
我们选择使用AsyncEventBus异步事件总线。
package com.google.common.eventbus;
import com.google.common.annotations.Beta;
import com.google.common.eventbus.EventBus.LoggingHandler;
import java.util.concurrent.Executor;
@Beta
public class AsyncEventBus extends EventBus {
public AsyncEventBus(String identifier, Executor executor) {
super(identifier, executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
}
public AsyncEventBus(Executor executor, SubscriberExceptionHandler subscriberExceptionHandler) {
super("default", executor, Dispatcher.legacyAsync(), subscriberExceptionHandler);
}
public AsyncEventBus(Executor executor) {
super("default", executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
}
}
三、解决办法
如果我们想要达到并行执行,在使用的时候定义如下:
EventBus eventBus = new AsyncEventBus(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
这也验证了https://www.jianshu.com/writer#/notebooks/9031136/notes/36887503 文中开头所写。
网友评论