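1. Preface
The EventBus library is small, flexible, and easy to use, and it comes up often in interviews. After studying it, I found that it is strikingly similar to the Handler mechanism. Once you understand one, the other follows almost for free.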
2. Main topic
Learning goals:
- How thread switching works
- The overall EventBus flow
EventBus.java
Getting the instance: a double-checked locking singleton
/** Convenience singleton for apps using a process-wide EventBus instance. */
public static EventBus getDefault() {
EventBus instance = defaultInstance;
if (instance == null) {
synchronized (EventBus.class) {
instance = EventBus.defaultInstance;
if (instance == null) {
instance = EventBus.defaultInstance = new EventBus();
}
}
}
return instance;
}
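The getDefault() pattern above can be tried in isolation. The following is a minimal sketch, not EventBus code: Registry is a hypothetical stand-in class, and its field is declared volatile, which double-checked locking needs so that a thread that sees a non-null reference also sees a fully constructed object.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for EventBus; only the singleton logic is shown.
class Registry {
    // volatile is required for safe double-checked locking.
    private static volatile Registry defaultInstance;

    private Registry() { }

    static Registry getDefault() {
        Registry instance = defaultInstance;      // first check, without the lock
        if (instance == null) {
            synchronized (Registry.class) {
                instance = defaultInstance;       // second check, under the lock
                if (instance == null) {
                    instance = defaultInstance = new Registry();
                }
            }
        }
        return instance;
    }

    // Calls getDefault() from several threads and returns how many distinct instances were seen.
    static int distinctInstancesSeen(int threadCount) {
        Set<Registry> seen = ConcurrentHashMap.newKeySet();
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(() -> seen.add(getDefault()));
            threads[i].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.size();
    }
}
```

The first check keeps the common path lock-free; the lock is only taken while the instance is still missing.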
Registration
/**
* Registers the given subscriber to receive events. Subscribers must call {@link #unregister(Object)} once they
* are no longer interested in receiving events.
* <p/>
* Subscribers have event handling methods that must be annotated by {@link Subscribe}.
* The {@link Subscribe} annotation also allows configuration like {@link
* ThreadMode} and priority.
*/
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
Log.d("wangxuyang","subscriberMethods size : "+subscriberMethods.size());
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
SubscriberMethod describes a method that we annotated in a class with @Subscribe, for example @Subscribe(threadMode = ThreadMode.MAIN):
public class SubscriberMethod {
final Method method;
final ThreadMode threadMode;
final Class<?> eventType;
final int priority;
final boolean sticky;
/** Used for efficient comparison */
String methodString;
}
SubscriberMethodFinder.java finds the annotated methods in a class. There are two ways:
- by reflection
- through a class generated at compile time by the annotation processor
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}
// ignoreGeneratedIndex defaults to false
if (ignoreGeneratedIndex) {
subscriberMethods = findUsingReflection(subscriberClass); // look up by reflection
} else {
// Look up through the SubscriberInfoIndex interface, whose implementation is generated by EventBus at compile time
subscriberMethods = findUsingInfo(subscriberClass); // uses the compile-time generated code
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
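The reflection path and the METHOD_CACHE lookup can be illustrated with plain JDK reflection. This is a simplified sketch under stated assumptions: @OnEvent is a hypothetical annotation standing in for @Subscribe, only the class's own declared methods are scanned (no superclasses), and the result is a list of Method objects rather than SubscriberMethod.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical annotation standing in for EventBus's @Subscribe.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface OnEvent { }

class ReflectiveFinder {
    private static final Map<Class<?>, List<Method>> METHOD_CACHE = new ConcurrentHashMap<>();

    // Returns the public, single-parameter methods annotated with @OnEvent.
    static List<Method> findSubscriberMethods(Class<?> subscriberClass) {
        List<Method> cached = METHOD_CACHE.get(subscriberClass);
        if (cached != null) {
            return cached;                        // cache hit: no reflection needed
        }
        List<Method> found = new ArrayList<>();
        for (Method method : subscriberClass.getDeclaredMethods()) {
            boolean isPublic = Modifier.isPublic(method.getModifiers());
            if (isPublic && method.isAnnotationPresent(OnEvent.class)
                    && method.getParameterCount() == 1) {
                found.add(method);
            }
        }
        if (found.isEmpty()) {
            throw new IllegalArgumentException(
                    subscriberClass + " has no public @OnEvent methods");
        }
        METHOD_CACHE.put(subscriberClass, found);
        return found;
    }
}

// Example subscriber: one annotated handler and one plain method.
class DemoSubscriber {
    @OnEvent
    public void onMessage(String message) { }

    public void notAHandler(String message) { }
}
```

Calling ReflectiveFinder.findSubscriberMethods(DemoSubscriber.class) twice returns the same cached list, which is the point of the cache: the reflective scan runs once per class.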
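What subscribe(subscriber, subscriberMethod) does: it inserts the new Subscription into the list for its event type in priority order, records the event type under the subscriber, and, for sticky methods, delivers any matching sticky event right away: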
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
Log.d("wangxuyang","class: "+subscriber.getClass()+" "+eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
int size = subscriptions.size();
Log.d("wangxuyang","size :"+size);
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
if (subscriberMethod.sticky) {
if (eventInheritance) {
// Existing sticky events of all subclasses of eventType have to be considered.
// Note: Iterating over all events may be inefficient with lots of sticky events,
// thus data structure should be changed to allow a more efficient lookup
// (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
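The priority-ordered insertion loop in subscribe() can be isolated as follows. This is a reduced sketch: PrioritySlot and PriorityList are hypothetical names, and a slot carries only a name and a priority instead of a subscriber and method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical, reduced version of a subscription: a name plus a priority.
class PrioritySlot {
    final String name;
    final int priority;

    PrioritySlot(String name, int priority) {
        this.name = name;
        this.priority = priority;
    }
}

class PriorityList {
    private final CopyOnWriteArrayList<PrioritySlot> slots = new CopyOnWriteArrayList<>();

    // Same loop shape as subscribe(): insert before the first entry with a lower
    // priority, or append when i reaches size. Equal priorities keep registration order.
    void add(PrioritySlot newSlot) {
        int size = slots.size();
        for (int i = 0; i <= size; i++) {
            if (i == size || newSlot.priority > slots.get(i).priority) {
                slots.add(i, newSlot);
                break;
            }
        }
    }

    // Names in delivery order, highest priority first.
    List<String> names() {
        List<String> result = new ArrayList<>();
        for (PrioritySlot slot : slots) {
            result.add(slot.name);
        }
        return result;
    }
}
```

Adding slots with priorities 0, 10, 5, 5 (in that order) yields the order 10, 5, 5, 0, with the two priority-5 entries in registration order.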
Subscription: a wrapper that binds a subscriber to one of its SubscriberMethods.
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
Groups the Subscriptions that receive the same event type; each group is stored in a CopyOnWriteArrayList.
private final Map<Object, List<Class<?>>> typesBySubscriber;
A subscriber may receive several different event types; typesBySubscriber records them per subscriber.
private final Map<Class<?>, Object> stickyEvents;
The map that holds sticky events, keyed by event class.
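How the sticky map is consulted when a sticky subscriber registers can be sketched like this. StickyStore is a hypothetical class, and the use of ConcurrentHashMap is an assumption of the sketch; only the two lookup branches from subscribe() are reproduced.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class StickyStore {
    // Latest sticky event per concrete event class.
    private final Map<Class<?>, Object> stickyEvents = new ConcurrentHashMap<>();

    void postSticky(Object event) {
        stickyEvents.put(event.getClass(), event);
    }

    // Events a new sticky subscriber of eventType would receive on registration.
    List<Object> eventsFor(Class<?> eventType, boolean eventInheritance) {
        List<Object> matches = new ArrayList<>();
        if (eventInheritance) {
            // Any stored event whose class is eventType or a subtype of it matches.
            for (Map.Entry<Class<?>, Object> entry : stickyEvents.entrySet()) {
                if (eventType.isAssignableFrom(entry.getKey())) {
                    matches.add(entry.getValue());
                }
            }
        } else {
            // Exact class match only.
            Object event = stickyEvents.get(eventType);
            if (event != null) {
                matches.add(event);
            }
        }
        return matches;
    }
}
```

With a sticky String stored, a subscriber for CharSequence receives it only when eventInheritance is true, because the exact-match branch looks up CharSequence.class itself.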
Posting (the post flow)
/** Posts the given event to the event bus. */
public void post(Object event) {
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
currentPostingThreadState: a per-thread copy of the posting state (a ThreadLocal):
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
@Override
protected PostingThreadState initialValue() {
return new PostingThreadState();
}
};
PostingThreadState: the state held for the current thread. It stores the queue of events being posted.
final static class PostingThreadState {
final List<Object> eventQueue = new ArrayList<>();
boolean isPosting;
boolean isMainThread;
Subscription subscription;
Object event;
boolean canceled;
}
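The effect of the per-thread queue and the isPosting flag is easiest to see when a handler posts another event while it is still running. The sketch below is a reduced model, not EventBus code: MiniPoster is a hypothetical class with a single handler instead of a subscription table.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class MiniPoster {
    // Per-thread state: each thread gets its own queue and isPosting flag.
    private static final class State {
        final List<Object> eventQueue = new ArrayList<>();
        boolean isPosting;
    }

    private final ThreadLocal<State> currentState = ThreadLocal.withInitial(State::new);
    private final Consumer<Object> handler;

    MiniPoster(Consumer<Object> handler) {
        this.handler = handler;
    }

    void post(Object event) {
        State state = currentState.get();
        state.eventQueue.add(event);
        if (!state.isPosting) {              // a nested post() only enqueues
            state.isPosting = true;
            try {
                while (!state.eventQueue.isEmpty()) {
                    handler.accept(state.eventQueue.remove(0));
                }
            } finally {
                state.isPosting = false;
            }
        }
    }

    // Demo: handling "first" posts "nested"; the log shows the order of handling.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        MiniPoster[] holder = new MiniPoster[1];
        holder[0] = new MiniPoster(event -> {
            log.add("start " + event);
            if ("first".equals(event)) {
                holder[0].post("nested");
            }
            log.add("end " + event);
        });
        holder[0].post("first");
        return log;
    }
}
```

In the demo, "nested" is handled only after "first" has finished, because the inner post() sees isPosting set and returns after enqueueing; the outer loop then drains the queue.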
postSingleEvent posts a single event:
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
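eventTypes contains the event's own class, its superclass, the superclass of that superclass, and so on up the hierarchy, together with the interfaces implemented along the way.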
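A hedged sketch of such a type lookup, written with plain JDK reflection. EventTypes, Tagged, BaseEvent and LoginEvent are hypothetical names used for illustration, and the caching that a real implementation would likely add is left out.

```java
import java.util.ArrayList;
import java.util.List;

class EventTypes {
    // Collects eventClass, each superclass, and all interfaces reachable from them.
    static List<Class<?>> lookupAll(Class<?> eventClass) {
        List<Class<?>> types = new ArrayList<>();
        Class<?> clazz = eventClass;
        while (clazz != null) {
            types.add(clazz);
            addInterfaces(types, clazz.getInterfaces());
            clazz = clazz.getSuperclass();
        }
        return types;
    }

    // Adds each interface once, then recurses into the interfaces it extends.
    private static void addInterfaces(List<Class<?>> types, Class<?>[] interfaces) {
        for (Class<?> iface : interfaces) {
            if (!types.contains(iface)) {
                types.add(iface);
                addInterfaces(types, iface.getInterfaces());
            }
        }
    }
}

// Hypothetical event hierarchy used for the demo.
interface Tagged { }
class BaseEvent implements Tagged { }
class LoginEvent extends BaseEvent { }
```

For LoginEvent the result is LoginEvent, BaseEvent, Tagged, Object, so a subscriber registered for any of these four types would be considered when a LoginEvent is posted with eventInheritance enabled.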
The postSingleEventForEventType() method:
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted;
try {
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
It looks up every Subscription registered for eventClass and calls postToSubscription() on each one in a for loop.
postToSubscription() is the real core of the flow:
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event); // not on the main thread: enqueue so that the main thread handles it
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
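The thread modes deserve a closer look:
- POSTING
The subscriber is called on the same thread that posted the event: if the poster is on the main thread, so is the subscriber, and vice versa. No thread switch takes place.
- MAIN
The handler runs on the main (UI) thread, so it must not do time-consuming work.
- MAIN_ORDERED
The handler also runs on the main thread, but as the switch above shows, the event is enqueued whenever a main thread poster exists, rather than invoked directly.
- BACKGROUND
The handler runs on a background thread, so it must not touch the UI. If the event is posted from the main (UI) thread, it is handed over to a background thread; if it is posted from a background thread, the handler runs directly on that thread.
- ASYNC
The handler always runs on a separate thread, whichever thread posted the event. As the AsyncPoster code shows, that thread comes from EventBus's executor service. Again, no UI work is allowed.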
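The routing decisions described above can be condensed into a pure function. This is only a restatement of the switch in postToSubscription(): DispatchRules, Mode and Route are hypothetical names, and the sketch assumes a main thread poster is available (so MAIN_ORDERED always enqueues).

```java
class DispatchRules {
    enum Mode { POSTING, MAIN, MAIN_ORDERED, BACKGROUND, ASYNC }

    enum Route { INVOKE_DIRECTLY, MAIN_QUEUE, BACKGROUND_QUEUE, ASYNC_POOL }

    // Mirrors the switch in postToSubscription(), assuming a main thread poster exists.
    static Route route(Mode mode, boolean isMainThread) {
        switch (mode) {
            case POSTING:
                return Route.INVOKE_DIRECTLY;
            case MAIN:
                return isMainThread ? Route.INVOKE_DIRECTLY : Route.MAIN_QUEUE;
            case MAIN_ORDERED:
                return Route.MAIN_QUEUE;
            case BACKGROUND:
                return isMainThread ? Route.BACKGROUND_QUEUE : Route.INVOKE_DIRECTLY;
            case ASYNC:
                return Route.ASYNC_POOL;
            default:
                throw new IllegalStateException("Unknown thread mode: " + mode);
        }
    }
}
```

Only MAIN and BACKGROUND depend on the posting thread; the other three modes route the same way regardless of where post() was called.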
MainThreadPoster, BackgroundPoster, AsyncPoster
- MainThreadPoster
At its core it is a Handler constructed with the main thread's Looper, and it implements the Poster interface. The implementation:
public class HandlerPoster extends Handler implements Poster {
private final PendingPostQueue queue;
private final int maxMillisInsideHandleMessage;
private final EventBus eventBus;
private boolean handlerActive;
protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
super(looper);
this.eventBus = eventBus;
this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
queue = new PendingPostQueue();
}
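maxMillisInsideHandleMessage:
This caps how much main-thread time a single handleMessage pass may consume. If the time limit is reached before all subscribers have been notified, the poster sends itself another message and carries on in the next round.
Next, look at HandlerPoster's enqueue and handleMessage: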
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost); // after enqueueing, send a Message (unless the handler is already active) so that handleMessage runs
if (!handlerActive) {
handlerActive = true;
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
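Summary of delivery through the main thread poster:
When an event has to go through MainThreadPoster (in the switch above: a MAIN subscriber posted to from another thread, or a MAIN_ORDERED subscriber), the poster enqueues a PendingPost and, if the handler is not already active, sends an empty Message. handleMessage then runs a while(true) loop: it polls a PendingPost from the queue and invokes the subscriber method by reflection. Once maxMillisInsideHandleMessage has elapsed, the loop sends a new Message and returns, so that it does not hog the main thread.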
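The time-budgeted loop can be modelled without Android. The sketch below is single-threaded and leaves out the synchronization and the Handler; BudgetedDrainer is a hypothetical class, and the clock is injected as a LongSupplier purely so that the behaviour can be checked deterministically.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.LongSupplier;

class BudgetedDrainer {
    private final Queue<Runnable> queue = new ArrayDeque<>();
    private final LongSupplier clock;      // injected so the demo is deterministic
    private final long maxMillisPerPass;

    BudgetedDrainer(LongSupplier clock, long maxMillisPerPass) {
        this.clock = clock;
        this.maxMillisPerPass = maxMillisPerPass;
    }

    void enqueue(Runnable task) {
        queue.add(task);
    }

    // One pass, comparable to one handleMessage() call.
    // Returns true when the budget ran out and another pass should be scheduled,
    // false when the queue was found empty.
    boolean drainOnce() {
        long started = clock.getAsLong();
        while (true) {
            Runnable task = queue.poll();
            if (task == null) {
                return false;
            }
            task.run();
            if (clock.getAsLong() - started >= maxMillisPerPass) {
                return true;   // budget used up: yield and continue in the next pass
            }
        }
    }

    int pending() {
        return queue.size();
    }
}
```

With a 10 ms budget and five tasks that each advance the clock by 4 ms, the first pass stops after three tasks (12 ms elapsed) and asks for another pass; the second pass runs the remaining two and finds the queue empty.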
- AsyncPoster
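AsyncPoster implements the Poster and Runnable interfaces. Its enqueue method puts the PendingPost into the queue; run() then polls it and invokes the subscriber by reflection. This guarantees that eventBus.invokeSubscriber executes on a worker thread.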
class AsyncPoster implements Runnable, Poster {
private final PendingPostQueue queue;
private final EventBus eventBus;
AsyncPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
queue.enqueue(pendingPost);
eventBus.getExecutorService().execute(this);
}
@Override
public void run() {
PendingPost pendingPost = queue.poll();
if(pendingPost == null) {
throw new IllegalStateException("No pending post available");
}
eventBus.invokeSubscriber(pendingPost);
}
}
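The same enqueue-then-execute shape can be reproduced with a plain ExecutorService. MiniAsyncPoster is a hypothetical class, the subscriber is reduced to a Consumer, and the cached thread pool used in the demo is an assumption of the sketch.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class MiniAsyncPoster implements Runnable {
    private final ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<>();
    private final ExecutorService executor;
    private final Consumer<Object> subscriber;

    MiniAsyncPoster(ExecutorService executor, Consumer<Object> subscriber) {
        this.executor = executor;
        this.subscriber = subscriber;
    }

    void enqueue(Object event) {
        queue.add(event);
        executor.execute(this);   // one run() per enqueued event
    }

    @Override
    public void run() {
        Object event = queue.poll();
        if (event == null) {
            throw new IllegalStateException("No pending post available");
        }
        subscriber.accept(event);
    }

    // Demo: returns true when the subscriber ran on a thread other than the caller's.
    static boolean runsOffCallerThread() {
        ExecutorService pool = Executors.newCachedThreadPool();
        Thread caller = Thread.currentThread();
        Thread[] handlerThread = new Thread[1];
        CountDownLatch done = new CountDownLatch(1);
        MiniAsyncPoster poster = new MiniAsyncPoster(pool, event -> {
            handlerThread[0] = Thread.currentThread();
            done.countDown();
        });
        try {
            poster.enqueue("event");
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("subscriber was not invoked in time");
            }
            return handlerThread[0] != caller;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Because every enqueue() schedules exactly one run(), each run() can expect to find one item in the queue, which is why an empty queue is treated as an error rather than a normal condition.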
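Thread-switching summary:
Thread switching in EventBus is not switching in the everyday sense. Picture two lanes, A and B: after moving from A to B, lane A still exists, and we can move back from B to A. That is the intuitive picture.
In EventBus, "switching" means either running invokeSubscriber on the main thread or running it on a thread obtained from the thread pool. As a result, after going from Main to Thread1, the next switch may well go from Main to Thread2.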
[Figure: post flow chart]