美文网首页
EventBus3.0源码分析(二)事件分发器

EventBus3.0源码分析(二)事件分发器

作者: kakaxicm | 来源:发表于2018-07-19 16:51 被阅读0次

    引言

    上篇文章我们花了大量篇幅详细分析了订阅方法收集、事件的注册和发布流程,关于发布流程,涉及到三个很重要的事件分发器(Poster),用于执行订阅方法,由于涉及到线程切换和调度,个人认为很重要,所以单独拿出来分析学习它的实现思想。

    Poster接口及相关的基础类

    Poster接口

    /**
     * Posts events.
     *
     * @author William Ferguson
     */
    interface Poster {
    
        /**
         * Enqueue an event to be posted for a particular subscription.
         *
         * @param subscription Subscription which will receive the event.
         * @param event        Event that will be posted to subscribers.
         */
         //订阅者和事件对象加入各自的Poster队列
        void enqueue(Subscription subscription, Object event);
    }
    

    可以看到顶层接口的方法是将订阅者和event对象封装加入执行队列,subscription封装了订阅方法和调用者对象(subscriber),有了这些信息所有执行订阅方法的条件都万事俱备,就只须加入调度队列等待执行就可以了。它有三个实现类:
    1.HandlerPoster负责主线程执行订阅方法,订阅方法的ThreadMode为Main/ MAIN_ORDERED,其中Main标记的订阅方法会立即执行,MAIN_ORDERED标记的订阅方法加入到主线程消息队列交给Handler执行,有可能不会立即执行;
    2.BackgroundPoster负责在全局唯一后台线程执行订阅方法,由于执行线程只有一个,所以有可能不会立即执行;
    3.AsyncPoster负责在子线程异步执行订阅方法,如果有空间线程,则会立即执行。
    在看这三个分发器之前,我们需要先熟悉几个相关类:PendingPost和PendingPostQueue,它是分发器的调度对象。

    PendingPost

    final class PendingPost {
        //内部维护了一个复用池
        private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();
    
        Object event;
        Subscription subscription;
        PendingPost next;//说明PendingPost是链表节点,用于构造PendingPostQueue队列
        
        //构造方法封装传入的事件对象和订阅者
        private PendingPost(Object event, Subscription subscription) {
            this.event = event;
            this.subscription = subscription;
        }
        
        //从复用池中取或者创建PendingPost
        static PendingPost obtainPendingPost(Subscription subscription, Object event) {
            synchronized (pendingPostPool) {
                int size = pendingPostPool.size();
                if (size > 0) {
                    //从复用池中取
                    PendingPost pendingPost = pendingPostPool.remove(size - 1);
                    pendingPost.event = event;
                    pendingPost.subscription = subscription;
                    pendingPost.next = null;
                    return pendingPost;
                }
            }
            //复用池中没有则新建
            return new PendingPost(event, subscription);
        }
        //释放pendingPost到复用池
        static void releasePendingPost(PendingPost pendingPost) {
            pendingPost.event = null;
            pendingPost.subscription = null;
            pendingPost.next = null;
            synchronized (pendingPostPool) {
                // Don't let the pool grow indefinitely
                if (pendingPostPool.size() < 10000) {
                    pendingPostPool.add(pendingPost);
                }
            }
        }
    }
    

    代码比较简单,它内部维护了复用池,而且是个单链表结构,用于构建PendingPostQueue用,后面会讲到。这个对象是分发器的基本调度对象。我们再来看看它的上层PendingPostQueue队列:

    PendingPostQueue

    final class PendingPostQueue {
        //队列数据结构
        private PendingPost head;//头节点
        private PendingPost tail;//尾节点
        
        //同步方法,从尾部添加
        synchronized void enqueue(PendingPost pendingPost) {
            if (pendingPost == null) {
                throw new NullPointerException("null cannot be enqueued");
            }
            if (tail != null) {
                tail.next = pendingPost;
                tail = pendingPost;
            } else if (head == null) {
                head = tail = pendingPost;
            } else {
                throw new IllegalStateException("Head present, but no tail");
            }
            notifyAll();
        }
        //从队头取
        synchronized PendingPost poll() {
            PendingPost pendingPost = head;
            if (head != null) {
                head = head.next;
                if (head == null) {
                    tail = null;
                }
            }
            return pendingPost;
        }
        //如果队列为空,则挂起maxMillisToWait时间,再尝试取,如果不为空则直接取
        synchronized PendingPost poll(int maxMillisToWait) throws InterruptedException {
            if (head == null) {
                wait(maxMillisToWait);
            }
            return poll();
        }
    }
    

    可见PendingPostQueue是典型的PendingPost队列,提供了入队和出队方法,按FIFO维护PendingPost的存取,注意存取方法都是同步方法,这样做的原因是post操作在任何地方都可能执行,这里是为了队列的线程安全。说完这两个类,结合下面Post的结构图,具体的Poster实现就更加容易理解了。


    Poster

    HandlerPoster

    public class HandlerPoster extends Handler implements Poster {
    
        private final PendingPostQueue queue;//调度队列
        private final int maxMillisInsideHandleMessage;
        private final EventBus eventBus;
        private boolean handlerActive;
    
        protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
            super(looper);//这个looper就是主线程Looper
            this.eventBus = eventBus;
            this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
            queue = new PendingPostQueue();//构造任务队列
        }
    
        public void enqueue(Subscription subscription, Object event) {
            //构造pendingPost对象封装订阅者和事件
            PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
            synchronized (this) {
                //任务入队
                queue.enqueue(pendingPost);
                if (!handlerActive) {//handlerActive是一个标志位,标志当前是否正在执行处理PendingPostQueue队列中的PendingPost,也就是正在调用队列中的注册方法
                    handlerActive = true;
                    //发送消息
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                }
            }
        }
    
        @Override
        public void handleMessage(Message msg) {
            boolean rescheduled = false;
            try {
                long started = SystemClock.uptimeMillis();
                while (true) {
                    PendingPost pendingPost = queue.poll();
                    if (pendingPost == null) {
                        synchronized (this) {
                            // Check again, this time in synchronized
                            //从队列中取
                            pendingPost = queue.poll();
                            if (pendingPost == null) {//如果队列为空则返回
                                handlerActive = false;
                                return;
                            }
                        }
                    }
                    //执行订阅方法
                    eventBus.invokeSubscriber(pendingPost);
                     //如果方法超时,则退出循环重新触发
                    long timeInMethod = SystemClock.uptimeMillis() - started;
                    if (timeInMethod >= maxMillisInsideHandleMessage) {
                        if (!sendMessage(obtainMessage())) {
                            throw new EventBusException("Could not send handler message");
                        }
                        rescheduled = true;
                        return;
                    }
                }
            } finally {
                handlerActive = rescheduled;
            }
        }
    }
    

    可以看到HandlerPoster 继承Handler,enqueue方法将pendingPost加入队列,并发送消息, handleMessage轮询队列执行订阅方法。

    BackgroundPoster

    /**
     * Posts events in background.
     *
     * @author Markus
     */
    final class BackgroundPoster implements Runnable, Poster {
        //执行队列
        private final PendingPostQueue queue;
        private final EventBus eventBus;
    
        private volatile boolean executorRunning;
    
        BackgroundPoster(EventBus eventBus) {
            this.eventBus = eventBus;
            queue = new PendingPostQueue();
        }
    
        public void enqueue(Subscription subscription, Object event) {
             //构造pendingPost
            PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
            synchronized (this) {
                //任务加入队列
                queue.enqueue(pendingPost);
                if (!executorRunning) {//executorRunning表示后台线程正在处理队列中的任务
                    executorRunning = true;
                    //执行run方法,循环取出任务执行,执行线程池为eventBus的线程池
                    eventBus.getExecutorService().execute(this);
                }
            }
        }
    
        @Override
        public void run() {
            try {
                try {
                    while (true) {
                        //队列为空的时候,阻塞1000毫秒,当会被enqueue方法唤醒
                        PendingPost pendingPost = queue.poll(1000);
                        if (pendingPost == null) {
                            synchronized (this) {
                                // Check again, this time in synchronized
                                pendingPost = queue.poll();
                                if (pendingPost == null) {//两次取,队列确实为空则跳出循环
                                    executorRunning = false;
                                    return;
                                }
                            }
                        }
                        //取到任务执行它
                        eventBus.invokeSubscriber(pendingPost);
                    }
                } catch (InterruptedException e) {
                    eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
                }
            } finally {
                executorRunning = false;
            }
        }
    }
    

    这里的BackgroundPoster是个Runnable对象,它的执行交给eventBus的线程池。需要说明的是在run方法循环从队列取任务的时候,执行poll方法,传入了空闲挂起时间,这表示当队列空闲时,poll内部执行wait挂起,等待挂起时间结束,再从队列取,挂起时间内,如果其他线程执行enqueue方法,将会唤起它,这个方法也设置了空闲状态下的最大等待时间。具体请看队列的enqueue方法。当队列为任务都执行完毕的时候,executorRunning置为false,表示当前分发器处于空闲状态。

    AsyncPoster

    class AsyncPoster implements Runnable, Poster {
    
        private final PendingPostQueue queue;
        private final EventBus eventBus;
    
        AsyncPoster(EventBus eventBus) {
            this.eventBus = eventBus;
            queue = new PendingPostQueue();
        }
    
        public void enqueue(Subscription subscription, Object event) {
            PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
            queue.enqueue(pendingPost);
            eventBus.getExecutorService().execute(this);
        }
    
        @Override
        public void run() {
            PendingPost pendingPost = queue.poll();
            if(pendingPost == null) {
                throw new IllegalStateException("No pending post available");
            }
            eventBus.invokeSubscriber(pendingPost);
        }
    }
    

    这个和BackgroundPoster类似,run方法也是在子线程中执行,不同的是BackgroundPoster所有任务均在一个线程中执行,而AsyncPoster中的任务都是在独立的线程中执行。
    最后补充一下注销方法:
    上篇文章我们分析了EventBus中为了反映事件和订阅者对象(Subscriber)多对多的关系,有两个集合,一个是从事件类型(EventType)到订阅方法(Subscribtion, Subscriber是它们的调用者, Subscribtion持有方法和Subscriber)的一对多映射,表明一个事件可以被N个订阅方法订阅,另一个是从订阅者对象(Subscriber)到EventType的一对多映射,表示一个订阅者对象可以持有多个订阅方法。那么注销的时候也是对这两个集合的处理:

        public synchronized void unregister(Object subscriber) {
            //拿到订阅者支持的事件类型
            List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
            if (subscribedTypes != null) {
                for (Class<?> eventType : subscribedTypes) {
                    //结合eventType对应的订阅方法集合,删除该订阅者对象的Subscribtion
                    unsubscribeByEventType(subscriber, eventType);
                }
                //移除该订阅者对象的映射
                typesBySubscriber.remove(subscriber);
            } else {
                logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
            }
        }
    

    再看unsubscribeByEventType方法:

        private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
             //拿到事件类型对应的Subscription集合
            List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
            if (subscriptions != null) {
                int size = subscriptions.size();
                for (int i = 0; i < size; i++) {
                    Subscription subscription = subscriptions.get(i);
                    //取调用者,删除该调用者的subscription
                    if (subscription.subscriber == subscriber) {
                        subscription.active = false;
                        subscriptions.remove(i);
                        i--;
                        size--;
                    }
                }
            }
        }
    

    总结

    至此EventBus的主体流程分析完毕,这里我们着重学习它的以下设计思想:
    1.反射注解实现订阅者的收集;
    2.观察者模式应用;
    3.线程切换的实现思路;
    它通过注解描述事件订阅者,然后注册到总线中去,在分发的时候加入线程切换机制,完美实现了事件发布与订阅的解耦,在项目运用中实现组件的通信,例如在两个Activity之间实现通信,只要在一个Activity里构造好事件对象,然后发布到总线,在另一个Activity中注册总线,定义好订阅方法即可,这里的订阅方法一般设置为接收粘性事件,因为事件已经在注册之前发布,注册好之后可以立即接收粘性事件。

    相关文章

      网友评论

          本文标题:EventBus3.0源码分析(二)事件分发器

          本文链接:https://www.haomeiwen.com/subject/dxizpftx.html