最近在项目中,发现有人在用EventBus做事件监听,当时我就觉得很厉害。赶紧学习一下,Goole真是厉害。
一个简单的例子
下面让我们来看一下EventBus的使用实例。
/**
* 订单发布
*
* @author tomxin
* @date 2018-11-21
* @since v1.0.0
*/
public class OrderPublish {
private EventBus eventBus;
public OrderPublish(EventBus eventBus) {
this.eventBus = eventBus;
}
public void orderStateChange(int state) {
if (0 == state) {
eventBus.post("the order is close");
return;
}
if (1 == state) {
eventBus.post(1);
return;
}
}
}
/**
* 事件监听器
*
* @author tomxin
* @date 2018-11-21
* @since v1.0.0
*/
public class OrderCloseListener {
/**
* 构造函数
*
* @param eventBus
*/
public OrderCloseListener(EventBus eventBus) {
eventBus.register(this);
}
/**
* 接收消息
*
* @param message
*/
@Subscribe
public void receiver(String message) {
System.out.println(message);
}
}
/**
* 订单开启监听者
*
* @author tomxin
* @date 2018-11-21
* @since v1.0.0
*/
public class OrderOpenListener {
/**
* 构造函数
*
* @param eventBus
*/
public OrderOpenListener(EventBus eventBus) {
eventBus.register(this);
}
/**
* 接收消息
*
* @param orderState
*/
@Subscribe
public void receiver(Integer orderState) {
System.out.println("this order is open,state = " + orderState);
}
}
/**
* Main函数
*
* @author tomxin
* @date 2018-11-21
* @since v1.0.0
*/
public class OrderMain {
public static void main(String[] args) {
// 初始化一个EventBus
EventBus eventBus = new EventBus();
// 初始化事件发布者
OrderPublish orderPublish = new OrderPublish(eventBus);
// 初始化事件监听者
OrderCloseListener orderCloseListener = new OrderCloseListener(eventBus);
OrderOpenListener orderOpenListener = new OrderOpenListener(eventBus);
//发布消息
orderPublish.orderStateChange(1);
orderPublish.orderStateChange(0);
}
}
记得添加上这个依赖
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>27.0-jre</version>
</dependency>
可以看到,这个这的是太像之前写的 观察者模式+中介者模式。对于我们Spring项目中,可以通过@PostConstruct来向EventBus中注册监听者。EvenBus也可以通过类似的方法初始化。通过与@Subscribe注解就可以达到效果啦。
EventBus源码学习
首先来看一下EventBus的构造函数,
EventBus(
String identifier,
Executor executor,
Dispatcher dispatcher,
SubscriberExceptionHandler exceptionHandler) {
// EventBus的身份信息,可以简单的理解为起了一个别名,如果不显式赋值的话,会默认为“default”。
this.identifier = checkNotNull(identifier);
// executor是定义了一个线程池,事件的运行需要提交到这个线程池中。
this.executor = checkNotNull(executor);
// 处理事件的策略接口,默认PerThreadQueuedDispatcher,还有LegacyAsyncDispatcher,ImmediateDispatcher。
this.dispatcher = checkNotNull(dispatcher);
// 异常处理类,默认LoggingHandler.INSTANCE
this.exceptionHandler = checkNotNull(exceptionHandler);
}
下面来看一下,如何注册监听一个事件
private final SubscriberRegistry subscribers = new SubscriberRegistry(this);
public void register(Object object) {
// 调用了SubscriberRegistry的register方法
subscribers.register(object);
}
// 我们来看一下register中的方法具体是怎么实现的
// SubscriberRegistry类中的register方法
private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers =
Maps.newConcurrentMap();
void register(Object listener) {
// 查找listener中所有被@Subscribe注解的方法,将对应的方法参数与被listener封装成的Subscriber,封装为一个Map
Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
// 遍历当前HashMultimap,这里的eventType表示方法的参数
for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
Class<?> eventType = entry.getKey();
Collection<Subscriber> eventMethodsInListener = entry.getValue();
// 以eventType(方法参数)作为key,查询subscribers中是否已经存在相同的订阅者集合。
CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
// 如果不存在,则新建一个CopyOnWriteArraySet类型的新集合。
if (eventSubscribers == null) {
CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
// 将newSet,eventType赋值,返回引用赋值给eventSubscribers
eventSubscribers =
MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
}
// 将新的listener集合,添加到eventSubscribers的集合中。
eventSubscribers.addAll(eventMethodsInListener);
}
}
// 我们再来看一下,findAllSubscribers(listeners)方法是如何实现的。不难想到这是以反射的方式来做的,但是有一点大家要注意,这里规定了方法的参数只能有一个,我们一起来看一下。
private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
// 首先创建了一个HashMultimap来存储这些listener
Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
Class<?> clazz = listener.getClass();
// 这个for循环,先调用getAnnotatedMethods(clazz)方法,找到所有带有@Subscriber注解的方法。
for (Method method : getAnnotatedMethods(clazz)) {
Class<?>[] parameterTypes = method.getParameterTypes();
// 取第一个参数
Class<?> eventType = parameterTypes[0];
// 将eventType和Subscriber放入Map中。
methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
}
return methodsInListener;
}
// 再来看一下getAnnotatedMethods(class)方法。
private static final LoadingCache<Class<?>, ImmutableList<Method>> subscriberMethodsCache =
CacheBuilder.newBuilder()
.weakKeys()
.build(
new CacheLoader<Class<?>, ImmutableList<Method>>() {
@Override
public ImmutableList<Method> load(Class<?> concreteClass) throws Exception {
return getAnnotatedMethodsNotCached(concreteClass);
}
});
// 如上subscriberMethodsCache使用了CacheBuilder.newBuilder(),然后定义了回调方法。getAnnotatedMethodsNotCached(concreteClass);
private static ImmutableList<Method> getAnnotatedMethods(Class<?> clazz) {
return subscriberMethodsCache.getUnchecked(clazz);
}
// 我们来看一下这个方法是如何实现的
private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) {
// 首先获取父类的Class
Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes();
Map<MethodIdentifier, Method> identifiers = Maps.newHashMap();
for (Class<?> supertype : supertypes) {
for (Method method : supertype.getDeclaredMethods()) {
if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) {
// TODO(cgdecker): Should check for a generic parameter type and error out
// 获取对应的方法参数。
Class<?>[] parameterTypes = method.getParameterTypes();
// 判断参数的长度,参数的长度不能大于1,否则会抛出异常。
checkArgument(
parameterTypes.length == 1,
"Method %s has @Subscribe annotation but has %s parameters."
+ "Subscriber methods must have exactly 1 parameter.",
method,
parameterTypes.length);
MethodIdentifier ident = new MethodIdentifier(method);
if (!identifiers.containsKey(ident)) {
identifiers.put(ident, method);
}
}
}
}
return ImmutableList.copyOf(identifiers.values());
}
下面我们在看一下post方法是如何发布事件的
public void post(Object event) {
// 通过getSubscribers(event)方法,获取所有订阅者
Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
if (eventSubscribers.hasNext()) {
// 通过dispacth(event,eventSubscribers)方法,将所有任务分发出去。
dispatcher.dispatch(event, eventSubscribers);
} else if (!(event instanceof DeadEvent)) {
// the event had no subscribers and was not itself a DeadEvent
post(new DeadEvent(this, event));
}
}
// 下面是getSubscribers方法
Iterator<Subscriber> getSubscribers(Object event) {
// 这一步的主要目的是,获取实体类event继承体系中所有的父类
ImmutableSet<Class<?>> eventTypes = flattenHierarchy(event.getClass());
// 根据eventTypes的容量,初始化subscriberIterators
List<Iterator<Subscriber>> subscriberIterators =
Lists.newArrayListWithCapacity(eventTypes.size());
// 从subscribers中获取对应的事件监听者
for (Class<?> eventType : eventTypes) {
CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
if (eventSubscribers != null) {
// eager no-copy snapshot
subscriberIterators.add(eventSubscribers.iterator());
}
}
return Iterators.concat(subscriberIterators.iterator());
}
// 再来看一下,dispatcher方法是如何工作的
// ThreadLocal维护的queue
private final ThreadLocal<Queue<Event>> queue =
new ThreadLocal<Queue<Event>>() {
@Override
protected Queue<Event> initialValue() {
return Queues.newArrayDeque();
}
};
// 维护线程是否正在分发任务的状态
private final ThreadLocal<Boolean> dispatching =
new ThreadLocal<Boolean>() {
@Override
protected Boolean initialValue() {
return false;
}
};
void dispatch(Object event, Iterator<Subscriber> subscribers) {
// 首先对event,subscribers进行判空
checkNotNull(event);
checkNotNull(subscribers);
// 调用ThreadLocal的get方法,获取当前线程对应的队列。
Queue<Event> queueForThread = queue.get();
// 先讲新的事件放入到队列中
queueForThread.offer(new Event(event, subscribers));
// 判断当前线程是否在分发任务,如果没有在分发任务,则开始分发任务,如果已经在分发任务,则直接返回。
if (!dispatching.get()) {
dispatching.set(true);
try {
Event nextEvent;
// 从队列中poll出对应的事件,判断是否为空。
while ((nextEvent = queueForThread.poll()) != null) {
// 循环该事件的监听者,将任务分发出去。
while (nextEvent.subscribers.hasNext()) {
nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
}
}
} finally {
dispatching.remove();
queue.remove();
}
}
}
// dispatchEvent方法
final void dispatchEvent(final Object event) {
// 事件进入到这里之后,在给定的线程池中主动启动一个新的线程,通过invoke的方法,运行监听者对应的方法。
executor.execute(
new Runnable() {
@Override
public void run() {
try {
invokeSubscriberMethod(event);
} catch (InvocationTargetException e) {
bus.handleSubscriberException(e.getCause(), context(event));
}
}
});
}
好了,EventBus的代码就讲解到这里了。可以看到EventBus里用到了很多内容,对于好的代码,我们还是要多学习的。
网友评论