目的:分析post事件的发送与分发
从post开始:
/** Posts the given event to the event bus. */
public void post(Object event) {
PostingThreadState postingState = currentPostingThreadState.get(); //获取到当前线程中的PostingThreadState
List<Object> eventQueue = postingState.eventQueue; //获取该线程中事件的队列
eventQueue.add(event); //将新post的事件入队
if (!postingState.isPosting) { //如果没有正在发送
postingState.isMainThread = isMainThread(); //判断当前线程是否在主线程运行,如果在非安卓环境,总是返回true
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) { //循环遍历队列所有事件,然后发送
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
特别说明:
-
currentPostingThreadState.get();
返回的是当前线程中ThreadLocal中存储的PostingThreadState
对象,ThreadLocal是线程内部的存储类,线程私有的,每个线程对同一个key都可以有不同的一个副本。具体解析:ThreadLocal源码解析。 -
PostingThreadState
对象是一个静态类:存储了该线程中要发送的事件队列,发送事件是否为UI线程,该事件的订阅者等信息
final static class PostingThreadState {
final List<Object> eventQueue = new ArrayList<>();
boolean isPosting;
boolean isMainThread;
Subscription subscription;
Object event;
boolean canceled;
}
- 将该事件加入该线程的事件队列,然后循环发送单个事件。
发送单个事件:
//发送某个事件【包含该事件的父类或接口事件】
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) { //是否也向订阅了该事件的父类的方法发送,默认为true
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass); //查询到订阅了该事件类型的所有的父类或接口类型
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz); //发送具体的事件类型
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) { //没有找到接受者
if (logNoSubscriberMessages) {
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) { //发送一个默认的没有找到监听者的事件
post(new NoSubscriberEvent(this, event));
}
}
}
方法说明:
- 获取到所有订阅了该事件类型的父类型,然后发送该类型及其所有父类型。
- 如果没有找到订阅者,则系统会默认发送给一个事件类型为
NoSubscriberEvent
的通知,外部可通过该类型进行事件发送失败的监听。
发送某一具体类型:
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass); //获取到该事件的所有订阅者
}
if (subscriptions != null && !subscriptions.isEmpty()) { //如果订阅者不为空
for (Subscription subscription : subscriptions) { //循环遍历所有订阅者
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
postToSubscription(subscription, event, postingState.isMainThread); //将事件发送给订阅者
aborted = postingState.canceled;
} finally { //重置事件状态
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
也很简单,找到该订阅该事件的所有订阅者(如何通过事件类型获取所有订阅者可以看上篇文章),然后将事件发送给订阅者:
//将事件发送给具体的订阅者,入参isMainThread是指post的线程是否为主线程
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) { //首先判断一下订阅者所在的线程
case POSTING: //发送者与订阅者在统一线程,则直接反射调用
invokeSubscriber(subscription, event);
break;
case MAIN: //订阅者在主线程,
if (isMainThread) { //发送者也在主线程,则直接反射调用
invokeSubscriber(subscription, event);
} else { //发送者在非UI线程,则加入主线程发送器的队列等待发送
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED: //订阅者在主线程,且始终有序【不并发】
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND: //订阅者在后台线程,
if (isMainThread) { //发送者在UI线程,则添加到后台发送器队列等待发送
backgroundPoster.enqueue(subscription, event);
} else { //都在后台线程,直接反射调用
invokeSubscriber(subscription, event);
}
break;
case ASYNC: //在一个独立于发送线程和Ui线程的线程中发送
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
通过上边的几个方法,我们已经知道了post发送事件所在线程,要发送的对象,订阅者所在的线程,订阅者类,订阅者方法关键信息。因此,下边主要搞明白如何进行线程间通信的·。
首先要明白一个概念:发射器:Poster
interface Poster {
/**
* Enqueue an event to be posted for a particular subscription.
*
* @param subscription Subscription which will receive the event.
* @param event Event that will be posted to subscribers.
*/
void enqueue(Subscription subscription, Object event);
}
该接口包含一个方法:将订阅者对象以及要发送的信息添加到发送队列等候发送。
该接口共有3个实现类:
image.png
说明:
- HandlerPoster负责向主线程发送事件
- BackgroundPoster负责向后台非UI线程发送事件
- AsyncPoster也是向后台非UI线程发送事件,
BackgroundPoster
与AsnchPoster
之间的区别在于:
1. 如果发送者与接受者都在同一后台线程,BackgroundPoster
会直接在该线程反射调用。而AsycnPoster
则不管发送者与接收者是否在同一线程,始终会开启一个新的后台线程反射调用接受者函数。
2. 在短时间内并发发送大量事件时,BackgroundPoster
会保证接受者接收到的事件也是有序的,而由于AsycnPoster
每次实在单独线程回调,因此接受者接收的事件是不固定的。
下面先看一下BackgroundPoster
的实现:
final class BackgroundPoster implements Runnable, Poster {
private final PendingPostQueue queue;
private final EventBus eventBus;
private volatile boolean executorRunning;
BackgroundPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!executorRunning) {
executorRunning = true;
eventBus.getExecutorService().execute(this);
}
}
}
@Override
public void run() {
try {
try {
while (true) {
PendingPost pendingPost = queue.poll(1000);
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
executorRunning = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
}
} finally {
executorRunning = false;
}
}
}
该方法同时实现了两个接口:Runnable
与 Poster
。
外部通过调用enqueue
将事件与订阅者入队,然后会通过EventBus中的一个线程池来执行自身的run方法。在run方法中开启了一个死循环,会一直取出该队列中的PendingPost
对象,然后通过eventBus.invokeSubscriber(pendingPost);
来反射调用订阅者方法。
这里对PendingPost类说明一下:
final class PendingPost {
private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();
Object event;
Subscription subscription;
PendingPost next;
private PendingPost(Object event, Subscription subscription) {
this.event = event;
this.subscription = subscription;
}
static PendingPost obtainPendingPost(Subscription subscription, Object event) {
synchronized (pendingPostPool) {
int size = pendingPostPool.size();
if (size > 0) {
PendingPost pendingPost = pendingPostPool.remove(size - 1);
pendingPost.event = event;
pendingPost.subscription = subscription;
pendingPost.next = null;
return pendingPost;
}
}
return new PendingPost(event, subscription);
}
static void releasePendingPost(PendingPost pendingPost) {
pendingPost.event = null;
pendingPost.subscription = null;
pendingPost.next = null;
synchronized (pendingPostPool) {
// Don't let the pool grow indefinitely
if (pendingPostPool.size() < 10000) {
pendingPostPool.add(pendingPost);
}
}
}
}
- 该类中有一个静态列表作为对象池来存放自身
- 除了必要的发送事件对象以及订阅者对象,还有一个next指针指向下一个PendingPost
- 在外部通过静态方法
obtainPendingPost
时,会先从池中获取一个干净的对象然后赋值返回,否则会创建一个新的。 - 外部使用完后会通过
releasePendingPost
来释放抹去数据加入池中【如果池大小<10000】
上边的BackgroundPoster
类中持有一个PendingPostQueue
,该队列中声明了两个PendingPost对象作为head 和 tail。每次enqueue入队操作会将新入队的PendingPost
作为队尾。每次发送回通过poll来获取head队头,然后将头指针后移来达到队列的结构。
最后来看一下真正反射调用的方法:
void invokeSubscriber(PendingPost pendingPost) {
Object event = pendingPost.event;
Subscription subscription = pendingPost.subscription;
PendingPost.releasePendingPost(pendingPost); //在pendingPost使命完成后会release入池
if (subscription.active) {
invokeSubscriber(subscription, event);
}
}
void invokeSubscriber(Subscription subscription, Object event) {
try { //通过反射回调方法。
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
OK,想后台线程发送事件分析完毕,下面看一下如何向主线程发送事件:
public class HandlerPoster extends Handler implements Poster{
private final PendingPostQueue queue;
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
不难猜想,是通过new Handler(Looper.getMainLooper())
来获取主线程的handler,在入队操作中通过handler.sendMessage()
发送一个空消息,进入handleMessage
方法中处理,在handleMessage
方法中通过while(true)方法来不断获取队列中将要发送的事件,最后通过反射调用将事件发送到订阅者方法中。
总结
- 通过ThreadLocal来存放post到每个线程中的事件队列。
- 查找订阅了发送事件类型的父类型的方法,然后加入发送队列。
- 通过
HandlerPoster
,BackgroundPoster
和AsyncPoster
来实现不同线程之间的事件发送。 - 使用了对象池技术来实现
PendingPost
对象的获取与释放。 - 通过
PendingPostQueue
实现PendingPost
的队列操作。 - 通过反射调用订阅者方法将发送事件传递给订阅者。
完。
网友评论