Live boldly, push yourself, don't settle!
勇敢生活,突破自我,人生不设限!
EventBus是一个针对Android和java平台的发布/订阅事件总线。
从上图可以看到EventBus是通过观察者模式来实现事件的订阅和分发,通过这个库可以很容易的实现应用内的通信同时EventBus是支持跨module通信。
使用
- gradle依赖:
implementation 'org.greenrobot:eventbus:3.1.1'
- 定义通信事件:
public static class MessageEvent { /* Additional fields if needed */ }
- 在需要地方处理订阅事件
@Subscribe(threadMode = ThreadMode.MAIN)
public void onMessageEvent(MessageEvent event) {/* Do something */};
- 注册和反注册
@Override
public void onStart() {
super.onStart();
EventBus.getDefault().register(this);
}
@Override
public void onStop() {
super.onStop();
EventBus.getDefault().unregister(this);
}
- 发送事件
EventBus.getDefault().post(new MessageEvent());
订阅过程
笼统的讲EventBus通过一个单例对象管理所有的消息事件,将订阅事件存储在map中,在发送事件时,从map中获取到对应的订阅者进行事件的分发。那么接下来将通过几个关键方法来了解EventBus具体是如何实现的。
getDefault()
public static EventBus getDefault() {
if (defaultInstance == null) {
synchronized (EventBus.class) {
if (defaultInstance == null) {
defaultInstance = new EventBus();
}
}
}
return defaultInstance;
}
getDefault函数是EventBus的入口函数,可以看到这里通过DCL方式实现的单例模式。之所以为单例是因为需要对整个应用中的消息事件进行统一的调度管理。
register(this)
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
注册流程是通过传入的注册类subscriber拿到对应的.class对象,然后通过subscriberMethodFinder.findSubscriberMethods(subscriberClass)
获取subscriber这个类所有的@Subscribe
方法,这些@Subscribe
方法被封装成SubscriberMethod
对象并以list的形式返回,然后通过在一个同步代码块中进行订阅操作。
findSubscriberMethods方法
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
//如果缓存中存在说明该class对象已经被扫描处理过直接返回缓存结果
if (subscriberMethods != null) {
return subscriberMethods;
}
//关键点通过下面两个分支进行扫描
if (ignoreGeneratedIndex) {
subscriberMethods = findUsingReflection(subscriberClass);
} else {
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
//没有获取到@Subscribe方法,有可能没有public修饰,有可能被混淆等等
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
//将扫描到的结果缓存起来
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
METHOD_CACHE从名字就可以看出来是一个缓存容器,实际上他是一个ConcurrentHashMap,key是.class对象,value就是上面提到的被封装后的list集合。
上面逻辑如注释中描述一样非常简单,重点看一下是如何通过findUsingReflection
和findUsingInfo
找到订阅方法的。
findUsingReflection(subscriberClass)方法
private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
//从FIND_STATE_POOL缓存池中拿到FindState
//FindState是查找过程中引入中间变量类型,目的是简化逻辑和处理并发
FindState findState = prepareFindState();
//重新初始化FindState
findState.initForSubscriber(subscriberClass);
//查找的过程
while (findState.clazz != null) {
findUsingReflectionInSingleClass(findState);
findState.moveToSuperclass();
}
//返回findState中封装的结果并释放FindState,让FindState重新回到FIND_STATE_POOL中
return getMethodsAndRelease(findState);
}
FindState中封装数据如下
final List<SubscriberMethod> subscriberMethods = new ArrayList<>();
final Map<Class, Object> anyMethodByEventType = new HashMap<>();
final Map<String, Class> subscriberClassByMethodKey = new HashMap<>();
final StringBuilder methodKeyBuilder = new StringBuilder(128);
Class<?> subscriberClass;
Class<?> clazz;
boolean skipSuperClasses;
SubscriberInfo subscriberInfo;
FindState是为了方便处理数据而抽象出的中间数据类型,他里面主要是封装查找过程中的数据list,另外一方面不将各种缓存list定义为成员变量而是采用局部变量的方式,也减少了线程之间的竞争。
总的来说FindState是一个中间状态,保存处理过程中得到的各种关键数据。
findUsingReflectionInSingleClass
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
//拿到class的所有methods
try {
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
methods = findState.clazz.getMethods();
findState.skipSuperClasses = true;
}
//遍历methods找到@Subscribe注解修饰的方法
for (Method method : methods) {
//获取方法的修饰符,只处理public修饰的
int modifiers = method.getModifiers();
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
//获取方法的参数类型,只处理一个参数的方法
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 1) {
//获取Subscribe注解,不为null说明该方法被@Subscribe修饰
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
//获取参数的类型,即你订阅的Event类型
Class<?> eventType = parameterTypes[0];
if (findState.checkAdd(method, eventType)) {
//获取注解上的信息,这里是运行的线程
ThreadMode threadMode = subscribeAnnotation.threadMode();
//将获取到方法、注解、线程等信息封装成SubscriberMethod,存储在findState
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}
就是通过上面的代码找到你订阅的方法即你用@Subscribe
标记的方法,代码虽然较多,但逻辑简单,就是一个扫描注解的过程。从这里也可看到EventBus使用的运行时反射,具体解释在代码注释中。该方法将所有结果存储在findState后getMethodsAndRelease
方法会将其中的结果返回给需要调用的地方。
subscribe
通过上面一些列步骤可以得到register方法传入的类所有的订阅方法,并且以List<SubscriberMethod>
的形式返回,那么拿到这些订阅方法后是如何订阅的呢?回到register方法中的subscribe
部分。
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
//获取该方法订阅事件的类型
Class<?> eventType = subscriberMethod.eventType;
//将订阅者和方法封装成一个主题对象
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
//subscriptionsByEventType是一个map,key是事件类型,value是订阅了这个事件的方法集合
//从map中查找该事件是否已经处理过
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
//为null表示还没有处理过,就新建一个list去装这些订阅方法
subscriptions = new CopyOnWriteArrayList<>();
//将list保存在map中
subscriptionsByEventType.put(eventType, subscriptions);
} else {
//不为null表示已经处理过,那么是存在list容器
//在list中检测当前这个方法是否已经被添加了,如果是抛异常
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
//上面并没将方法放置在list中,这里才开始把方法放在list中
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
//根据优先级把该方法放在集合对应位置,没有优先级就放在末尾
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
//获取被注册的类订阅的事件Event类型集合
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
//没有处理过就新建list去容纳
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
//将该次事件Event存放在list中
subscribedEvents.add(eventType);
//粘性事件处理,后面具体讲
if (subscriberMethod.sticky) {
if (eventInheritance) {
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
到这里订阅处理工作过程就完成,总结一下就是在调用register方法的时候会将通过遍历寻找被注册的类中的所有方法,在其中找到订阅的方法并存储在容器中。
缓存容器
前面的订阅过程就是找到需要处理的类,然后存储在下面这几个map中,方便后面发布消息时处理。
//key是事件Event类型,value是list,list存储的都是订阅了该事件Event的方法
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
//key是被注册的对象,value是list,list存储的是被注册的这个对象所订阅的所有事件Event
private final Map<Object, List<Class<?>>> typesBySubscriber;
private final Map<Class<?>, Object> stickyEvents;
post过程
看完了订阅的过程,接下来看一下当调用了EventBus.getDefault().post(new MessageEvent());
后是如何通知到对应方法的。
public void post(Object event) {
//获取当前线程事件发送状态,下面分析会说这是个什么
PostingThreadState postingState = currentPostingThreadState.get();
//获取事件发送队列
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
if (!postingState.isPosting) {
//没有处于发送状态时才处理
//是否在主线程发送,最终通过looper == Looper.myLooper()判断
postingState.isMainThread = isMainThread();
//准备开始发送,先将状态改变
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {
//发送事件
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
//重置状态
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
PostingThreadState是一个消息发送状态,定义如下:
final static class PostingThreadState {
final List<Object> eventQueue = new ArrayList<>();
boolean isPosting;
boolean isMainThread;
Subscription subscription;
Object event;
boolean canceled;
}
currentPostingThreadState是一个ThreadLocal。什么是ThreadLocal,看我另外一篇文章!
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
@Override
protected PostingThreadState initialValue() {
return new PostingThreadState();
}
};
所以综合起来就是EventBus为调用post方法的每一个线程都绑定了一个PostingThreadState发送状态。
postSingleEvent&postSingleEventForEventType
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
//有继承发送所有事件
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
//发送事件
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
//发送事件
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
//没有找到发送事件就发送默认事件
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}
可以看到最终发送事件都指向postSingleEventForEventType
,追踪该方法。
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
//获取所有订阅了event事件的方法
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
//遍历所有方法进行消息发送
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
//发送事件
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
由前面可知,subscriptionsByEventType.get(eventClass)
获取的是所有订阅了eventClass事件的方法。
postToSubscription
//根据订阅时线程的不同选择不同的方法进行发送
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
上面代码做的事很简单根据线程不同,选用不同的方法进行消息发送。接下来以常用的MAIN方式为例:
- 如果当前线程为主线程时直接通过反射方法发送事件
invokeSubscriber(subscription, event)
void invokeSubscriber(Subscription subscription, Object event) {
try {
//通过反射直接调用你写的@subscriber方法
//而这些方法在前面订阅过程中就已经遍历获取到了存储起来的
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
- 当前线程不为主线程,需要用handler机制发送到主线程处理。
mainThreadPoster.enqueue(subscription, event)
,EventBus构造方法中有这么一段。
//返回的AndroidHandlerMainThreadSupport
mainThreadSupport = builder.getMainThreadSupport();
//创建一个HandlerPoster
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
线程间切换是通过mainThreadPoster,而mainThreadPoster
是通过AndroidHandlerMainThreadSupport#createPoster
方法创建。
public interface MainThreadSupport {
boolean isMainThread();
Poster createPoster(EventBus eventBus);
class AndroidHandlerMainThreadSupport implements MainThreadSupport {
private final Looper looper;
public AndroidHandlerMainThreadSupport(Looper looper) {
this.looper = looper;
}
@Override
public boolean isMainThread() {
return looper == Looper.myLooper();
}
@Override
public Poster createPoster(EventBus eventBus) {
//实现了Poster接口的Handler对象
return new HandlerPoster(eventBus, looper, 10);
}
}
}
那么当调用mainThreadPoster.enqueue(subscription, event)
实际就是调用了HandlerPoster#enqueue
方法。
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
//将消息放入队列
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
//发送消息消息
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
//handle方法
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
// 从队列中获取需要处理的消息
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
//处理事件,最终还是会调用invokeSubscriber
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
可以看到不管如何最终都会回调到invokeSubscriber(subscription, event)
,这就是post的过程。
写在后面的话
由于篇幅有限所有代码和流程不可能面面俱到,本文只讲解了最常用的注册和发送流程。有时间可能会补上粘性事件的部分。
网友评论