美文网首页
EventBus源码解析

EventBus源码解析

作者: 慕北人 | 来源:发表于2020-03-24 20:21 被阅读0次

    EventBus源码阅读

    一、几个简单类

    1. Subscribe注解

    对于该注解没啥好说的:

    @Documented
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.METHOD})
    public @interface Subscribe {
        ThreadMode threadMode() default ThreadMode.POSTING;
        boolean sticky() default false;
        int priority() default 0;
    }  
    

    三个属性的作用:

    • threadMode:处理事件的线程
    • sticky:表示是否处理粘性事件
    • priority:级别,对于同一个threadMode下的subscribers级别越高的订阅者越早收到事件

    2. ThreadMode

    这是一个枚举类:

     public enum ThreadMode {
        POSTING,
        MAIN,
        MAIN_ORDERED,
        BACKGROUND,
        ASYNC
    }  
    
    • POSTING:默认的mode,该mode代表订阅者方法的调用线程与Event的post线程为同一个线程。该mode下没有线程切换发生,所以在一些简单的、快速返回的任务推荐使用该mode
    • MAIN:该mode表示订阅者方法调用在主线程,该mode下订阅者方法应该快速返回才行,以免阻塞主线程
    • MAIN_ORDERED
    • BACKGROUND:如果posting线程是Main线程,那么该mode下会新建一个线程运行订阅者方法;否则,直接在posting 线程执行
    • ASYNC:该mode下,每次post都会新建一个线程来运行订阅者的方法,通常执行耗时操作是选用该mode,但是EventBus会维护一个线程池重复利用那些已经执行完了的订阅者方法的线程

    3. Subscription

    我们先不看别的,只看他的普通成员:

    • Object subscriber
    • SubscriberMethod subscriberMethod
    • boolean active

    就知道该类代表一个订阅,该订阅包含了:订阅者和订阅方法,EventBus通过这种简单粗暴的方式将这两个东西绑定到一起。

    其构造器异常简单:

    Subscription(Object subscriber, SubscriberMethod subscriberMethod) {
        this.subscriber = subscriber;
        this.subscriberMethod = subscriberMethod;
        active = true;
    } 
    

    唯一值得注意的是,该类的equals方法实现

    public boolean equals(Object other) {
        if (other instanceof Subscription) {
            Subscription otherSubscription = (Subscription) other;
            return subscriber == otherSubscription.subscriber
                    && subscriberMethod.equals(otherSubscription.subscriberMethod);
        } else {
            return false;
        }
    }  
    

    该类的equals方法返回为true的要求是:内部的订阅者是同一个对象(这里使用==判定),且内部的订阅方法是同一个方法(使用订阅方法的equals方法判定)

    4. SubscriberMethod

    SubscriberMethod该类代表一个订阅方法.

    他有三个属性是我们的老朋友了:

    • ThreadMode threadMode
    • int priority
    • boolean sticky

    没错,就是自定义注解Subscribe中声明的三个;另外,SubscriberMethod的成员中还有三个新面孔:

    • Method method:显然,这就是那个添加了Subscribe注解的方法
    • Class<?> eventType: 可以看出,这是代表其想要处理的Event的类型,说白了就是订阅方法的参数类型
    • String methodString:未知

    而其构造器依然简单:

    public SubscriberMethod(Method method, Class<?> eventType, ThreadMode threadMode, int priority, boolean sticky) {
        this.method = method;
        this.threadMode = threadMode;
        this.eventType = eventType;
        this.priority = priority;
        this.sticky = sticky;
    }  
    

    该类和Subscription一样,只是用来表示一个订阅方法,其属性也都是简单的定义了方法的一些信息。

    同样,这个类注意其equals方法:

    public boolean equals(Object other) {
        if (other == this) {
            return true;
        } else if (other instanceof SubscriberMethod) {
            checkMethodString();
            SubscriberMethod otherSubscriberMethod = (SubscriberMethod)other;
            otherSubscriberMethod.checkMethodString();
            return methodString.equals(otherSubscriberMethod.methodString);
        } else {
            return false;
        }
    }    
    

    SubscriberMethod的equals判断非常直观:methodString内容相等即可,实际上这同将method、eventType等挨个进行比较的效果意义是一样的,但是这么通过字符串一转换,就大大提高了效率

    其中checkMethodString方法用来为成员变量methodString初始化

    private synchronized void checkMethodString() {
        if (methodString == null) {
            // Method.toString has more overhead, just take relevant parts of the method
            StringBuilder builder = new StringBuilder(64);
            builder.append(method.getDeclaringClass().getName());
            builder.append('#').append(method.getName());
            builder.append('(').append(eventType.getName());
            methodString = builder.toString();
        }
    }
    

    可见,methodString组成为:声明该具体Method的类名、该具体Method方法名、eventtype的类名的组合

    5. PendingPost

    PendingPost表示一个待处理的Post,其有四个成员:

    • static List<PendingPost> pendingPostPool
    • Object event
    • Subscription subscription
    • PendingPost next

    只看这四个成员,首先event和subscription这两个可以看出EventBus需要把该event发送到subscription来使得订阅者能够通过订阅方法处理这个Event,这符合我们使用EventBus的情形。pendingPostPool可能会表示所有待处理的post,而next看起来想要把PendingPost构造成一个链表的样子。

    其构造器依旧简单:

    private PendingPost(Object event, Subscription subscription) {
        this.event = event;
        this.subscription = subscription;
    }  
    

    毫无营养。我们来看看其obtainPendingPost方法:

    static PendingPost obtainPendingPost(Subscription subscription, Object event) {
        synchronized (pendingPostPool) {
            int size = pendingPostPool.size();
            if (size > 0) {
                PendingPost pendingPost = pendingPostPool.remove(size - 1);
                pendingPost.event = event;
                pendingPost.subscription = subscription;
                pendingPost.next = null;
                return pendingPost;
            }
        }
        return new PendingPost(event, subscription);
    }  
    

    其流程为:将pendingPostPool池中最后一个待处理Post取出来,将其event和subscription用参数列表中的值覆盖掉,然后返回这个新的PendingPost。可以理解,该方法的目的是从待处理Post池中最后一个取出,并修改其内容后返回,可以理解为消费了一个事件,但是为何要先修改其内容,我们暂且不去理会,后面会有答案。

    PendingPost还有另一个方法,releasePendingPost:

    static void releasePendingPost(PendingPost pendingPost) {
        pendingPost.event = null;
        pendingPost.subscription = null;
        pendingPost.next = null;
        synchronized (pendingPostPool) {
            // Don't let the pool grow indefinitely
            if (pendingPostPool.size() < 10000) {
                pendingPostPool.add(pendingPost);
            }
        }
    }  
    

    这个方法初看让人很疑惑,怎么先将其内容都复制为null之后才往pendingPostPool中添加?我们可以大胆的猜想这个方法是同上一个方法obtainPendingPost搭配使用的,该方法添加一个值全部为null的待处理Post;而obtain的时候取出的就是一个值全部为null的PendingPost,所以在obtian中修改其内容就显得合理了。我们先做出这样一个假设,答案我们接着看。

    6. PendingPostQueue

    PendingPostQueue让我们猜中了开头,可惜没有猜中结尾,原来作者是想要通过上文中提到的PendingPost中的next成员构建一个队列,该类只定义了队列的头尾两个成员:

    • PendingPost head
    • PendingPost tail

    自然而然,也就定了了两种方法,进队和出队

    synchronized void enqueue(PendingPost pendingPost) {
        if (pendingPost == null) {
            throw new NullPointerException("null cannot be enqueued");
        }
        if (tail != null) {
            tail.next = pendingPost;
            tail = pendingPost;
        } else if (head == null) {
            head = tail = pendingPost;
        } else {
            throw new IllegalStateException("Head present, but no tail");
        }
        notifyAll();
    }  
    
    synchronized PendingPost poll() {
        PendingPost pendingPost = head;
        if (head != null) {
            head = head.next;
            if (head == null) {
                tail = null;
            }
        }
        return pendingPost;
    }
    
    synchronized PendingPost poll(int maxMillisToWait) throws InterruptedException {
        if (head == null) {
            wait(maxMillisToWait);
        }
        return poll();
    }  
    

    作者也很直接,不整那些花里胡哨的,poll(int toWait)方法直接给我们整一个wait(toWait)就完事儿了。

    7. 小结

    到这里我们先小结一下这几个重要的实体类:

    • SubscriberMethod:该类的实例表示一个订阅方法,也就是使用Subscribe注解的方法;该类的实例包含了一切我们在编写订阅方法时使用到的约束:

      • ThreadMode:订阅的内容以何种方式发送给订阅者
      • priority:优先级
      • sticky:是否是粘性内容
      • eventType:订阅者想要订阅的内容的类型
    • Subscription:该类的实例表示一个订阅者,既然是一个“人”,其必然得包含订阅客户的信息:

      • subscriber:真正订阅者
      • subscirberMethod:订阅者声明的订阅方法
    • PendingPost:该类的实例表示一个待处理的订阅关系,既然是一个关系,那么该类就得定义关系双方的信息:

      • subscription:订阅者的信息
      • event:订阅者订阅的内容

    二、Poster

    各种event都需要Poster发送到正确的地方,而EventBus为各种ThreadMode都实现了一个Poster,在分析各种Poster之前,我们先看看祖宗Poster

    1. Poster

    Poster代表了一次推送事件

    Poster是一个接口,只定义了一个方法:

    void enqueue(Subscription subscription, Object event);  
    

    哇,只有一个方法,对于我们阅读者太友好了。

    2. BackgroundPoster

    BackgroundPoster看名字就能猜出来,这个Poster一定时在ThreadMode为BACKGROUND情况下使用的。

    final class BackgroundPoster implements Runnable, Poster  
    

    可以看出,这个类实现了Runnable接口和Poster接口,也就是说可以直接把它作为线程的参数来跑。

    这个类指定义了三个成员:

    • PendingPostQueue queue
    • EventBus eventBus
    • boolean executorRunning

    其构造器简单依旧:

    BackgroundPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }  
    

    注意,这里的queue是自己初始化的,不是从别的地方传过来的,我们从这里也可以大胆猜测,不同ThreadMode下的Poster各自维护一个PendingPostQueue,它们之间老死不相往来

    而作者再一次展现了体恤万千码农的贴心,该类的方法就两个,分别是Runnable接口的run方法和Poster接口的enqueue方法。

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            if (!executorRunning) {
                executorRunning = true;
                eventBus.getExecutorService().execute(this);
            }
        }
    }  
    

    来了来了,这里看到了PendingPost的obtainPendingPost方法的调用,该方法在PendingPost池不为空的时候取出来最后一个修改内容之后返回,或者池为空时直接新建一个PendingPost返回。这里,直接将该PendingPost加入到了pendingPostQueue中,之后调用了:

    eventBus.getExecutorService().execute(this);  
    

    嗯,感觉最终Event能够传递到订阅者那里是依靠这里,暂且记下。

    public void run() {
        try {
            try {
                while (true) {
                    PendingPost pendingPost = queue.poll(1000);
                    if (pendingPost == null) {
                        synchronized (this) {
                            // Check again, this time in synchronized
                            pendingPost = queue.poll();
                            if (pendingPost == null) {
                                executorRunning = false;
                                return;
                            }
                        }
                    }
                    eventBus.invokeSubscriber(pendingPost);
                }
            } catch (InterruptedException e) {
                eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
            }
        } finally {
            executorRunning = false;
        }
    }
    

    这里有一个无限循环,一直从queue中取PendingPost,如果队列为空了,那么就直接return了(这里不同于Looper的loop,这里为空是直接return了),对于一个Runnable来说,主动return就意味着它的生命结束了。如果不为空,那么将取到的PendingPost交给

    eventBus.invokeSubscriber(pendingPost);  
    

    看名字,好像这里也像是将PendingPost发送给订阅者的方法,我们也暂且记下。

    Q:这里从队列中取出PendingPost时首先调用的是带等待时间的poll,作者必有深意?

    3. AsyncPoster

    AsyncPoster的名字也暴露了它,显然这是为ThreadMode为ASYNC的情况下准备的Poster。

    该类也同BackgroundPoster一样,同样实现了Runnable和Poster接口,只有两个成员:

    • PendingPostQueue queue
    • EventBus eventBus

    构造器简单依旧:

    AsyncPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }  
    

    其run方法和enqueue更加简单,没有任何新意:

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        queue.enqueue(pendingPost);
        eventBus.getExecutorService().execute(this);
    }
    
    @Override
    public void run() {
        PendingPost pendingPost = queue.poll();
        if(pendingPost == null) {
            throw new IllegalStateException("No pending post available");
        }
        eventBus.invokeSubscriber(pendingPost);
    }  
    

    4. HandlerPoster

    HandlerPoster就显得很特立独行,从名字无法知道其用于那种ThreadMode。

    其有四个成员:

    • PendingPostQueue queue
    • int maxMillsInsideHandlerMessage
    • EventBus eventBus
    • boolean handlerActive

    从成员来看,其更像BackgroundPoster一些。

    public class HandlerPoster extends Handler implements Poster  
    

    其继承了Handler,实现了Poster接口。

    protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
        super(looper);
        this.eventBus = eventBus;
        this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
        queue = new PendingPostQueue();
    }  
    

    倒是构造器,简单不减当年。

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            if (!handlerActive) {
                handlerActive = true;
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }  
    

    别的没啥说的,鸡贼的作者吧发送message放到了if条件中。

    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            long started = SystemClock.uptimeMillis();
            while (true) {
                PendingPost pendingPost = queue.poll();
                if (pendingPost == null) {
                    synchronized (this) {
                        // Check again, this time in synchronized
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            handlerActive = false;
                            return;
                        }
                    }
                }
                eventBus.invokeSubscriber(pendingPost);
                long timeInMethod = SystemClock.uptimeMillis() - started;
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                    rescheduled = true;
                    return;
                }
            }
        } finally {
            handlerActive = rescheduled;
        }
    }  
    

    注意,这里两次调用的poll都是没有等待的重载方法。这里有个判断,如果该事件被处理的时间超过了maxMillisInsideHandleMessage就会尝试发送一个空的Message,怀疑这里是用来判断HandlerPoster死了没

    三、EventBus

    终于要到EventBus了。

    几个Map成员

    EventBus下有几个Map映射需要提前了解:

    • Map<Class<?>, List<Class<?>>> eventTypesCache:该成员的key是所有订阅方法的参数event的Class;value集合是该event所有的直接和间接父类型、接口类型
    • Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType:该成员的key是所有订阅方法的参数event的Class;value是所有与之相关的订阅关系,注意一个订阅者内可以有多个订阅方法,subscriptionByEventType中每一个eventType对应的List的长度要大于等于eventTypeCache中每一个eventType对应的List长度;由于不同的订阅者可能有同名、同参的订阅方法,所以这里value值的泛型也不能是SubscriberMethod,只能是Subscription,严谨!!
    • Map<Object, List<Class<?>>> typesBySubscriber:该成员的key是订阅者;而value是该订阅者中所有订阅方法使用到的event的Class类型
    • Map<Class<?>, Object> stickyEvents:以event的Class类型为key,其value为该event的具体的对象,表示待处理的一个黏性event

    1. 一切的开始--EventBus.register(Object)

    该方法用来注册,否则的话,EventBus没法用了。

     public void register(Object subscriber) {
        Class<?> subscriberClass = subscriber.getClass();
        List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
        synchronized (this) {
            for (SubscriberMethod subscriberMethod : subscriberMethods) {
                subscribe(subscriber, subscriberMethod);
            }
        }
    }  
    

    首先,EventBus根据我们注册时传递的对象获取到了其Class对象,这样一来,EventBus就知道订阅发生在该Class里面了;接下来,EventBus通过SubscriberMethodFinder这个工具从这个Class中找到(具体怎么找我们后面分析)所有使用了Subscribe注解的方法,这些就是所有的订阅方法;最后,EventBus为每一个方法执行了subscribe方法

    subscribe(Object subscriber, SubscriberMethod subscriberMethod)

    private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
        Class<?> eventType = subscriberMethod.eventType;
        Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
    
        -------------- 重点 1 ----------------
        CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        if (subscriptions == null) {
            subscriptions = new CopyOnWriteArrayList<>();
            subscriptionsByEventType.put(eventType, subscriptions);
        } else {
            if (subscriptions.contains(newSubscription)) {
                throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                        + eventType);
            }
        }
    
        -------------- 重点 2 ---------------
        int size = subscriptions.size();
        for (int i = 0; i <= size; i++) {
            if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
                subscriptions.add(i, newSubscription);
                break;
            }
        }
    
        ------------- 重点 3 -------------
        List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
        if (subscribedEvents == null) {
            subscribedEvents = new ArrayList<>();
            typesBySubscriber.put(subscriber, subscribedEvents);
        }
        subscribedEvents.add(eventType);
    
        ---------------- 重点 4 ------------------
        if (subscriberMethod.sticky) {
            if (eventInheritance) {
                // Existing sticky events of all subclasses of eventType have to be considered.
                // Note: Iterating over all events may be inefficient with lots of sticky events,
                // thus data structure should be changed to allow a more efficient lookup
                // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
                Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
                for (Map.Entry<Class<?>, Object> entry : entries) {
                    Class<?> candidateEventType = entry.getKey();
                    if (eventType.isAssignableFrom(candidateEventType)) {
                        Object stickyEvent = entry.getValue();
                        checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                    }
                }
            } else {
                Object stickyEvent = stickyEvents.get(eventType);
                checkPostStickyEventToSubscription(newSubscription, stickyEvent);
            }
        }
    }  
    

    好长一方法,不慌,我们慢慢看,只看思路,细节不管:

    • 重点 1:
      • 首先,EventBus根据每一个订阅方法的eventType(也就是该订阅方法参数的类型对应的Class对象)获取所有相关的Subscription,这些个Subscription订阅关系中的订阅方法中的参数都有着共同的类型
      • 之后判断如果该eventType是第一次出现,之前并没有Subscription与其相关,那么新建一个List将新的Subscription加入进去
      • 如果之前有过与该eventType相关的Subscription出现,那么看看有没有该Subscription,如果有说明发生了重复订阅,抛出异常;根据Subscription和SubscriberMethod的equals方法的实现,我们可以明白:判断是否重复订阅的考量只有订阅方法所在的类名、订阅方法名、订阅方法的参数类型三个,并不会把ThreadMode、isSticky、priority作为考量的依据,所以不能够将两个方法相同,ThreadMode、isSticky、priority不同的订阅方法放到同一个订阅者中
    • 重点 2:这里的思路很清晰,我们在使用Subscribe注解声明一个订阅方法的时候可以设置其权重值,这里就是将Subscription插入的时候按照权重值从小到大排列的
    • 重点 3:这里的作用是将所有subscriber订阅者以及该订阅者所有需要的eventType使用一个HashMap绑定起来
    • 重点 4:可见,如果该订阅方法的sticky为true的话,那么会从stickyEvents中寻找是否存在该event类型的待处理的post事件,如果有的话,此时会直接调用chechPostStickyEventToSubscription方法,而该方法会直接调用postToSubscription方法消费post事件(该方法post后期也会调用)

    2. 干活儿的地方--post(Object event)

    public void post(Object event) {  
        ----------- 重点 1 --------
        PostingThreadState postingState = currentPostingThreadState.get();
        List<Object> eventQueue = postingState.eventQueue;
        eventQueue.add(event);
    
        if (!postingState.isPosting) {
            postingState.isMainThread = isMainThread();
            postingState.isPosting = true;
            if (postingState.canceled) {
                throw new EventBusException("Internal error. Abort state was not reset");
            }
            try {
                ----------- 重点 2 ----------
                while (!eventQueue.isEmpty()) {
                    postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally {
                postingState.isPosting = false;
                postingState.isMainThread = false;
            }
        }
    }  
    
    • 重点 1:这个postingState是EventBus的一个静态内部类,其将一系列的event同一个订阅关系Subscription绑定起来,也就是说,这一些列的event将被post给这个Subscription。这里就是将待处理的event放到其队列中。
    • 重点 2:这里将每一个event从队列中挨个取出,对其执行postSingleEvent方法

    postSticky

    该方法用来post一个黏性事件:

    public void postSticky(Object event) {
        synchronized (stickyEvents) {
            stickyEvents.put(event.getClass(), event);
        }
        // Should be posted after it is putted, in case the subscriber wants to remove immediately
        post(event);
    }  
    

    可见,就是将其先放入stickyEvents中,然后再尝试调用post方法看看有没有人能够消费掉

    postSingleEvent(Object event, PostingThreadState postingState)

    private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
        Class<?> eventClass = event.getClass();
        boolean subscriptionFound = false;
        if (eventInheritance) {
            ---------- 重点 1 -----------
            List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
            int countTypes = eventTypes.size();
            for (int h = 0; h < countTypes; h++) {
                Class<?> clazz = eventTypes.get(h);
                subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
            }
        } else {
            subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
        }
        if (!subscriptionFound) {
            if (logNoSubscriberMessages) {
                logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
            }
            if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                    eventClass != SubscriberExceptionEvent.class) {
                post(new NoSubscriberEvent(this, event));
            }
        }
    }  
    
    • 重点 1:这里的lookupAllEventTypes通过eventClass通过eventTypesCache属性得到了所有与该eventType相关的父类和接口;然后依次执行postSingleEventForEventType

    postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass)

    private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
        CopyOnWriteArrayList<Subscription> subscriptions;
        synchronized (this) {  
            ----------- 重点 1 ---------------
            subscriptions = subscriptionsByEventType.get(eventClass);
        }
        if (subscriptions != null && !subscriptions.isEmpty()) {  
            ---------- 重点 2 -----------
            for (Subscription subscription : subscriptions) {
                postingState.event = event;
                postingState.subscription = subscription;
                boolean aborted;
                try {
                    postToSubscription(subscription, event, postingState.isMainThread);
                    aborted = postingState.canceled;
                } finally {
                    postingState.event = null;
                    postingState.subscription = null;
                    postingState.canceled = false;
                }
                if (aborted) {
                    break;
                }
            }
            return true;
        }
        return false;
    }  
    
    • 重点 1:该处首先会拿到所有与该event有关的订阅关系
    • 重点 2:该处为每一个与event有关的订阅关系执行postToSubscription方法

    postToSubscription(Subscription subscription, Object event, boolean isMainThread)

    private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        switch (subscription.subscriberMethod.threadMode) {
            case POSTING:
                invokeSubscriber(subscription, event);
                break;
            case MAIN:
                if (isMainThread) {
                    invokeSubscriber(subscription, event);
                } else {
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case MAIN_ORDERED:
                if (mainThreadPoster != null) {
                    mainThreadPoster.enqueue(subscription, event);
                } else {
                    // temporary: technically not correct as poster not decoupled from subscriber
                    invokeSubscriber(subscription, event);
                }
                break;
            case BACKGROUND:
                if (isMainThread) {
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case ASYNC:
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }  
    

    哇偶,好兴奋啊,我们在这个方法里看到了对不同ThreadMode的处理

    • POSTING:该模式下会与post事件位于同一线程,所以这里直接调用invokeSubscriber没毛病
    • BACKGROUND:如果当前不是位于主线程,那么会直接调用invokeSubscriber消费掉post事件;如果post事件产生于主线程那么会将其入队,而不是立马消费掉,这里的poster为BackgroundPoster
    • ASYNC:该模式下直接入队,这里的poster为AsyncPoster
    • MAIN:如果post产生于主线程,那么直接就地消费掉;否则,将其入队,这里的poster为HandlerPoster
    • MAIN_ORDERED:好像和MAIN效果一样啊?

    各个Poster的enqueue操作都会调起EventBus里的线程池来执行消费事件

    invokeSubscriber(Subscription subscription, Object event)

    void invokeSubscriber(Subscription subscription, Object event) {
        try {
            subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
        } catch (InvocationTargetException e) {
            handleSubscriberException(subscription, event, e.getCause());
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("Unexpected exception", e);
        }
    }  
    

    最终,会通过反射的机制来调用订阅方法消费掉post事件

    3. 小结

    让我们重新开始捋一捋,从register到post发生了什么:

    • register:该方法的参数即订阅者Subscriber对象
      • 首先,EventBus会找遍该订阅者中所有使用Subscribe注解过的订阅方法
      • 然后,通过subscribe方法将每一个订阅方法联通订阅者封装成一个个订阅Subscription,并且将所有出现的eventType作为key,映射到所有与该eventType相关的Subscription(用List放置所有的Subscription,注意Subscription在这个List中的顺序按照其所隐含的SubscriberMethod的priority大小排列);这样一来,就隐式地为每一个订阅方法构建了一个eventType-Subscription-SubscriberMethod关系
      • 之后,subscribe方法还会以订阅者为key,将订阅者中所有订阅方法所涉及到的eventType保存起来
    • post:
      • 首先,该方法会拿到一个PostingThreadState对象,EventBus内部使用ThreadLocal来保存每一个PostingThreadState对象,也就是说,EventBus的post方法可能在不同的线程内被调用,那么会为每一个线程保存一个PostingThreadState对象;该对象内部有一个event的队列用来保存在该线程上调用post方法产生的事件;EventBus将post方法中的event放到这个PostingThreadState的队列中去
      • 然后,post方法就会从PostingThreadState的event队列中挨个将所有的event取出,调用postSingleEvent方法
    • postSingleEvent:
      • 首先,通过属性成员eventTypesCache拿到该event所相关的父类型、接口类型(包括直接和间接)
      • 然后,为每一个Subscription订阅执行postSingleEventForEventType,我们假设我们的event没有实现任何接口,没有任何直接父类
    • postSingleEventForEventType:
      • 首先,该方法通过register方法生成的event-Subscription-SubscriberMethod表得到所有与该event相关的订阅Subscription
      • 然后,为每一个订阅Subscription调用postToSubscription方法,注意在register方法中Subscription是依照priority的大小进行排列的,所以priority越大的Subscription会先被执行
    • postToSubscription:该方法会对不同ThreadMode情况做处理,如果想要消费掉post事件的话,是通过invokeSubscriber方法
    • invokeSubscriber:该方法通过反射调用订阅方法来消费掉post事件

    未完待续~

    相关文章

      网友评论

          本文标题:EventBus源码解析

          本文链接:https://www.haomeiwen.com/subject/pvmayhtx.html