前面三章分别分析了 EventBus
的创建,注册过程, 这个章节将会接着分析 EventBus
事件的发送.
EventBus
的事件发送分为两种, 一种为发送普通事件, 一种为粘性事件
EventBus.getDefault().post(Object event)
EventBus.getDefault().postSticky(Object event)
它们的区别在于, 发送粘性事件的时候, 会先将事件添加至一个 EventBus.stickyEvents
这个 Map
中, 再去调用 post(Object event)
方法.
EventBus.stickyEvents
在第一章 这可能是最详细的 EventBus 源码分析01 - EventBus 对象的创建 中有介绍.
EventBus
注册的时候, 如果当前订阅者内有粘性事件的订阅方法则就会直接触发发送事件. 在第三章 这可能是最详细的 EventBus 源码分析03 - EventBus 的注册(下篇) 分析 5 中有介绍. 如果要订阅的这个事件是粘性事件, 就从EventBus.stickyEvents
这个map
中查找是否有这个粘性事件, 有就直接执行发送事件.所以订阅粘性事件后立即执行的前提是, 有通过
postSticky(Object event)
这个方法发送过粘性事件, 才会将这个粘性事件存到EventBus.stickyEvents
这个map
中. 否则就算订阅了粘性事件, 也不会立即执行发送.
事件发送的流程大致如下:
|--- EventBus.post(event)
|------------ EventBus.postSingleEvent(event,postingState)
|------------------ EventBus.postSingleEventForEventType(event,postingState,eventClass)
|------------------------ EventBus.postToSubscription(subscription,event,isMainThread)
|------------------------------ HandlerPoster.enqueue(subscription, event)
|------------------------------ BackgroundPoster.enqueue(subscription, event)
|------------------------------ AsyncPoster.enqueue(subscription, event)
|------------------------------ EventBus.invokeSubscriber(subscription, event)
现在正式开始分析事件的发送流程.
1. EventBus.post(event)
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
@Override
protected PostingThreadState initialValue() {
return new PostingThreadState();
}
};
public void post(Object event) {
PostingThreadState postingState = currentPostingThreadState.get();
//获得事件队列
List<Object> eventQueue = postingState.eventQueue;
//将要发送的事件添加到当前线程的事件队列中
eventQueue.add(event);
//判断是否正在发送
if (!postingState.isPosting) {
//判断赋值是否是主线程
postingState.isMainThread = isMainThread();
//设置标志位,防止重复进入
//假如发布者正在发布post,此时发布者又post了一个事件, 虽然这时候不会通过 if (!postingState.isPosting) 的判断,
//但是 eventQueue.add(event) 已经把此事件加入列表,在下边的循环中还是可以处理此event*
postingState.isPosting = true;
//如果这时候发布者取消发送,就抛出异常
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
//循环事件队列,将发布者发送的所有事件都发布出去
while (!eventQueue.isEmpty()) {
//发送事件队列中的第一个事件
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
//重置标志位
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
在post
方法中首先通过 currentPostingThreadState.get()
获得了当前线程的一个 PostingThreadState
对象.
currentPostingThreadState
是一个ThreadLocal
, 在 Android之消息机制学习中有分析过, 不了解的可以先去看一下.
而
PostingThreadState
则是一个发送事件线程状态和一些信息的封装类
//发送事件的线程状态的封装类.
final static class PostingThreadState {
//事件队列集合
final List<Object> eventQueue = new ArrayList<>();
//是否正在发送事件标志位
boolean isPosting;
//是否是主线程的标志位
boolean isMainThread;
//订阅记录, 包含了订阅者与订阅方法的相关内容
Subscription subscription;
//事件
Object event;
//取消的标志位
boolean canceled;
}
也就是说,对于每一个发送事件的线程其都有一个唯一的 PostingThreadState
来记录事件发送的队列以及状态。
接着就是对发送线程状态的一些赋值及判断操作. 并调用了 EventBus.postSingleEvent(event,postingState)
传入事件及发送线程状态
2. EventBus.postSingleEvent(event,postingState)
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
// 得到事件的 Class 对象
Class<?> eventClass = event.getClass();
// 是否找到订阅者, 默认为 false
boolean subscriptionFound = false;
// 如果支持事件继承,默认为 true 支持.
if (eventInheritance) {
// 查找当前事件继承的所有父类和所有实现的接口
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
//将当前事件的所有父类和所有实现接口也作为事件发送出去.
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
//发送事件
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
//没有找到订阅者
if (!subscriptionFound) {
//打印异常日志, 默认为 true.在第一章 EventBus 对象的创建中默认为 true.
if (logNoSubscriberMessages) {
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class && eventClass != SubscriberExceptionEvent.class) {
//发送一个 NoSubscriberEvent 的订阅事件, 默认接收此事件, 在第一章 EventBus 对象的创建中默认为 true.
post(new NoSubscriberEvent(this, event));
}
}
}
这个方法中, 根据 eventInheritance
属性, 决定是否向上遍历事件的父类型, 然后调用 postSingleEventForEventType(event, postingState, eventClass)
方法进一步处理发送事件:
3. EventBus.postSingleEventForEventType(event, postingState, eventClass)
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted;
try {
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
-
先从
subscriptionsByEventType
这个map
中获取所有订阅了此事件的订阅记录列表. (在register
中的第二步中已经将与事件相对定的订阅记录存起来了. 详情见这可能是最详细的 EventBus 源码分析03 - EventBus 的注册(下篇). -
接着判断这个订阅记录列表, 如果是空的, 表示没有没有这个事件的订阅记录. 直接返回
false
. 然后就返回到上一步发送一个NoSubscriberEvent
并记录日志. -
如果不为空, 表示有订阅记录. 就开始遍历订阅记录列表.
-
将事件与订阅记录赋值给
postingState
. -
调用
postToSubscription()
方法继续进一步发送事件.aborted
默认都为false
-
无论是否有异常都重置标志位.
-
遍历完成返回
true
.
4. postToSubscription(subscription, event, isMainThread)
在分析这个方法之前, 需要先了解一下 EventBus
的几种线程模型, 就是我们在订阅方法上面加的注解.
public enum ThreadMode {
//默认的线程模式,在那个线程发送事件就在对应线程处理事件,避免了线程切换,效率高。
POSTING,
//在主线程处理事件.
MAIN,
//无论在那个线程发送事件, 都先将事件入队,然后切换到主线程后依次处理事件. 先进先出.
MAIN_ORDERED,
//后台线程处理, 所有该线程模式下的事件会在线程池中用一个线程排队串行处理.
// (直到队列里边的事件处理完之后又有新的事件发布出来才会向线程池获取一个新的线程)
BACKGROUND,
//异步处理. 无论在那个线程发送事件,都将事件入队列,然后通过线程池处理, 每个事件会是单独的一个线程来处理.
//和 BackgroundThread 模式不同的是, 该线程模式的每个事件都是在线程池中开辟一个线程处理, 事件之间并发处理, 不需要排队处理.
ASYNC
}
例如: @Subscribe(threadMode = ThreadMode.MAIN)
, 如果不加这个注解也不会报错, 因为 EventBus
为我们提供了默认的线程模型. 下面接着看 postToSubscription()
方法的代码
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
//根据不同的线程模式执行对应的操作
switch (subscription.subscriberMethod.threadMode) {
//如果订阅的时候使用的是默认的线程模型, 在哪个线程发送事件就在哪个线程处理事件
case POSTING:
//在发送者的线程里边通过反射执行事件处理方法
invokeSubscriber(subscription, event);
break;
//在订阅的时候指定在主线程执行
case MAIN:
//如果发布者现在是在主线程, 那么直接在主线程通过反射来处理事件.
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
//如果发布者不是在主线程, 就把当前事件加入到主线程的消息循环队列.
//在队列中通过 Handler 切换到主线程来处理事件. 最终也是调用的 invokeSubscriber 方法
mainThreadPoster.enqueue(subscription, event);
}
break;
//在订阅的时候指定主线程执行,但是要先入队
case MAIN_ORDERED:
// mainThreadPoster基本不会为 null. 加入到主线程的消息循环队列
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
//订阅的时候要求在后台执行
case BACKGROUND:
//如果发布者是在主线程, 就把当前事件加入到后台线程消息循环队列.
//如果后台线程消息队列中有多个事件, 那么将会依次执行.
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
//发布者不是在主线程, 那么就直接在发布者所在的线程反射处理事件.
invokeSubscriber(subscription, event);
}
break;
//在订阅的时候指定是异步执行
case ASYNC:
//将当前事件与订阅记录加入到异步消息队列后通过线程池为每个事件单独开辟一个线程进行执行.
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
通过代码不难看出. 这个方法还不是最终处理事件的方法, 只是对我们在订阅事件方法添加线程模型的注释的处理. 除开线程模型, 主要的方法有 4 个. 分别如下
invokeSubscriber(subscription, event)
mainThreadPoster.enqueue(subscription, event)
backgroundPoster.enqueue(subscription, event)
asyncPoster.enqueue(subscription, event);
其实 2,3,4 最终都调用了 1 这个方法. 确切的说 invokeSubscriber(subscription, event)
才是最终进行事件处理的方法.
mainThreadPoster, backgroundPoster, asyncPoster
这三个对象的创建在第一章这可能是最详细的 EventBus 源码分析01 - EventBus 对象的创建 中已经分析过, 这里不再进行说明.
//mainThreadPoster 继承自 Handler 类 并实现了 Poster 接口, 实际的实现类是 HandlerPoster
private final Poster mainThreadPoster;
//backgroundPoster 和 asyncPoster 都实现了 Runnable, Poster 这两个接口
private final BackgroundPoster backgroundPoster; //实现类是 BackgroundPoster
private final AsyncPoster asyncPoster; //实现类是 AsyncPoster
在分析这三种模式的入队操作之前, 我们需要先了解一个各种线程模型都会使用到的数据结构PendingPostQueue
, 它就是一个队列, 针对待处理的事件进行了排队. 现在看一下它的内部具体实现
4.1 PendingPostQueue 待处理事件队列
final class PendingPostQueue {
private PendingPost head;
private PendingPost tail;
synchronized void enqueue(PendingPost pendingPost) {
if (pendingPost == null) {
throw new NullPointerException("null cannot be enqueued");
}
if (tail != null) {
tail.next = pendingPost;
tail = pendingPost;
} else if (head == null) {
head = tail = pendingPost;
} else {
throw new IllegalStateException("Head present, but no tail");
}
notifyAll();
}
synchronized PendingPost poll() {
PendingPost pendingPost = head;
if (head != null) {
head = head.next;
if (head == null) {
tail = null;
}
}
return pendingPost;
}
//出队
synchronized PendingPost poll(int maxMillisToWait) throws InterruptedException {
if (head == null) {
wait(maxMillisToWait);
}
return poll();
}
}
队列中的数据类型为 PendingPost
, 这个等下会分析,
- 入队流程, 尾部入队
入队时先进行判
Null
, 以免Null
进入队列.若是队列首次插入元素 A,此时的
head
和tail
都应该为Null
,将需要插入元素 A 并且同时赋给head, tail
那么此时head, tail
的内存指针都是指向 A .若我们再次执行插入元素 B,此时的
head, tail
都是有值的,那么我们将执行tail.next = B
相当于将元素 B 赋予了 A 的A.next
属性值,那么此时head.next
的值也就等于 B. 紧接着我们重新为tail
赋新值 B,那么此时A.next
和head.next
就等于tail
.我们就如此一直插入数据,
tail
保存的永远是最后一个值.
- 出队流程, 头部出队
第一次执行
poll
函数, 获取头部的pendingPost
元素, 然后进行判断head
值是否为空, 若不为Null
, 获取head.next
值作为下一次的执行poll
函数的head
值, 一次类推直到head.next
为Null
时, 说明没有了下一元素, 也就说到队尾 了, 将tail
置Null
,结束.
Ps: 详细的可以看一下另外一个简友的文章, 针对
PendingPostQueue
写的也较为详细, 入队流程及出队流程都是从他那里直接复制来的.
EventBus系列『番外』——认真剖析 『PendingPostQueue』队列的实现思想
入队方法执行完成后调用了 notifyAll()
方法释放了对象锁.
如果调用的出队方法是 poll(int maxMillisToWait)
则会等待 maxMillisToWait
时长的锁, 如果在这个时间内还没有入队的操作, 那么就是读一个空队列了, 直接调用 poll()
返回 null
.
接下来看一下 PendingPost
的内部结构
4.2 PendingPost 用来包装将要发送事件.
final class PendingPost {
//缓存任务的集合
private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();
//要发送的事件
Object event;
//订阅记录
Subscription subscription;
//下一条
PendingPost next;
private PendingPost(Object event, Subscription subscription) {
this.event = event;
this.subscription = subscription;
}
//从缓存池中取出一个 PendingPost 任务.从集合的尾部获取
//缓存池为空就创建一个 PendingPost
static PendingPost obtainPendingPost(Subscription subscription, Object event) {
synchronized (pendingPostPool) {
int size = pendingPostPool.size();
if (size > 0) {
PendingPost pendingPost = pendingPostPool.remove(size - 1);
pendingPost.event = event;
pendingPost.subscription = subscription;
pendingPost.next = null;
return pendingPost;
}
}
return new PendingPost(event, subscription);
}
//回收重置 PendingPost.将使用过的 PendingPost 放入缓存池中, 下次就直接从缓存集合中取, 不需要再创建.
static void releasePendingPost(PendingPost pendingPost) {
pendingPost.event = null;
pendingPost.subscription = null;
pendingPost.next = null;
synchronized (pendingPostPool) {
// Don't let the pool grow indefinitely
if (pendingPostPool.size() < 10000) {
pendingPostPool.add(pendingPost);
}
}
}
}
在 4.1 中我们知道 PendingPostQueue
维护了一个 PendingPost
类型的数据队列. 看上面的代码, 可以看得出 PendingPost
是一个对象池的模式. 包含了从对象池获取对象的方法 obtainPendingPost()
及结束后向对象池归还对象的方法 releasePendingPost()
.
构造方法为私有, 目的就是为了让我们只能从对象池来来获取
PendingPost
对象.
好了, PendingPostQueue
与 PendingPost
了解完成后, 就可以接着从 4. 从乡下分析了. 首先来分析的是 mainThreadPoster.enqueue(subscription, event)
5. mainThreadPoster.enqueue(subscription, event)
public class HandlerPoster extends Handler implements Poster {
...
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
@Override
public void handleMessage(Message msg){
...
}
}
HandlerPoster
初始化及内部变量在第一章已经分析过, 忘记的可以回去看一下.
在方法开始就是先从 PendingPost
的对象池中获取一个 pendingPost
对象, 这个对象包含了我们的订阅记录与事件属性. 接着调用 PendingPostQueue.enqueue()
方法将 pendingPost
入队.
如果主线程没有任务正在处理, 就发送一条空的消息.(HandlerPoster
继承自 Handler
). 那么为什么这里要发送一条空的消息呢?
看过 Android 之 消息机制学习 这个系列的应该知道
Handler.sendMessage()
方法最终调用的是MessageQueue.enqueueMessage()
这个方法. 在MessageQueue.enqueueMessage()
方法中会对这个空消息进行执行时间与插入到消息链表的位置的判断. 根据判断结果决定是否唤醒MessageQueue.next()
方法中的阻塞.最后将这条空消息返回到Looper.loop()
方法中. 调用msg.target.dispatchMessage(msg)
将这个空消息进行分发, 而Handler.dispatchMessage()
中又调用了Handler.handleMessage()
方法. 最终就回到了HandlerPoster.handleMessage()
方法.
总结就是: 发送空消息的目的, 就是为了切换到主线程并且回调重写的 handleMessage()
方法.
是不是有的朋友又要问, 在哪里切换到主线程的呀. 代码中没看到呀. 请翻到第一章找到 mainThreadSupport
对象的创建. 再不明白的再结合 Android 消息机制之初识Handler [ 六 ] 看一下.
OK. 言归正传. 看看在 handleMessage()
中都做了什么
5.1 mainThreadPoster.handleMessage(Message msg)
public class HandlerPoster extends Handler implements Poster {
...
public void enqueue(Subscription subscription, Object event) {
...
}
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
//获得开机到现在的时间
long started = SystemClock.uptimeMillis();
//开启死循环
while (true) {
//调用出队方法获得一个 pendingPost 对象
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
//如果第一次取出的 pendingPost 为空,在同步锁内就再取一次
synchronized (this) {
pendingPost = queue.poll();
//如果再次取出的 pendingPost 还是空, 说明队列中没有消息, 那就退出死循环, 等待下次被唤醒.
//将 handlerActive 这个标记为置为 false.
//handlerActive 这个标记为在 mainThreadPoster.enqueue 中发送空消息前也有判断,
//只有当前 Handler 不是活动的状态才会发送空消息.
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
//最终也是调用 eventBus.invokeSubscriber(pendingPost), 最后统一分析
eventBus.invokeSubscriber(pendingPost);
//判断任务是否执行超时, 超时则重新发送一条空消息触发 handleMessage 方法.
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
}
根据这个方法内部的代码不难看出参数 msg
也就是我们发送的那个空消息在这里就没有使用到. 这又进一步的说明了, 在上一步发送的那个空消息, 仅仅只是为了切换到主线程并且激活 handleMessage()
方法.
6. BackgroundPoster.enqueue(subscription, event)
final class BackgroundPoster implements Runnable, Poster {
...
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!executorRunning) {
executorRunning = true;
eventBus.getExecutorService().execute(this);
}
}
}
@Override
public void run() {
try {
try {
while (true) {
PendingPost pendingPost = queue.poll(1000);
if (pendingPost == null) {
synchronized (this) {
pendingPost = queue.poll();
if (pendingPost == null) {
executorRunning = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
}
} finally {
executorRunning = false;
}
}
}
通过 enqueue()
方法可以看到如果 BackgroundPoster
这个 Runnable
正在被线程池执行, 这时候 executorRunning==true
, 那么在 executorRunning==true
的情况下发布的事件只会入队, 而不会再次去调用线程池的 execute()
方法.
这样的话在 run()
方法中就通过死循环遍历处理队列中的事件. 全部处理完成后就退出死循环. 然后设置 executorRunning
为 false
, 表示没有正在运行中的线程了. 此后再发布事件才会在线程池中开辟一个新线程.
也就符合了我们上面说的 .某一时段内
BackgroundThread
模式的事件都会在BackgroundPoster
的run
方法中排队处理, 也就是说该时段内的所有事件是在一个线程中排队后串行执行的(队列中的事件处理完之后又有新的事件发布才会新开线程)。
7. AsyncPoster.enqueue(subscription, event)
class AsyncPoster implements Runnable, Poster {
...
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
queue.enqueue(pendingPost);
eventBus.getExecutorService().execute(this);
}
@Override
public void run() {
PendingPost pendingPost = queue.poll();
if(pendingPost == null) {
throw new IllegalStateException("No pending post available");
}
eventBus.invokeSubscriber(pendingPost);
}
}
上面说过, 异步模式会将订阅记录加入到异步消息队列后会通过线程池为每个事件单独开辟一个线程进行执行. 为什么这么说呢?
细看它的 enqueue()
方法. 它会为每一个添加的任务都在线程池中开辟一个新的线程执行. 并发度更高.
8. EventBus.invokeSubscriber(PendingPost pendingPost)
void invokeSubscriber(PendingPost pendingPost) {
Object event = pendingPost.event;
Subscription subscription = pendingPost.subscription;
PendingPost.releasePendingPost(pendingPost);
if (subscription.active) {
invokeSubscriber(subscription, event);
}
}
void invokeSubscriber(Subscription subscription, Object event) {
try {
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
终于到了最终的执行的方法了. 参数为 PendingPost pendingPost
的看的出来是从三个 Poster
发来的. 在这个方法内, 调用了 PendingPost.releasePendingPost
方法来回收重置 PendingPost
接着调用 2 个参数的 invokeSubscriber(subscription, event)
.
invokeSubscriber(subscription, event)
就是真正执行反射调用执行的方法.
OK. 分析到这一步, EventBus
的对象的创建, 注册, 事件的发送执行 已经全部分析完成了. 好像还剩下一个取消注册的. 有兴趣的朋友跟进去看一下吧. 如果是从第一篇跟着看到这里的朋友, 看一个取消注册的代码, 肯定是非常 Easy
的. 后面会增加一章 EventBus3.x
新特性相关的.
网友评论