前言
接上篇EventBus源码解析(一)EventBus事件订阅,一直想着怎么写出通俗易通的文章,无奈水平有限,想配图却不知从哪开始。如果文章哪里写的不好或错了,希望大神能指出并纠正。
正题
EventBus消息发送分为两种,一种是普通发送,一种粘性发送。本篇文章讲普通发送。
EventBus普通消息发送
使用post就能完成一次普通的消息发送,参数为model类的实例,发送之后订阅者就会收到小心并处理事件。
下面看一下post的源码:
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
@Override
protected PostingThreadState initialValue() {
return new PostingThreadState();
}
};
/** Posts the given event to the event bus. */
public void post(Object event) {
PostingThreadState postingState = currentPostingThreadState.get();
//消息队列
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
//判断消息是否已发送
if (!postingState.isPosting) {
//是否为主线程
postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
//标记为已发送
postingState.isPosting = true;
//
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {
//发送消息并移除已发送的消息
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
在post方法中,使用的是ThreadLocal解决多线程并发问题。至于它和synchronized锁的区别,感兴趣的同学可以去了解一下。这里给个地址彻底理解ThreadLocal。之后再把消息加入到队列当中,经过判断消息的各种状态之后发送消息。
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
//eventInheritance默认为true 在EventBusBuilder类中可设置
if (eventInheritance) {
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
Log.d(TAG, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}
/** Looks up all Class objects including super classes and interfaces. Should also work for interfaces. */
private static List<Class<?>> lookupAllEventTypes(Class<?> eventClass) {
synchronized (eventTypesCache) {
//从缓存中查找model类集合
List<Class<?>> eventTypes = eventTypesCache.get(eventClass);
if (eventTypes == null) {
eventTypes = new ArrayList<>();
Class<?> clazz = eventClass;
//将model类的父类和接口添加至model类集合
while (clazz != null) {
eventTypes.add(clazz);
addInterfaces(eventTypes, clazz.getInterfaces());
clazz = clazz.getSuperclass();
}
//添加至缓存
eventTypesCache.put(eventClass, eventTypes);
}
return eventTypes;
}
}
/** Recurses through super interfaces. */
static void addInterfaces(List<Class<?>> eventTypes, Class<?>[] interfaces) {
for (Class<?> interfaceClass : interfaces) {
if (!eventTypes.contains(interfaceClass)) {
eventTypes.add(interfaceClass);
addInterfaces(eventTypes, interfaceClass.getInterfaces());
}
}
}
这段代码检查model类及其父类和接口,在添加接口和父类的while循环中,如果找不到父类后就会返回null则跳出循环。每循环一次clazz都是不同的类,添加接口时使用递归,设计的非常巧妙。之后的工作就是把小心发送出去了
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
//从subscriptionsByEventType中取出订阅请求
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
//根据订阅请求中的线程信息发送消息
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
发送请求的代码非常简单,在 subscribe(subscriber, subscriberMethod)(见第一篇)中subscriptionsByEventType已经添加了订阅信息,eventClass和eventType都是model类,所以取出的就是订阅的消息请求集合了。之后再遍历订阅集合,根据订阅信息中的线程参数去发送信息。然后通过反射完成消息的发送。
网友评论