EventBus 源码分析
分析源码之前
EventBus 大神的 github,最好的老师。
一、使用
我们在平时都会运用 EventBus 去简化各个组件之间的通信,相信使用了它之后感觉是真的方便了不少,可以有效的对解耦发送方和接收方。
无论主线程中调用子线程的方法,还是子线程中调用主线程的方法,我们不必去写繁杂的 Handler ,也省去了内存泄漏等的麻烦。
首先我们简单的运用一下 EventBus。
1、添加依赖
implementation 'org.greenrobot:eventbus:3.1.1'
2、定义事件
public class Student {
private String name;
private String addr;
private int age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getAddr() {
return addr;
}
public void setAddr(String addr) {
this.addr = addr;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
3、准备订阅者
声明并注释您的订阅方法,可选择指定一个线程模型。
@Subscribe(threadMode = ThreadMode.MAIN)
public void showSutednt(Student student) {
Toast.makeText(this, student.toString(), Toast.LENGTH_SHORT).show();
}
4、注册和注销订阅者
@Override
public void onStart() {
super.onStart();
EventBus.getDefault().register(this);
}
@Override
public void onStop() {
super.onStop();
EventBus.getDefault().unregister(this);
}
5、发布事件
Student student = new Student();
student.setAddr("北京");
student.setName("张三");
EventBus.getDefault().post(student);
二、使用须知
1、基于「发布订阅」
首先我们要明白它是基于「发布订阅」模式,通过 发布者 「Publisher」 发布事件给 「EventBus」 ,EventBus 最后把事件分发给订阅者 「Subscriber」 。<br />
2、四种线程模型
- POSTING (默认) :事件处理函数的线程和发布事件的线程在同一个线程。也就是直接操作。
- MAIN :事件处理函数的线程是主线程。注意不能有耗时操作。
- BACKGROUND:事件处理函数在后台线程(只有一个后台线程)。
- ASYNC:无论事件发布的函数在哪里,始终会新建一个新线程来运行。
分析源码
一、EventBus 的创建
EventBus#getDefault()
public static EventBus getDefault() {
if (defaultInstance == null) {
Class var0 = EventBus.class;
synchronized(EventBus.class) {
if (defaultInstance == null) {
defaultInstance = new EventBus();
}
}
}
return defaultInstance;
}
这是最常用最经典的 DoubleCheck 单例模式来创建 EventBus 对象。
EventBus#EventBus()
public EventBus() {
this(DEFAULT_BUILDER);
}
EventBus#EventBus(EventBusBuilder builder)
EventBus(EventBusBuilder builder) {
this.currentPostingThreadState = new ThreadLocal<EventBus.PostingThreadState>() {
protected EventBus.PostingThreadState initialValue() {
return new EventBus.PostingThreadState();
}
};
this.logger = builder.getLogger();
this.subscriptionsByEventType = new HashMap();
this.typesBySubscriber = new HashMap();
this.stickyEvents = new ConcurrentHashMap();
this.mainThreadSupport = builder.getMainThreadSupport();
this.mainThreadPoster = this.mainThreadSupport != null ? this.mainThreadSupport.createPoster(this) : null;
this.backgroundPoster = new BackgroundPoster(this);
this.asyncPoster = new AsyncPoster(this);
this.indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
this.subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes, builder.strictMethodVerification, builder.ignoreGeneratedIndex);
this.logSubscriberExceptions = builder.logSubscriberExceptions;
this.logNoSubscriberMessages = builder.logNoSubscriberMessages;
this.sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
this.sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
this.throwSubscriberException = builder.throwSubscriberException;
this.eventInheritance = builder.eventInheritance;
this.executorService = builder.executorService;
}
同样也是常用的 Builder 设计模式 来构造 EventBus 对象。
我们需要重点注意的几个参数
-
HashMap 「 subscriptionsByEventType」
-
HashMap 「typesBySubscriber」
-
ConcurrentHashMap 「stickyEvents」
-
SubscriberMethodFinder 「subscriberMethodFinder」
-
Poster 「mainThreadPoster」
-
BackgroundPoster 「 backgroundPoster」
-
AsyncPoster 「asyncPoster」
具体作用我们往下分析。
二、register
1、register
EventBusBuilder#register(Object subscriber)
public void register(Object subscriber) {
//首先获取 subscriber 的 Class 对象
Class<?> subscriberClass = subscriber.getClass();
//查找所有订阅者内部的事件方法
List<SubscriberMethod> subscriberMethods = this.subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized(this) {
Iterator var5 = subscriberMethods.iterator();
while(var5.hasNext()) {
SubscriberMethod subscriberMethod = (SubscriberMethod)var5.next();
// 调用 subscribe 分发订阅者的事件方法
this.subscribe(subscriber, subscriberMethod);
}
}
}
从这里可以看到我们在 Activity 里进行注册的时候,实际是把 Activity 作为订阅者去注册。<br />首先获取 subscriber 的 Class 对象,然后通过 findSubscriberMethods 查询在这个订阅者类里拥有 EventBus 注解的方法,然后添加到 List<SubscriberMethod> subscriberMethods。然后依次调用 subscribe。
2、findSubscriberMethods
SubscriberMethodFinder#findSubscriberMethods
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
//首先看看 METHOD_CACHE 缓存中有没有,查找过的会保存在 METHOD_CACHE 缓存
//详解2.1 解释 SubscriberMethod
List<SubscriberMethod> subscriberMethods = (List)METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
} else {
//一般 ignoreGeneratedIndex 默认 false 看名字应该是通过反射来查找
if (this.ignoreGeneratedIndex) {
subscriberMethods = this.findUsingReflection(subscriberClass);
} else {
subscriberMethods = this.findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass + " and its super classes have no public methods with the @Subscribe annotation");
} else {
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
}
详解 2.1
SubscriberMethod 是 一个包装类,把方法,线程模式,事件类,优先级,是否粘性,方法名称 包装起来。方便我们使用。
public class SubscriberMethod {
final Method method;
final ThreadMode threadMode;
final Class<?> eventType;
final int priority;
final boolean sticky;
String methodString;
···省略
}
3、findUsingReflection
SubscriberMethodFinder#findUsingReflection
private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
// 详解 3.1
SubscriberMethodFinder.FindState findState = this.prepareFindState();
// 详解 3.2
findState.initForSubscriber(subscriberClass);
//findState.clazz 也就是 subscriberClass
while(findState.clazz != null) {
//详解 3.3 把查找的信息保存在 findState 中
this.findUsingReflectionInSingleClass(findState);
findState.moveToSuperclass();
}
return this.getMethodsAndRelease(findState);
}
详解 3.1
FindState 是查找结果类,包含 SubscriberMethod 的集合 subscriberMethods,也就是这个订阅者 class 里所有的订阅事件方法。还有 通过 事件类型 为 key 保存方法的 HashMap anyMethodByEventType,也就是把该事件类型的所有方法放到这个 HashMap 里。而 subscriberClassByMethodKey 是以方法名为 key,保存订阅者类的 HashMap。具体详细作用需要往下继续分析。
static class FindState {
final List<SubscriberMethod> subscriberMethods = new ArrayList();
final Map<Class, Object> anyMethodByEventType = new HashMap();
final Map<String, Class> subscriberClassByMethodKey = new HashMap();
final StringBuilder methodKeyBuilder = new StringBuilder(128);
Class<?> subscriberClass;
Class<?> clazz;
boolean skipSuperClasses;
SubscriberInfo subscriberInfo;
··· 省略
}
FindState 的创建运用了数组大小为4的缓存池,当 FIND_STATE_POOL[i] 不为空的时候那么就使用这个对象,然后 FIND_STATE_POOL[i] 置为 null。
private SubscriberMethodFinder.FindState prepareFindState() {
SubscriberMethodFinder.FindState[] var1 = FIND_STATE_POOL;
synchronized(FIND_STATE_POOL) {
for(int i = 0; i < 4; ++i) {
SubscriberMethodFinder.FindState state = FIND_STATE_POOL[i];
if (state != null) {
FIND_STATE_POOL[i] = null;
return state;
}
}
return new SubscriberMethodFinder.FindState();
}
}
详解 3.2
对 FindState 进行 subscriberClass 等赋值和初始化
void initForSubscriber(Class<?> subscriberClass) {
this.subscriberClass = this.clazz = subscriberClass;
this.skipSuperClasses = false;
this.subscriberInfo = null;
}
详解 3.3
通过反射获取 拥有 EventBus 注解的方法
private void findUsingReflectionInSingleClass(SubscriberMethodFinder.FindState findState) {
Method[] methods;
try {
//通过反射获取订阅者类里所有的方法
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable var12) {
//如果有权限问题 那么就获取 除了私有方法外的所有方法
methods = findState.clazz.getMethods();
findState.skipSuperClasses = true;
}
Method[] var3 = methods;
int var4 = methods.length;
for(int var5 = 0; var5 < var4; ++var5) {
Method method = var3[var5];
//获取方法修饰符
int modifiers = method.getModifiers();
//修饰符是否是 public 是否可以被忽略
if ((modifiers & 1) != 0 && (modifiers & 5192) == 0) {
//获得方法的参数
Class<?>[] parameterTypes = method.getParameterTypes();
//我们只需要看 参数个数是 1
if (parameterTypes.length == 1) {
//获取注解 Subscribe
Subscribe subscribeAnnotation = (Subscribe)method.getAnnotation(Subscribe.class);
//如果有
if (subscribeAnnotation != null) {
//拿到 参数的类型 也就是 事件的类型
Class<?> eventType = parameterTypes[0];
//详解 3.4
if (findState.checkAdd(method, eventType)) {
ThreadMode threadMode = subscribeAnnotation.threadMode();
//把查找到的方法,事件类型,线程类型,优先级,是否粘性 放入包装类 SubscriberMethod 中。
//最后添加到 subscriberMethods
//循环往复 最后把所有的方法以及它的包装类的所有信息都放入了查找结果类 findState 。
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode, subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (this.strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName + "must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (this.strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName + " is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}
详解 3.4
判断 是否可以把 这个方法 放到订阅方法集合。
boolean checkAdd(Method method, Class<?> eventType) {
//往 anyMethodByEventType 添加 key 类型 eventType 的,value 是 method。
//把事件的类型和方法放到这个HashMap 中。 其实就是保证 同一个 eventType 对应一个方法
//如果 key 没有重复则 返回 null ,如果已经有值了返回 之前的值
Object existing = this.anyMethodByEventType.put(eventType, method);
if (existing == null) {
return true;
} else {
//如果子类和父类都去订阅该事件 那么 existing 不等于 null
if (existing instanceof Method) {
if (!this.checkAddWithMethodSignature((Method)existing, eventType)) {
throw new IllegalStateException();
}
this.anyMethodByEventType.put(eventType, this);
}
//那么我们怎么去添加 重复的呢?就需要根据方法的签名去了 详情在下面
return this.checkAddWithMethodSignature(method, eventType);
}
}
private boolean checkAddWithMethodSignature(Method method, Class<?> eventType) {
this.methodKeyBuilder.setLength(0);
this.methodKeyBuilder.append(method.getName());
this.methodKeyBuilder.append('>').append(eventType.getName());
String methodKey = this.methodKeyBuilder.toString();
Class<?> methodClass = method.getDeclaringClass();
//subscriberClassByMethodKey HashMap 是以 方法 为 key ,methodClass 为 value。
//一个方法对应一个 methodClass
Class<?> methodClassOld = (Class)this.subscriberClassByMethodKey.put(methodKey, methodClass);
//class1.isAssignableFrom(class2) 判定此 Class 对象所表示的类或接口与指定的 Class
//参数所表示的类或接口是否相同,或是否是其超类或超接口。
//如果在同一个类 不同方法名 那么返回 methodClassOld== null 直接返回 true
//如果是 同一个方法名,那么就看看这个方法所在的类是否有有亲属关系了,如果没有 那么就返回 false,不会添加
//如果是 同一个方法名 但是是父类或者接口,那么返回 true,最后替换 anyMethodByEventType 的方法值
if (methodClassOld != null && !methodClassOld.isAssignableFrom(methodClass)) {
this.subscriberClassByMethodKey.put(methodKey, methodClassOld);
return false;
} else {
return true;
}
}
一个 eventType 对应一个方法,一个方法对应一个 methodClass 。<br />我们具体怎么理解 anyMethodByEventType 和 subscriberClassByMethodKey 呢?<br />我们如何确定同一个 eventType 对应一个方法?anyMethodByEventType 负责存放这个键值对,如果有两个方法参数都是同一个 eventType,那么就需要用 subscriberClassByMethodKey 去保证了。看一看方法名是否相同,不相同那么就替换,如果相同那么就看是否是父类的这个方法也订阅了,如果是那么也是替换,如果不是就不会替换。
到此为止我们已经找到所有 EventBus 标注的方法了。接下来应该 subscribe(subscriber, subscriberMethod) 方法了。看看这些方法是如何被注册的。
4、subscribe
EventBus#subscribe
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
//获取 订阅方法的 eventType 也就是事件 class
Class<?> eventType = subscriberMethod.eventType;
//创建 订阅 封装类
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
// private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
//这个 Map 通过 eventType 获取 所有的订阅集合
//Subscription 详解 4.1
CopyOnWriteArrayList<Subscription> subscriptions = (CopyOnWriteArrayList)this.subscriptionsByEventType.get(eventType);
//如果 subscriptions 为空 那么新建 并添加 到 subscriptionsByEventType
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList();
this.subscriptionsByEventType.put(eventType, subscriptions);
//如果 里面有这个 订阅封装类 那么报错 这个类已经有这个 事件的订阅了,不能重复订阅
} else if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType);
}
int size = subscriptions.size();
//按照 priority 优先级来插入集合中
for(int i = 0; i <= size; ++i) {
if (i == size || subscriberMethod.priority > ((Subscription)subscriptions.get(i)).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
// 同一订阅者中 typeClass 的集合 用来判断 这个方法是否是粘性的
List<Class<?>> subscribedEvents = (List)this.typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList();
this.typesBySubscriber.put(subscriber, subscribedEvents);
}
((List)subscribedEvents).add(eventType);
//如果接收sticky事件,立即分发sticky事件
if (subscriberMethod.sticky) {
//默认情况下 event 事件允许继承,即默认情况下eventInheritance==true
if (this.eventInheritance) {
Set<Entry<Class<?>, Object>> entries = this.stickyEvents.entrySet();
Iterator var9 = entries.iterator();
while(var9.hasNext()) {
Entry<Class<?>, Object> entry = (Entry)var9.next();
Class<?> candidateEventType = (Class)entry.getKey();
//这里就是看 eventType 是否是 candidateEventType 类 或者 candidateEventType 的父类 、接口
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
this.checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
//如果不允许继承 那么直接就是这个 eventType class
Object stickyEvent = this.stickyEvents.get(eventType);
//这个最后走的是 postToSubscription 方法,我们下面会具体分析
this.checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
详解 4.1
Subscription 封装了 subscriber 订阅者 和 subscriberMethod 订阅方法 ,以及这个订阅的是否 acitive 。
final class Subscription {
final Object subscriber;
final SubscriberMethod subscriberMethod;
volatile boolean active;
···
}
我们来看一下 subscriptionsByEventType 和 typesBySubscriber 又是两个 Map。
subscriptionsByEventType 是根据 tpyeClass 的查找 subscriptions,也就是根据事件类型来找 订阅集合的。这个集合有着优先级,同一事件,不同优先级的 subscription。
typesBySubscriber 是跟剧 订阅者 查找 tpyeClass,也就是根据订阅者查找其中的事件,如果有粘性的就立马分发。
到此 register 就分析完毕,说实话这样分析下来,脑袋里实在还是不懂这个流程,那我们就换一下流程图,简化一下这个流程。
image.png三、post
我们如何把订阅者的方法和订阅事件相关联,也就是事件是如何分发到事件方法的。
首先我们需要了解 「currentPostingThreadState」 在不同线程中保存 「PostingThreadState」。
private final ThreadLocal<EventBus.PostingThreadState> currentPostingThreadState;
「PostingThreadState」 是分发线程状态,包换 事件队列 、是否正在分发、是否在主线程、是否取消了 还有订阅分装类 Subscription ,以及当前事件。记录着本线程中事件分发的状态。
static final class PostingThreadState {
final List<Object> eventQueue = new ArrayList();
boolean isPosting;
boolean isMainThread;
Subscription subscription;
Object event;
boolean canceled;
PostingThreadState() {
}
}
public void post(Object event) {
EventBus.PostingThreadState postingState = (EventBus.PostingThreadState)this.currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
//把当前 event 添加到 eventQueue
eventQueue.add(event);
//如果时间没有正在分发
if (!postingState.isPosting) {
//对 postingState 进行赋值
postingState.isMainThread = this.isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
//依次 取出 event 并执行 postSingleEvent 方法。
while(!eventQueue.isEmpty()) {
this.postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
//最后对 postingState 做 默认值 处理
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
通过方法命名我们知道这是处理单个事件的方法。
1、postSingleEvent
EventBus#postSingleEvent
private void postSingleEvent(Object event, EventBus.PostingThreadState postingState) throws Error {
//首先获取这个类的 class 对象
Class<?> eventClass = event.getClass();
//是否查找到了 订阅方法
boolean subscriptionFound = false;
//如果允许继承
if (this.eventInheritance) {
//查找该类 以及它的父类 的所有 事件类型
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
//如果发现有匹配的事件类型??
for(int h = 0; h < countTypes; ++h) {
Class<?> clazz = (Class)eventTypes.get(h);
subscriptionFound |= this.postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = this.postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
if (this.logNoSubscriberMessages) {
this.logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
if (this.sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class && eventClass != SubscriberExceptionEvent.class) {
this.post(new NoSubscriberEvent(this, event));
}
}
}
2、postSingleEventForEventType
private boolean postSingleEventForEventType(Object event, EventBus.PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList subscriptions;
synchronized(this) {
//获得该 eventClass 的所有 订阅方法
subscriptions = (CopyOnWriteArrayList)this.subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
Iterator var5 = subscriptions.iterator();
while(var5.hasNext()) {
//从 方法集合中取出一个 方法
Subscription subscription = (Subscription)var5.next();
//给 postingState 赋值
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
//这回终于要 分发成功了
this.postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
} else {
return false;
}
}
3、postToSubscription
终于有看到这个方法,在 subscrible 方法的粘性事件处理上 也是调用的这个方法,这也是我们真正分发的方法。
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
//首先判断 是哪种 threadMode 模式 运行
switch(subscription.subscriberMethod.threadMode) {
case POSTING:
//详解 3.1
this.invokeSubscriber(subscription, event);
break;
case MAIN:
//详解 3.2
if (isMainThread) {
this.invokeSubscriber(subscription, event);
} else {
this.mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (this.mainThreadPoster != null) {
this.mainThreadPoster.enqueue(subscription, event);
} else {
this.invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
//详解 3.3
if (isMainThread) {
this.backgroundPoster.enqueue(subscription, event);
} else {
this.invokeSubscriber(subscription, event);
}
break;
case ASYNC:
//详解 3.4
this.asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
详解 3.1
EventBus#invokeSubscriber
void invokeSubscriber(Subscription subscription, Object event) {
try {
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException var4) {
this.handleSubscriberException(subscription, event, var4.getCause());
} catch (IllegalAccessException var5) {
throw new IllegalStateException("Unexpected exception", var5);
}
}
最后我们发现 invokeSubscriber 方法实际是通过 subscription 的持有的 method 的引用通过反射的方法,把 订阅者 subscriber 和 事件 event 填入,真正调用的就是 subscriber.xxx(event)。也就是 activity.xxx(event)。
饶了一大圈,其实也就是把 订阅者 和 事件的 引用保存起来,查找方法,通过反射进行调用。真是原理很简单,实现很蛋疼,不把你绕晕是不行的。
详解 3.2
如果 是在主线程中,isMainThread == true,直接 invokeSubscriber 。<br />如果 false ,那么就需要 倒一手了。其实 HandlerPoster 就是 一个 Handler , 并且它的 Lopper 是主线程的Looper 。这样就是通过 Handler ,添加到主线程了,然后执行 invokeSubscriber 。这么看切换线程原理还是 Handler。
HandlerPoster#enqueue
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized(this) {
this.queue.enqueue(pendingPost);
if (!this.handlerActive) {
this.handlerActive = true;
if (!this.sendMessage(this.obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
详解 3.3
BackgroundPoster 和 主线程的 HandlerPoster 不通,它是一个 Runnable 线程。和主线程 HandlerPoster 不同的是它会把 queue 的所有 pendingPost 都去执行。
final class BackgroundPoster implements Runnable, Poster {
private final PendingPostQueue queue;
private final EventBus eventBus;
private volatile boolean executorRunning;
BackgroundPoster(EventBus eventBus) {
this.eventBus = eventBus;
this.queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized(this) {
this.queue.enqueue(pendingPost);
if (!this.executorRunning) {
this.executorRunning = true;
this.eventBus.getExecutorService().execute(this);
}
}
}
public void run() {
try {
while(true) {
PendingPost pendingPost = this.queue.poll(1000);
if (pendingPost == null) {
synchronized(this) {
pendingPost = this.queue.poll();
if (pendingPost == null) {
this.executorRunning = false;
return;
}
}
}
this.eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException var9) {
this.eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", var9);
} finally {
this.executorRunning = false;
}
}
}
详解 3.4
AsyncPoster 是一个 Runnalbe。是通过线程池产生新的线程,最后执行 invokeSubscriber 方法。和 BackgroundPoster 不同的是 一次只取一个 PendingPost。
class AsyncPoster implements Runnable, Poster {
private final PendingPostQueue queue;
private final EventBus eventBus;
AsyncPoster(EventBus eventBus) {
this.eventBus = eventBus;
this.queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
this.queue.enqueue(pendingPost);
this.eventBus.getExecutorService().execute(this);
}
public void run() {
PendingPost pendingPost = this.queue.poll();
if (pendingPost == null) {
throw new IllegalStateException("No pending post available");
} else {
this.eventBus.invokeSubscriber(pendingPost);
}
}
}
//最后还是走的 invokeSubscriber 方法
void invokeSubscriber(PendingPost pendingPost) {
Object event = pendingPost.event;
Subscription subscription = pendingPost.subscription;
PendingPost.releasePendingPost(pendingPost);
if (subscription.active) {
this.invokeSubscriber(subscription, event);
}
}
最后也要来一个流程图来总结下 post 的流程。
image.png四、 unregister
解绑订阅者和订阅事件。
public synchronized void unregister(Object subscriber) {
//获取 该 订阅者 的所有 订阅事件
List<Class<?>> subscribedTypes = (List)this.typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
Iterator var3 = subscribedTypes.iterator();
while(var3.hasNext()) {
Class<?> eventType = (Class)var3.next();
//挨个 解绑 订阅者 和 订阅事件的关系
this.unsubscribeByEventType(subscriber, eventType);
}
//最后把这个 订阅者 移出 该 map
this.typesBySubscriber.remove(subscriber);
} else {
this.logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
//获取 这个订阅事件的 所有订阅者
List<Subscription> subscriptions = (List)this.subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
int size = subscriptions.size();
for(int i = 0; i < size; ++i) {
Subscription subscription = (Subscription)subscriptions.get(i);
//判断 是否是这个订阅者 subscription 置 flase ,然后 移除 该集合
if (subscription.subscriber == subscriber) {
subscription.active = false;
subscriptions.remove(i);
--i;
--size;
}
}
}
}
总结
一、regist
1、遍历所有 EventBus 的订阅事件。<br />2、把 封装订阅者加入 以事件类型为 key ,所有封装订阅者的集合为 values 的 Map 中。详细见注释。<br />3、把订阅事件 添加到 以 「subscriber」订阅者为 key,「subscribedEvents」订阅事件集合为 value 的 Map 中。<br />4、如果订阅了粘性事件的订阅者,那么会 粘性事件集合中获取之前的粘性事件,然后相应这些粘性事件。
注释:<br />把 「subscriber」和 「subscriberMethod」封装成 「Subscription」 封装订阅者。通过 订阅事件的 「SubscriberMethod」 获取 「eventType」,以 eventType 事件类型为 key ,封装订阅者集合(subscriptions)为 value 的 Map 「subscriptionsByEventType」。并把这个 Subscription 添加到 subscriptions 中。
二、post
1、获取该线程下的事件队列。<br />2、把要发送的事件添加到队列中。<br />3、根据订阅事件 查找所有 封装订阅者。<br />4、根据订阅方法的执行模式,在对应的线程中通过反射执行订阅者的订阅方法。
三、unregist
1、首先 获取订阅者的所有订阅事件。<br />2、遍历 订阅事件 <br />3、根据 订阅事件获取 订阅者的集合。<br />4、判断 该订阅者 是否在 封装订阅者集合 「subscriptions」中,把这个订阅者从 subscriptions 中移除。<br />5、最后 把订阅者 从「typesBySubscriber」中移除。
最后
「云开方见日,潮尽炉峰出。」——戴叔伦
网友评论