美文网首页
EventBus源码解读下篇:事件的发送与分发

EventBus源码解读下篇:事件的发送与分发

作者: 一线游骑兵 | 来源:发表于2019-01-03 21:57 被阅读14次

    目的:分析post事件的发送与分发

    从post开始:

          /** Posts the given event to the event bus. */
        public void post(Object event) {
            PostingThreadState postingState = currentPostingThreadState.get();      //获取到当前线程中的PostingThreadState
            List<Object> eventQueue = postingState.eventQueue;  //获取该线程中事件的队列
            eventQueue.add(event);  //将新post的事件入队
    
            if (!postingState.isPosting) {  //如果没有正在发送
                postingState.isMainThread = isMainThread();     //判断当前线程是否在主线程运行,如果在非安卓环境,总是返回true
                postingState.isPosting = true;
                if (postingState.canceled) {
                    throw new EventBusException("Internal error. Abort state was not reset");
                }
                try {
                    while (!eventQueue.isEmpty()) {     //循环遍历队列所有事件,然后发送
                        postSingleEvent(eventQueue.remove(0), postingState);
                    }
                } finally {
                    postingState.isPosting = false;
                    postingState.isMainThread = false;
                }
            }
        }
    

    特别说明:

    1. currentPostingThreadState.get();返回的是当前线程中ThreadLocal中存储的PostingThreadState对象,ThreadLocal是线程内部的存储类,线程私有的,每个线程对同一个key都可以有不同的一个副本。具体解析:ThreadLocal源码解析
    2. PostingThreadState对象是一个静态类:存储了该线程中要发送的事件队列,发送事件是否为UI线程,该事件的订阅者等信息
        final static class PostingThreadState {
            final List<Object> eventQueue = new ArrayList<>();
            boolean isPosting;
            boolean isMainThread;
            Subscription subscription;
            Object event;
            boolean canceled;
        }
    
    1. 将该事件加入该线程的事件队列,然后循环发送单个事件。

    发送单个事件:

        //发送某个事件【包含该事件的父类或接口事件】
        private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
            Class<?> eventClass = event.getClass();
            boolean subscriptionFound = false;
            if (eventInheritance) {     //是否也向订阅了该事件的父类的方法发送,默认为true
                List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);        //查询到订阅了该事件类型的所有的父类或接口类型
                int countTypes = eventTypes.size();
                for (int h = 0; h < countTypes; h++) {
                    Class<?> clazz = eventTypes.get(h);
                    subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);       //发送具体的事件类型
                }
            } else {
                subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
            }
            if (!subscriptionFound) {       //没有找到接受者
                if (logNoSubscriberMessages) {
                    logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
                }
                if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                        eventClass != SubscriberExceptionEvent.class) { //发送一个默认的没有找到监听者的事件
                    post(new NoSubscriberEvent(this, event));
                }
            }
        }
    

    方法说明:

    1. 获取到所有订阅了该事件类型的父类型,然后发送该类型及其所有父类型。
    2. 如果没有找到订阅者,则系统会默认发送给一个事件类型为 NoSubscriberEvent的通知,外部可通过该类型进行事件发送失败的监听。

    发送某一具体类型:

        private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
            CopyOnWriteArrayList<Subscription> subscriptions;
            synchronized (this) {
                subscriptions = subscriptionsByEventType.get(eventClass);       //获取到该事件的所有订阅者
            }
            if (subscriptions != null && !subscriptions.isEmpty()) {        //如果订阅者不为空
                for (Subscription subscription : subscriptions) {       //循环遍历所有订阅者
                    postingState.event = event;
                    postingState.subscription = subscription;
                    boolean aborted = false;
                    try {
                        postToSubscription(subscription, event, postingState.isMainThread); //将事件发送给订阅者
                        aborted = postingState.canceled;
                    } finally { //重置事件状态
                        postingState.event = null;
                        postingState.subscription = null;
                        postingState.canceled = false;
                    }
                    if (aborted) {
                        break;
                    }
                }
                return true;
            }
            return false;
        }
    

    也很简单,找到该订阅该事件的所有订阅者(如何通过事件类型获取所有订阅者可以看上篇文章),然后将事件发送给订阅者:

        //将事件发送给具体的订阅者,入参isMainThread是指post的线程是否为主线程
       private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
            switch (subscription.subscriberMethod.threadMode) {     //首先判断一下订阅者所在的线程
                case POSTING:       //发送者与订阅者在统一线程,则直接反射调用
                    invokeSubscriber(subscription, event);
                    break;
                case MAIN:      //订阅者在主线程,
                    if (isMainThread) {     //发送者也在主线程,则直接反射调用
                        invokeSubscriber(subscription, event);
                    } else {        //发送者在非UI线程,则加入主线程发送器的队列等待发送
                        mainThreadPoster.enqueue(subscription, event);
                    }
                    break;
                case MAIN_ORDERED:      //订阅者在主线程,且始终有序【不并发】
                    if (mainThreadPoster != null) {
                        mainThreadPoster.enqueue(subscription, event);
                    } else {
                        // temporary: technically not correct as poster not decoupled from subscriber
                        invokeSubscriber(subscription, event);
                    }
                    break;
                case BACKGROUND:        //订阅者在后台线程,
                    if (isMainThread) {     //发送者在UI线程,则添加到后台发送器队列等待发送
                        backgroundPoster.enqueue(subscription, event);
                    } else {        //都在后台线程,直接反射调用
                        invokeSubscriber(subscription, event);
                    }
                    break;
                case ASYNC:     //在一个独立于发送线程和Ui线程的线程中发送
                    asyncPoster.enqueue(subscription, event);
                    break;
                default:
                    throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
            }
        }
    

    通过上边的几个方法,我们已经知道了post发送事件所在线程,要发送的对象,订阅者所在的线程,订阅者类,订阅者方法关键信息。因此,下边主要搞明白如何进行线程间通信的·。
    首先要明白一个概念:发射器:Poster

    interface Poster {
    
        /**
         * Enqueue an event to be posted for a particular subscription.
         *
         * @param subscription Subscription which will receive the event.
         * @param event        Event that will be posted to subscribers.
         */
        void enqueue(Subscription subscription, Object event);
    }
    
    

    该接口包含一个方法:将订阅者对象以及要发送的信息添加到发送队列等候发送。
    该接口共有3个实现类:


    image.png

    说明:

    • HandlerPoster负责向主线程发送事件
    • BackgroundPoster负责向后台非UI线程发送事件
    • AsyncPoster也是向后台非UI线程发送事件,

    BackgroundPosterAsnchPoster之间的区别在于:
    1. 如果发送者与接受者都在同一后台线程,BackgroundPoster会直接在该线程反射调用。而AsycnPoster则不管发送者与接收者是否在同一线程,始终会开启一个新的后台线程反射调用接受者函数。
    2. 在短时间内并发发送大量事件时,BackgroundPoster会保证接受者接收到的事件也是有序的,而由于AsycnPoster每次实在单独线程回调,因此接受者接收的事件是不固定的。

    下面先看一下BackgroundPoster的实现:

    final class BackgroundPoster implements Runnable, Poster {
    
        private final PendingPostQueue queue;
        private final EventBus eventBus;
    
        private volatile boolean executorRunning;
    
        BackgroundPoster(EventBus eventBus) {
            this.eventBus = eventBus;
            queue = new PendingPostQueue();
        }
    
        public void enqueue(Subscription subscription, Object event) {
            PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
            synchronized (this) {
                queue.enqueue(pendingPost);
                if (!executorRunning) {
                    executorRunning = true;
                    eventBus.getExecutorService().execute(this);
                }
            }
        }
    
        @Override
        public void run() {
            try {
                try {
                    while (true) {
                        PendingPost pendingPost = queue.poll(1000);
                        if (pendingPost == null) {
                            synchronized (this) {
                                // Check again, this time in synchronized
                                pendingPost = queue.poll();
                                if (pendingPost == null) {
                                    executorRunning = false;
                                    return;
                                }
                            }
                        }
                        eventBus.invokeSubscriber(pendingPost);
                    }
                } catch (InterruptedException e) {
                    eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
                }
            } finally {
                executorRunning = false;
            }
        }
    
    }
    

    该方法同时实现了两个接口:RunnablePoster
    外部通过调用enqueue将事件与订阅者入队,然后会通过EventBus中的一个线程池来执行自身的run方法。在run方法中开启了一个死循环,会一直取出该队列中的PendingPost对象,然后通过eventBus.invokeSubscriber(pendingPost);来反射调用订阅者方法。
    这里对PendingPost类说明一下:

    final class PendingPost {
        private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();
    
        Object event;
        Subscription subscription;
        PendingPost next;
    
        private PendingPost(Object event, Subscription subscription) {
            this.event = event;
            this.subscription = subscription;
        }
    
        static PendingPost obtainPendingPost(Subscription subscription, Object event) {
            synchronized (pendingPostPool) {
                int size = pendingPostPool.size();
                if (size > 0) {
                    PendingPost pendingPost = pendingPostPool.remove(size - 1);
                    pendingPost.event = event;
                    pendingPost.subscription = subscription;
                    pendingPost.next = null;
                    return pendingPost;
                }
            }
            return new PendingPost(event, subscription);
        }
    
        static void releasePendingPost(PendingPost pendingPost) {
            pendingPost.event = null;
            pendingPost.subscription = null;
            pendingPost.next = null;
            synchronized (pendingPostPool) {
                // Don't let the pool grow indefinitely
                if (pendingPostPool.size() < 10000) {
                    pendingPostPool.add(pendingPost);
                }
            }
        }
    
    }
    
    1. 该类中有一个静态列表作为对象池来存放自身
    2. 除了必要的发送事件对象以及订阅者对象,还有一个next指针指向下一个PendingPost
    3. 在外部通过静态方法obtainPendingPost时,会先从池中获取一个干净的对象然后赋值返回,否则会创建一个新的。
    4. 外部使用完后会通过releasePendingPost来释放抹去数据加入池中【如果池大小<10000】

    上边的BackgroundPoster类中持有一个PendingPostQueue,该队列中声明了两个PendingPost对象作为head 和 tail。每次enqueue入队操作会将新入队的PendingPost作为队尾。每次发送回通过poll来获取head队头,然后将头指针后移来达到队列的结构。

    最后来看一下真正反射调用的方法:

        void invokeSubscriber(PendingPost pendingPost) {
            Object event = pendingPost.event;
            Subscription subscription = pendingPost.subscription;
            PendingPost.releasePendingPost(pendingPost);  //在pendingPost使命完成后会release入池
            if (subscription.active) {
                invokeSubscriber(subscription, event);
            }
        }
    
        void invokeSubscriber(Subscription subscription, Object event) {
            try {  //通过反射回调方法。
                subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
            } catch (InvocationTargetException e) {
                handleSubscriberException(subscription, event, e.getCause());
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("Unexpected exception", e);
            }
        }
    

    OK,想后台线程发送事件分析完毕,下面看一下如何向主线程发送事件:

    public class HandlerPoster extends Handler implements Poster{
        private final PendingPostQueue queue;
    
        public void enqueue(Subscription subscription, Object event) {
            PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
            synchronized (this) {
                queue.enqueue(pendingPost);
                if (!handlerActive) {
                    handlerActive = true;
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                }
            }
        }
    
       @Override
        public void handleMessage(Message msg) {
            boolean rescheduled = false;
            try {
                long started = SystemClock.uptimeMillis();
                while (true) {
                    PendingPost pendingPost = queue.poll();
                    if (pendingPost == null) {
                        synchronized (this) {
                            // Check again, this time in synchronized
                            pendingPost = queue.poll();
                            if (pendingPost == null) {
                                handlerActive = false;
                                return;
                            }
                        }
                    }
                    eventBus.invokeSubscriber(pendingPost);
                    long timeInMethod = SystemClock.uptimeMillis() - started;
                    if (timeInMethod >= maxMillisInsideHandleMessage) {
                        if (!sendMessage(obtainMessage())) {
                            throw new EventBusException("Could not send handler message");
                        }
                        rescheduled = true;
                        return;
                    }
                }
            } finally {
                handlerActive = rescheduled;
            }
        }
    

    不难猜想,是通过new Handler(Looper.getMainLooper())来获取主线程的handler,在入队操作中通过handler.sendMessage()发送一个空消息,进入handleMessage方法中处理,在handleMessage方法中通过while(true)方法来不断获取队列中将要发送的事件,最后通过反射调用将事件发送到订阅者方法中。


    总结
    1. 通过ThreadLocal来存放post到每个线程中的事件队列。
    2. 查找订阅了发送事件类型的父类型的方法,然后加入发送队列。
    3. 通过HandlerPosterBackgroundPosterAsyncPoster来实现不同线程之间的事件发送。
    4. 使用了对象池技术来实现PendingPost对象的获取与释放。
    5. 通过PendingPostQueue实现PendingPost的队列操作。
    6. 通过反射调用订阅者方法将发送事件传递给订阅者。

    完。

    相关文章

      网友评论

          本文标题:EventBus源码解读下篇:事件的发送与分发

          本文链接:https://www.haomeiwen.com/subject/glafrqtx.html