本人通过源码的解读,只是为了加深对其执行流程的理解,文章中不会对更细致的地方做过多的讲解,只是把握住开源框架的整体脉络。
EventBus的整体执行流程包括四部分:注册、事件发送、事件接收、取消注册。
在源码讲解之前首先先放上一个简单的例子:
// EventBus注册
EventBus.getDefault().register(this);
// 事件发送
EventBus.getDefault().post(new EventType(xxx));
// 接收事件回调
@Subscribe(threadMode = ThreadMode.MAIN)
public void onEventMainThread(EventType type) {
// todo do something
}
// 取消注册
EventBus.getDefault().unregister(this);
1、事件注册
在讲解注册之前,大家先思考下,EventBus是是用来干什么的?
简单来说,EventBus就是为了简化Android事件的传递。
参考上面的接收事件传递的例子,EventBus如何去回调该方法?这正是EventBus事件注册的由来。
EventBus在注册期间,会遍历要注册类里的所有public方法,从中找到被Subscribe该注解类注解的方法,并将该注解的threadMode,方法名,注册事件类型缓存起来,用于在事件发送的时候回调该方法二用。
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
// 通过判断是否是最后一位,或者优先权的判断来添加该注解方法的所有信息
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
// subscribedEvents作用是取消指定注册类的所有注册方法。
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
......
subscribedEvents.add(eventType);
......
}
上面代码有两个关键点:
第一,通过subscriberMethodFinder该对象查找注册类中所有被Subscribe注解的方法,并缓存该方法的所有信息到List<SubscriberMethod>内部,具体读取并缓存略过;
第二,通过subscribe方法将缓存信息保存到subscriptionsByEventType这个属性里,它是一个Map,key为注册的消息类型,value为对应的注册方法。这个属性就是发送并回调指定类型消息的凭证。
注意:一个注册的类里面只可注册一个同类型的事件类型,否则就会在subscribe方法里抛出已注册该类型的异常。
2、事件的发送
首先来看下发送的执行方法post:
public void post(Object event) {
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
if (!postingState.isPosting) {
...
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
...
}
}
这里给大家提一下,EventBus是可以多线程事件的传递的,那么如何读取指定线程内部的数据呢?
EventBus使用的是currentPostingThreadState,它是一个ThreadLocal,便很好的解决的该问题。
略过该方法的其他处理细节,我们来看下该方法里的postSingleEvent方法,注意下,该方法是放在while循环里面的,也就是说post方法会发送该线程内所有未发送的事件类型的所有消息。
postSingleEvent源码如下:
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
......
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
......
}
该方法主要是调用了postSingleEventForEventType方法,该方法如下:
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
// 上文提到的subscriptionsByEventType,出现在了这里,用于提取指定事件类型的所有注册方法信息
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
postToSubscription(subscription, event, postingState.isMainThread);
......
}
return true;
}
return false;
}
接着调用了postToSubscription方法
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
// 正在发送的枚举值,不做讲解
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
好了,到了EventBus发送事件的重中之重来了。
大家可以看到Subscribe注解类的threadMode主要的类型包括MAIN,BACKGROUND,ASYNC,这三个实际上正是对应于事件发送的三种模式:UI主线程模式,后台模式、异步模式。
这里同时引入了下面要讲的三个Poster:mainThreadPoster、backgroundPoster、asyncPoster
UI主线程模式
a、在主线程
void invokeSubscriber(Subscription subscription, Object event) {
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
}
直接反射调用注解方法执行事件接收的回调。
b、不在主线程
mainThreadPoster.enqueue(subscription, event);
其中mainThreadPoster实际上是一个Handler
final class HandlerPoster extends Handler {
private final PendingPostQueue queue;
private final EventBus eventBus;
......
void enqueue(Subscription subscription, Object event) {
// PendingPost是一个消息池
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
sendMessage(obtainMessage())
......
}
}
@Override
public void handleMessage(Message msg) {
while (true) {
PendingPost pendingPost = queue.poll();
......
// 最终执行情况a中的invokeSubscriber方法
eventBus.invokeSubscriber(pendingPost);
......
}
}
}
该handler的handleMessage内部是一个while无限循环,用于消息的不断发送以及消息回调。
通过该流程可知,EventBus在Main模式的非主线程情况下,最终是通过handler的方式发送以及消息的回调的。
后台模式
a、在非主线程
直接执行invokeSubscriber方法反射回调接收事件。
b、在主线程
backgroundPoster.enqueue(subscription, event);
其中backgroundPoster是一个Runnable
final class BackgroundPoster implements Runnable {
private final PendingPostQueue queue;
private final EventBus eventBus;
......
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
...
eventBus.getExecutorService().execute(this);
}
}
@Override
public void run() {
while (true) {
PendingPost pendingPost = queue.poll(1000);
......
eventBus.invokeSubscriber(pendingPost);
}
}
}
可以看到其内部实现方式和mainThreadPoster(HandlerPoster)类似,只不过是通过线程池的方式触发run实现事件的最终发送和事件的接收。
异步模式
只有一种情况
asyncPoster.enqueue(subscription, event);
其中asyncPoster也是一个Runnable
class AsyncPoster implements Runnable {
private final PendingPostQueue queue;
private final EventBus eventBus;
AsyncPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
queue.enqueue(pendingPost);
eventBus.getExecutorService().execute(this);
}
@Override
public void run() {
PendingPost pendingPost = queue.poll();
if(pendingPost == null) {
throw new IllegalStateException("No pending post available");
}
eventBus.invokeSubscriber(pendingPost);
}
}
asyncPoster实现方式很简单,只取出第一条事件并进行事件的发送和事件的接收回调,这也是和前两种模式的不同。
网友评论