本文的整体结构图
整体结构图.png本文篇幅很长,建议收藏了找时间慢慢看
本文讲解的是
'org.greenrobot:eventbus:3.1.1'
最新版,和其他版本有细微差别,思想都是一样的。
EventBus 的简单示例
@Override
protected void onStart() {
super.onStart();
EventBus.getDefault().register(this);
}
@Override
protected void onStop() {
super.onStop();
EventBus.getDefault().unregister(this);
}
@Subscribe(threadMode = ThreadMode.MAIN, sticky = false, priority = 2)
public void onMessageEvent(MyBusEvent event) {
Toast.makeText(this, event.message, Toast.LENGTH_SHORT).show();
}
EventBus 的创建是一个单例模式,我们就从 getDefault() 开始讲起吧。
一、EventBus 的创建(单例 + Builder 设计模式)
1.1、单例模式获取 EventBus 实例
单例模式UML.pngEventBus 的创建是一个双重校验锁的单例模式。
public static EventBus getDefault() {
if (sDefaultBus == null) {
synchronized (EventBus.class) {
if (sDefaultBus == null) {
sDefaultBus = new EventBus();
}
}
}
return sDefaultBus;
}
单例模式没什么好说的,我们都知道单例模式的构造函数是私有类型 private,但是 EventBus 的构造函数却是 public 类型。
这样设计的目:EventBus 在代码使用过程中不仅仅只有一条总线,还有其他的订阅总线,订阅者可以注册到不同的 EventBus 总线,然后通过不同的 EventBus 总线发送数据。
不同的 EventBus 发送的数据是相互隔离的,订阅者只能收到注册了该 EventBus 总线内事件,而不会收到别的 EventBus 事件总线的数据。这样的设计为后面的不同环境的线程切换创造了好的条件。
/**
* Creates a new EventBus instance; each instance is a separate scope in which events are delivered. To use a central bus, consider {@link #getDefault()}.
创建一个新的 EventBus 实例,每个实例在 events 事件被发送的时候都是一个单独的领域,为了使用一个 事件总线,考虑用 getDefault() 构建。
*/
private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();
public EventBus() {
this(DEFAULT_BUILDER);
}
1.2、Builder 模式构建 EventBus
Builder模式UML图.png这里的代码我们重点看一下 this(DEFAULT_BUILDER) 里面的 DEFAULT_BUILDER。DEFAULT_BUILDER 是一个 EventBusBuilder,从这里可以看出 EventBus 是通过 建造者模式进行构建的,接下来我们看下是如何构建的吧。
EventBus(EventBusBuilder builder) {
logger = builder.getLogger();
//Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType
subscriptionsByEventType = new HashMap<>();
//Map<Object, List<Class<?>>> typesBySubscriber
typesBySubscriber = new HashMap<>();
//Map<Class<?>, Object> stickyEvents
stickyEvents = new ConcurrentHashMap<>();
/**
用于线程间调度
**/
mainThreadSupport = builder.getMainThreadSupport();
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
backgroundPoster = new BackgroundPoster(this);
asyncPoster = new AsyncPoster(this);
//用于记录event生成索引
indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
//对已经注解过的Method的查找器,会对所设定过 @Subscriber 注解的的方法查找相应的Event
subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
builder.strictMethodVerification, builder.ignoreGeneratedIndex);
//当调用事件处理函数发生异常是否需要打印Log
logSubscriberExceptions = builder.logSubscriberExceptions;
//当没有订阅者订阅这个消息的时候是否打印Log
logNoSubscriberMessages = builder.logNoSubscriberMessages;
//当调用事件处理函数,如果异常,是否需要发送Subscriber这个事件
sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
//当没有事件处理函数时,对事件处理是否需要发送sendNoSubscriberEvent这个标志
sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
//是否需要抛出SubscriberException
throwSubscriberException = builder.throwSubscriberException;
//与Event有继承关系的类是否都需要发送
eventInheritance = builder.eventInheritance;
//线程池 Executors.newCachedThreadPool()
executorService = builder.executorService;
}
二、EventBus 中几个重要的成员变量
2.1、EventBus 中重要的 3 个 HashMap。
2.1.1、subscriptionsByEventType
Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType 是以 Event 为 Key,Subscriber 为 Value,当发送这个 Event 时,可以在 Map 中找到相应的 Subscriber。
2.1.2、typesBySubscriber
Map<Object, List<Class<?>>> typesBySubscriber 以 Subscriber 为 Key,以 Event 为 Value,当我们注册和反注册时,都会操作 typesBySubscriber 这个 Map。
2.1.3、stickyEvents
Map<Class<?>, Object> stickyEvents 是管理 EventBus 的粘性事件,粘性事件的 Event 发送出去之后,即使负责接收的 Subscriber 没有注册,当注册之后,依然能收到该事件,和广播接受者的粘性事件类似。
2.2、EventBus 中重要的有 3 个 Post。
-
mainThreadPoster:主线程的 Poster
mainThreadSupport.createPoster(this)
返回的是HandlerPoster(eventBus, looper, 10)
,looper 是一个 MainThread 的 Looper。意味着这个 HandlerPoster 可能就是 Handler 的实现。 -
backgroundPoster:后台线程的 Poster
-
asyncPoster:异步线程的 Poster
2.2.1、mainThreadPoster # HandlerPoster
HandlerPoster 其实就是 Handler 的实现,内部维护了一个 PendingPostQueue 的消息队列,在 enqueue(Subscription subscription, Object event) 方法中不断从 pendingPostPool 的 ArrayList 缓存池中获取 PendingPost 添加到 PendingPostQueue 队列中,并将该 PendingPost 事件发送到 Handler 中处理。
在 handleMessage 中,通过一个 while 死循环,不断从 PendingPostQueue 中取出 PendingPost 出来执行,获取到 post 之后由 eventBus 通过该 post 查找相应的 Subscriber 处理事件。
while 退出的条件有两个
- 获取到的 PendingPost 为 null,即是 PendingPostQueue 已经没有消息可处理。
- 每个 PendingPost 在 Handler 中执行的时间超过了最大的执行时间。
HandlerPoster UML 类图
HandlerPoster UML 类图HandlerPoster 执行过程
public class HandlerPoster extends Handler implements Poster {
private final PendingPostQueue queue;//存放待执行的 Post Events 的事件队列
private final int maxMillisInsideHandleMessage;//Post 事件在 handleMessage 中执行的最大的时间值,超过这个时间值则会抛异常
private final EventBus eventBus;
private boolean handlerActive;//标识 Handler 是否被运行起来
protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
super(looper);
this.eventBus = eventBus;
this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
//从 pendingPostPool 的 ArrayList 缓存池中获取 PendingPost 添加到 PendingPostQueue 队列中,并将该 PendingPost 事件发送到 Handler 中处理。
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);//添加到队列中
if (!handlerActive) {//标记 Handler 为活跃状态
handlerActive = true;
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {//死循环,不断从 PendingPost 队列中取出 post 事件执行
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {//如果为 null,表示队列中没有 post 事件,此时标记 Handelr 关闭,并退出 while 循环
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false; //标记 Handler 为非活跃状态
return;
}
}
}
//获取到 post 之后由 eventBus 通过该 post 查找相应的 Subscriber 处理事件
eventBus.invokeSubscriber(pendingPost);
//计算每个事件在 handleMessage 中执行的时间
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
}
final class PendingPost {
//通过ArrayList来实现PendingPost的添加和删除
private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();
Object event;
Subscription subscription;
PendingPost next;
private PendingPost(Object event, Subscription subscription) {
this.event = event;
this.subscription = subscription;
}
//获取 PendingPost
static PendingPost obtainPendingPost(Subscription subscription, Object event) {
synchronized (pendingPostPool) {
int size = pendingPostPool.size();
if (size > 0) {
PendingPost pendingPost = pendingPostPool.remove(size - 1);
pendingPost.event = event;
pendingPost.subscription = subscription;
pendingPost.next = null;
return pendingPost;
}
}
return new PendingPost(event, subscription);
}
//释放 PendingPost
static void releasePendingPost(PendingPost pendingPost) {
pendingPost.event = null;
pendingPost.subscription = null;
pendingPost.next = null;
synchronized (pendingPostPool) {
// Don't let the pool grow indefinitely
if (pendingPostPool.size() < 10000) {
pendingPostPool.add(pendingPost);
}
}
}
}
2.2.2、backgroundPoster # BackGroundPoster
BackGroundPoster执行过程.pngBackgroundPoster 实现了 Runnable 和 Poster,enqueue() 和 HandlerPoster 中实现一样,在上文中已经讲过,这里不再赘述。
我们来看下 run() 方法中的实现,不断从 PendingPostQueue 中取出 pendingPost 到 EventBus 中分发,这里注意外部是 while() 死循环,意味着 PendingPostQueue 中所有的 pendingPost 都将分发出去。而 AsyncPoster 只是取出一个。
final class BackgroundPoster implements Runnable, Poster {
private final PendingPostQueue queue;
private final EventBus eventBus;
private volatile boolean executorRunning;
BackgroundPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);//添加到队列中
if (!executorRunning) {
executorRunning = true;
//在线程池中执行这个 pendingPost
eventBus.getExecutorService().execute(this);//Executors.newCachedThreadPool()
}
}
}
@Override
public void run() {
try {
try {
//不断循环从 PendingPostQueue 取出 pendingPost 到 eventBus 执行
while (true) {
//在 1000 毫秒内从 PendingPostQueue 中获取 pendingPost
PendingPost pendingPost = queue.poll(1000);
//双重校验锁判断 pendingPost 是否为 null
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();//再次尝试获取 pendingPost
if (pendingPost == null) {
executorRunning = false;
return;
}
}
}
//将 pendingPost 通过 EventBus 分发出去
//这里会将PendingPostQueue中【所有】的pendingPost都会分发,这里区别于AsyncPoster
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
}
} finally {
executorRunning = false;
}
}
}
2.2.3、asyncPoster # AsyncPoster
AsyncPoster执行过程.pngclass AsyncPoster implements Runnable, Poster {
private final PendingPostQueue queue;
private final EventBus eventBus;
AsyncPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
queue.enqueue(pendingPost);
eventBus.getExecutorService().execute(this);//Executors.newCachedThreadPool()
}
@Override
public void run() {
PendingPost pendingPost = queue.poll();
if(pendingPost == null) {
throw new IllegalStateException("No pending post available");
}
eventBus.invokeSubscriber(pendingPost);
}
}
三、@Subscribe 的注解
当我们指定订阅方法的时候,会在方法上加上注解,如下,下面我们看看这个注解的具体含义
@Subscribe(threadMode = ThreadMode.MAIN, sticky = false, priority = 2)
public void onMessageEvent(EventTest event) {
}
@Documented //命名为 java doc 文档
@Retention(RetentionPolicy.RUNTIME) //指定在运行时有效,即在运行时能保持这个 Subscribe
@Target({ElementType.METHOD}) //指定类型为 METHOD,表名用来描述方法
public @interface Subscribe {
//指定线程模式,可以指定在 Subscribe 中接收的 Event 所处的线程
ThreadMode threadMode() default ThreadMode.POSTING;
boolean sticky() default false;
int priority() default 0;
}
3.1、ThreadMode
public enum ThreadMode {
POSTING,//EventBus 默认的线程模式
MAIN,//主线程
MAIN_ORDERED,//主线程
BACKGROUND,//后台线程
ASYNC//异步线程
}
ThreadMode 是 enum(枚举)类型,threadMode 默认值是 POSTING。3.1.1 版本的 EventBus 新增了一种类型,共 5 种,以前的版本是 4 种。
- POSTING:事件发布在什么线程,就在什么线程订阅。故它不需要切换线程来分发事件,因此开销最小。它要求 task 完成的要快,不能请求 MainThread,适用简单的 task。
- MAIN:无论事件在什么线程发布,都在主线程订阅。事件将排队等待交付(非阻塞)。使用此模式的订阅者必须快速返回,以避免阻塞主线程。
- MAIN_ORDERED:无论事件在什么线程发布,都在主线程订阅。区别于 MAIN,MAIN_ORDERED 事件将始终排队等待交付。这将确保 post 调用是非阻塞的。
- BACKGROUND:如果发布的线程不是主线程,则在该线程订阅,如果是主线程,则使用一个单独的后台线程订阅。
- ASYNC:在非主线程和发布线程中订阅。当处理事件的 Method 是耗时的,需要使用此模式。尽量避免同时触发大量的耗时较长的异步操作,EventBus 使用线程池高效的复用已经完成异步操作的线程。
3.2、sticky 粘性事件
粘性事件是事件消费者在事件发布之后才注册,依然能接收到该事件的特殊类型。
StickyEvent 与普通 Event的 普通就在于,EventBus 会自动维护被作为 StickyEvent 被 post 出来(即在发布事件时使用 EventBus.getDefault().postSticky(new MyEvent()) 方法)的事件的最后一个副本在缓存中。 任何时候在任何一个订阅了该事件的订阅者中的任何地方,都可以通 EventBus.getDefault().getStickyEvent(MyEvent.class)来取得该类型事件的最后一次缓存。
sticky(粘性)默认值是 false,如果是 true,那么可以通过 EventBus 的 postSticky 方法分发最近的粘性事件给该订阅者(前提是该事件可获得)。
3.3、Priority
priority 是 Method 的优先级,优先级高的可以先获得分发事件的权利。这个不会影响不同的 ThreadMode 的分发事件顺序。
四、register 注册
register注册.png- 通过反射获取到订阅者的 Class 对象。
- 通过 Class 对象找到对应的订阅者方法集合。
- 遍历订阅者方法集合,将订阅者和订阅者方法订阅起来。
register() 接收的参数为 Object 类型的订阅者,通常也就是代码中 Activity 和 Fragment 的实例 this。EventBus 通过 getDefault() 来获取实例,当我们每新建一个 EventBus 总线,它的发布和订阅事件都是相互隔离的,EventBus 如何做到发布和订阅相互隔离呢?我们看下 register 的实现。
public void register(Object subscriber) {
//1. 通过反射获取到订阅者的Class对象
Class<?> subscriberClass = subscriber.getClass();
//2. 通过Class对象找到对应的订阅者方法集合
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
//3. 遍历订阅者方法集合,将订阅者和订阅者方法订阅起来。
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
在 1 中没什么可细讲的,接下来看下第 2 步中的 SubscriberMethodFinder 这个类的 findSubscriberMethods() 方法。
subscriberMethodFinder 是 EventBus 的一个成员,可以看作是一个订阅方法查找器,是在EventBus 构造方法通过 EventBusBuilder 的一些参数构造出来的。
调用 findSubscriberMethods 方法,传入订阅者的 Class 对象,字面意思是找出订阅者中所有的订阅方法,用一个 List 集合来接收。
4.1、SubscriberMethodFinder # findSubscriberMethods() 详解
findSubscriberMethods()详解.png//订阅者的 Class 对象为 key,订阅者中的订阅方法 List 为 value
private static final Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
//1. 首先在 METHOD_CACHE 中查找该 Event 对应的订阅者集合是否已经存在,如果有直接返回
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}
//2. 根据订阅者类 subscriberClass 查找相应的订阅方法
if (ignoreGeneratedIndex) {//是否忽略生成 index
//通过反射获取
subscriberMethods = findUsingReflection(subscriberClass);
} else {
//通过 SubscriberIndex 方式获取
subscriberMethods = findUsingInfo(subscriberClass);
}
//若订阅者中没有订阅方法,则抛异常
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
// 缓存订阅者的订阅方法 List
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
//----------------------------SubscriberMethod----------------------------
//封装了EventBus中的参数,就是一个EventBus订阅方法包装类
public class SubscriberMethod {
final Method method;
final ThreadMode threadMode;
final Class<?> eventType;
final int priority;
final boolean sticky;
String methodString;
//.....涉及到的方法后文再讲
}
METHOD_CACHE 是一个 ConcurrentHashMap,以订阅者的 Class 对象为 key,订阅者中的订阅方法 List 为 value,缓存了注册过的订阅方法。
如果有缓存则返回返回缓存,如果没有则继续往下执行。这里看到 ignoreGeneratedIndex 这个属性,意思为是否忽略生成 index,是在构造 SubscriberMethodFinder 通过 EventBusBuilder 的同名属性赋值的,默认为 false,当为 true 时,表示以反射的方式获取订阅者中的订阅方法,当为 false 时,则以 Subscriber Index 的方式获取。接下来分别分析这两种方式。
4.1.1、findUsingReflection() 方法解析
当 ignoreGeneratedIndex 为 true 时 --> findUsingReflection()
private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
// 创建并初始化 FindState 对象
FindState findState = prepareFindState();
// findState 与 subscriberClass 关联
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
// 使用反射的方式获取单个类的订阅方法
findUsingReflectionInSingleClass(findState);
// 使 findState.clazz 指向父类的 Class,继续获取
findState.moveToSuperclass();
}
// 返回订阅者及其父类的订阅方法 List,并释放资源
return getMethodsAndRelease(findState);
}
4.1.2、findUsingInfo() 方法解析
当 ignoreGeneratedIndex 为 false 时 --> findUsingInfo()
findUsingInfo()解析.png跟反射方式的 findUsingReflection 的首尾有点类似,不同的是它是通过 SubscriberInfo 这个类来获取订阅方法的,那么 SubscriberInfo 对象是怎么获取的呢,那么同样只看关键代码:getSubscriberInfo()
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
//1.通过 prepareFindState 获取到 FindState(保存找到的注解过的方法的状态)
FindState findState = prepareFindState();
//2.findState 与 subscriberClass 关联
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
//获取订阅者信息
//通过 SubscriberIndex 获取 findState.clazz 对应的 SubscriberInfo
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null) {
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
// 逐个添加进 findState.subscriberMethods
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
// 使用反射的方式获取单个类的订阅方法
findUsingReflectionInSingleClass(findState);
}
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}
FindState 封装了所有的订阅者和订阅方法的集合。
static class FindState {
//保存所有订阅方法
final List<SubscriberMethod> subscriberMethods = new ArrayList<>();
//事件类型为Key,订阅方法为Value
final Map<Class, Object> anyMethodByEventType = new HashMap<>();
//订阅方法为Key,订阅者的Class对象为Value
final Map<String, Class> subscriberClassByMethodKey = new HashMap<>();
final StringBuilder methodKeyBuilder = new StringBuilder(128);
Class<?> subscriberClass;
Class<?> clazz;
boolean skipSuperClasses;
SubscriberInfo subscriberInfo;
//......
}
通过 prepareFindState 获取到 FindState 对象,根据 FindState 对象可以进行下一步判断,
private static final FindState[] FIND_STATE_POOL = new FindState[POOL_SIZE];
private FindState prepareFindState() {
//找到 FIND_STATE_POOL 对象池
synchronized (FIND_STATE_POOL) {
for (int i = 0; i < POOL_SIZE; i++) {
//当找到了对应的FindState
FindState state = FIND_STATE_POOL[i];
if (state != null) {//FindState 非空表示已经找到
FIND_STATE_POOL[i] = null; //清空找到的这个FindState,为了下一次能接着复用这个FIND_STATE_POOL池
return state;//返回该 FindState
}
}
}
//如果依然没找到,则创建一个新的 FindState
return new FindState();
}
4.1.3、findUsingReflectionInSingleClass() 方法解析
通过反射找到对应的方法,并进行过滤
findUsingReflectionInSingleClass()解析.pngprivate void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
// This is faster than getMethods, especially when subscribers are fat classes like Activities
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
// Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
methods = findState.clazz.getMethods();
findState.skipSuperClasses = true;
}
for (Method method : methods) {
int modifiers = method.getModifiers();
//忽略非 public 和 static 的方法
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
// 获取订阅方法的所有参数
Class<?>[] parameterTypes = method.getParameterTypes();
// 订阅方法只能有一个参数,否则忽略
if (parameterTypes.length == 1) {
// 获取有 Subscribe 的注解
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
// 获取第一个参数
Class<?> eventType = parameterTypes[0];
// 检查 eventType 决定是否订阅,通常订阅者不能有多个 eventType 相同的订阅方法
if (findState.checkAdd(method, eventType)) {
// 获取线程模式
ThreadMode threadMode = subscribeAnnotation.threadMode();
// 添加订阅方法进 List
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}
经过修饰符、参数个数、是否有注解、和订阅者是否有 eventType 相同的方法几层条件的筛选,最终将订阅方法添加进 findState 的 subscriberMethods 这个 List 中。
EventBus 不仅仅获取当前类的订阅方法,还会获取它所有父类的订阅方法。
在 EventBus 中,一个订阅者包括这个订阅者的所有父类和子类,不会有多个方法相同的去接收同一个事件,但是有可能出现这样一种情况,子类去订阅了该事件,父类也去订阅了该事件。当出现这种情况,EventBus 如何判断?通过调用 checkAddWithMethodSignature() 方法,根据方法签名来检查。
4.1.4、checkAdd() & checkAddWithMethodSignature() 方法解析
checkAdd()&checkAddWithMethodSignature()解析.pngboolean checkAdd(Method method, Class<?> eventType) {
//事件类型为Key,订阅方法为Value
final Map<Class, Object> anyMethodByEventType = new HashMap<>();
//put()方法执行之后,返回的是之前put的值
Object existing = anyMethodByEventType.put(eventType, method);
if (existing == null) {
return true;
} else {
if (existing instanceof Method) {
if (!checkAddWithMethodSignature((Method) existing, eventType)) {
// Paranoia check
throw new IllegalStateException();
}
// Put any non-Method object to "consume" the existing Method
anyMethodByEventType.put(eventType, this);
}
//根据方法签名来检查
return checkAddWithMethodSignature(method, eventType);
}
}
//订阅方法为Key,订阅者的Class对象为Value
final Map<String, Class> subscriberClassByMethodKey = new HashMap<>();
private boolean checkAddWithMethodSignature(Method method, Class<?> eventType) {
methodKeyBuilder.setLength(0);
methodKeyBuilder.append(method.getName());
methodKeyBuilder.append('>').append(eventType.getName());
String methodKey = methodKeyBuilder.toString();
Class<?> methodClass = method.getDeclaringClass();
//put方法返回的是put之前的对象
Class<?> methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass);
//如果methodClassOld不存在或者是methodClassOld的父类的话,则表明是它的父类,直接返回true。否则,就表明在它的子类中也找到了相应的订阅,执行的 put 操作是一个 revert 操作,put 进去的是 methodClassOld,而不是 methodClass
if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) {
// Only add if not already found in a sub class
return true;
} else {
//这里是一个revert操作,所以如果找到了它的子类也订阅了该方法,则不允许父类和子类都同时订阅该事件,put 的是之前的那个 methodClassOld,就是将以前的那个 methodClassOld 存入 HashMap 去覆盖相同的订阅者。
//不允许出现一个订阅者有多个相同方法订阅同一个事件
// Revert the put, old class is further down the class hierarchy
subscriberClassByMethodKey.put(methodKey, methodClassOld);
return false;
}
}
FindState 的流程
- 调用 prepareFindState(),从 FIND_STATE_POOL 中获取一个 FindState,如果没有找到,则创建一个新的 FindState。
- 将 subscriberClass 通过 FindState 进行初始化。
- 返回所有的订阅者的方法和集合。
到此,两种方式讲解完毕。无论通过哪种方式获取,获取到订阅方法 List 之后,接下来是真正订阅的过程,回到register() 中看代码。
4.2、subscribe 订阅
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
//迭代每个 Subscribe 方法,调用 subscribe() 传入 subscriber(订阅者) 和 subscriberMethod(订阅方法) 完成订阅,
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
subscribe订阅.png
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
// 创建 Subscription 封装订阅者和订阅方法信息
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
//可并发读写的ArrayList,key为EventType,value为Subscriptions
//根据事件类型从 subscriptionsByEventType 这个 Map 中获取 Subscription 集合
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
//如果为 null,表示还没有订阅过,创建并 put 进 Map
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
//若subscriptions中已经包含newSubscription,表示该newSubscription已经被订阅过,抛出异常
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType);
}
}
// 按照优先级插入subscriptions
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
//key为订阅者,value为eventType,用来存放订阅者中的事件类型
//private final Map<Object, List<Class<?>>> typesBySubscriber;
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
//将EventType放入subscribedEvents的集合中
subscribedEvents.add(eventType);
//判断是否为Sticky事件
if (subscriberMethod.sticky) {
//判断是否设置了事件继承
if (eventInheritance) {
//获取到所有Sticky事件的Set集合
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
//遍历所有Sticky事件
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
//判断当前事件类型是否为黏性事件或者其子类
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
// 执行设置了 sticky 模式的订阅方法
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
4.3、register 流程图
register流程图.png事件的订阅讲解到这里,接下来看看事件的分发 Post。
五、Post 分发
5.1、Post 方法解析
一般的事件发布方式
EventBus.getDefault().post(new EventTest());
接下来看看 Post 方法的具体代码。
post分发.png//currentPostingThreadState 线程独有的
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
@Override
protected PostingThreadState initialValue() {
return new PostingThreadState();
}
};
public void post(Object event) {
//获取当前线程的 posting 状态
PostingThreadState postingState = currentPostingThreadState.get();
//获取当前事件队列
List<Object> eventQueue = postingState.eventQueue;
//将事件添加进当前线程的事件队列
eventQueue.add(event);
//判断是否正在posting
if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
//如果已经取消,则抛出异常
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {//发送事件
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
//状态复原
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
//发送事件的线程封装类
final static class PostingThreadState {
final List<Object> eventQueue = new ArrayList<>();//事件队列
boolean isPosting;//是否正在 posting
boolean isMainThread;//是否为主线程
Subscription subscription;
Object event;
boolean canceled;//是否已经取消
}
EventBus 用 ThreadLocal 存储每个线程的 PostingThreadState,一个存储了事件发布状态的类,当 post 一个事件时,添加到事件队列末尾,等待前面的事件发布完毕后再拿出来发布,这里看事件发布的关键代码postSingleEvent()。
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
//查找到所有继承关系的事件类型,将该类的父类全部放入集合中
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
//判断是否找到订阅者订阅了该事件
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
//发送没有订阅者订阅该事件
post(new NoSubscriberEvent(this, event));
}
}
}
代码也非常简单,首先看 eventInheritance 这个属性,是否开启事件继承,若是,找出发布事件的所有父类,也就是 lookupAllEventTypes(),然后遍历每个事件类型进行发布。若不是,则直接发布该事件。
如果需要发布的事件没有找到任何匹配的订阅信息,则发布一个 NoSubscriberEvent 事件。这里只看发布事件的关键代码 postSingleEventForEventType()。
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
//获取到 Subscription 的集合
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
//调用 postToSubscription 发送
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
来到这里,开始根据事件类型匹配出订阅信息,如果该事件有订阅信息,则执行 postToSubscription(),发布事件到每个订阅者,返回 true,若没有,则返回 false。继续追踪发布事件到具体订阅者的代码 postToSubscription()。
5.2、postToSubscription() 方法解析
postToSubscription()解析.pngprivate void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
//订阅线程跟随发布线程,EventBus 默认的订阅方式
case POSTING:
// 订阅线程和发布线程相同,直接订阅
invokeSubscriber(subscription, event);
break;
// 订阅线程为主线程
case MAIN:
//如果在 UI 线程,直接调用 invokeSubscriber
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
//如果不在 UI 线程,用 mainThreadPoster 进行调度,即上文讲述的 HandlerPoster 的 Handler 异步处理,将订阅线程切换到主线程订阅
mainThreadPoster.enqueue(subscription, event);
}
break;
// 订阅线程为主线程
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
// 订阅线程为后台线程
case BACKGROUND:
//如果在 UI 线程,则将 subscription 添加到后台线程的线程池
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
//不在UI线程,直接分发
invokeSubscriber(subscription, event);
}
break;
// 订阅线程为异步线程
case ASYNC:
// 使用线程池线程订阅
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
订阅者五种线程模式的特点对应的就是以上代码,简单来讲就是订阅者指定了在哪个线程订阅事件,无论发布者在哪个线程,它都会将事件发布到订阅者指定的线程。这里我们继续看实现订阅者的方法。
void invokeSubscriber(Subscription subscription, Object event) {
try {
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
订阅者接收到了事件,调用订阅方法,传入发布的事件作为参数,至此,事件发布过程就结束了。
5.3、post 流程图
post流程图.png六、unregister 反注册
先看反注册的代码
EventBus.getDefault().unregister(this);
跟踪 unregister()
public synchronized void unregister(Object subscriber) {
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
for (Class<?> eventType : subscribedTypes) {
unsubscribeByEventType(subscriber, eventType);
}
typesBySubscriber.remove(subscriber);
} else {
logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}
注册过程我们就知道 typesBySubscriber 是保存订阅者的所有订阅事件类型的一个 Map,这里根据订阅者拿到订阅事件类型 List,然后逐个取消订阅,最后 typesBySubscriber 移除该订阅者,。这里只需要关注它是如果取消订阅的,跟踪 unsubscribeByEventType()。
/** Only updates subscriptionsByEventType, not typesBySubscriber! Caller must update typesBySubscriber. */
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
int size = subscriptions.size();
for (int i = 0; i < size; i++) {
Subscription subscription = subscriptions.get(i);
if (subscription.subscriber == subscriber) {
subscription.active = false;
subscriptions.remove(i);
i--;
size--;
}
}
}
}
subscriptionsByEventType 是存储事件类型对应订阅信息的 Map,代码逻辑非常清晰,找出某事件类型的订阅信息 List,遍历订阅信息,将要取消订阅的订阅者和订阅信息封装的订阅者比对,如果是同一个,则说明该订阅信息是将要失效的,于是将该订阅信息移除。
七、总结
回顾一下 EventBus 的三个步骤
- 注册订阅者
- 事件发布
- 反注册订阅者
好了,EventBus 的源码解析到这就结束了,想进一步了解 EventBus 的朋友可以亲自去阅读源码。
网友评论