一篇讲明白EventBus

作者: 一个追寻者的故事 | 来源:发表于2020-09-15 09:10 被阅读0次

    先说EventBus是什么: EventBus是 基于 订阅/发布 模式实现的 基于事件的异步分发处理系统。 好处就是能够解耦 订阅者 和 发布者,简化代码。乍一听很懵逼,别着急慢慢往下看。

    基于 eventbus:3.2.0

    整个文章目录:

    一、案例展示
    二、订阅
    三、发布
    四、APT提升效率

    这里有一个小纠结。如果是为了纯粹的方便自己回忆,肯定会贴很多核心代码。如果是为了让没用过的人看懂呢,又不能有太多代码。往往贴了很多代码的文章都不怎么受欢迎。所以决定尝试一下不同的写法,达到一个代码 和 原理相对平衡的状态。

    一、案例展示

    data class MsgEvent(val msg: String, val code: Int) 
    
    open class BaseActivity : AppCompatActivity() {
        @Subscribe(priority = 6)
        fun onHandlerMsg3(event: MsgEvent){
            Log.e(MainActivity.TAG, "BaseActivity # onHandlerMsg3  ${event.msg} - ${event.code}")
        }
    }
    
    class MainActivity : BaseActivity() {
        companion object{
            val TAG = "EventBusDemo"
        }
        override fun onCreate(savedInstanceState: Bundle?) {
            super.onCreate(savedInstanceState)
            setContentView(R.layout.activity_main)
    
            EventBus.getDefault().register(this)
    
            welcome.setOnClickListener {
                EventBus.getDefault().post(MsgEvent("Hello, EventBus", 22))
            }
        }
    
        @Subscribe(priority = 2)
        fun onHandlerMsg(event: MsgEvent){
            Log.e(TAG, "MainActivity # onHandlerMsg  ${event.msg} - ${event.code}")
        }
    
        @Subscribe(priority = 4)
        fun onHandlerMsg2(event: MsgEvent){
            Log.e(TAG, "MainActivity # onHandlerMsg2  ${event.msg} - ${event.code}")
        }
    
        override fun onDestroy() {
            super.onDestroy()
            if(EventBus.getDefault().isRegistered(this)){
                EventBus.getDefault().unregister(this)
            }
        }
    }
    

    当按钮点击时,Log日志:

    com.daddyno1.eventbusdemo E/EventBusDemo: BaseActivity # onHandlerMsg3  Hello, EventBus - 22
    com.daddyno1.eventbusdemo E/EventBusDemo: MainActivity # onHandlerMsg2  Hello, EventBus - 22
    com.daddyno1.eventbusdemo E/EventBusDemo: MainActivity # onHandlerMsg  Hello, EventBus - 22
    

    上边一个简单的例子,实现了订阅者的订阅、发布者发布消息以及订阅者对消息的处理。

    二、订阅

    EventBus.getDefault().register(this) 就会注册 MainActivity对象 为订阅者。
    这个过程分为两步:
    1、获取 MainActivity对象 中方法有 @Subscribe 修饰 且 参数有且仅有一个的列表。
    2、把这些信息记录起来,以供后续发送者发送消息时通知使用。

        public void register(Object subscriber) {
            Class<?> subscriberClass = subscriber.getClass();
            // 1、获取订阅者有 @Subscribe 修饰 且 参数有且仅有一个的方法 的列表。
            List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
            // 2、把这些信息记录起来,以供后续发送者发送消息时通知使用。
            synchronized (this) {
                for (SubscriberMethod subscriberMethod : subscriberMethods) {
                    subscribe(subscriber, subscriberMethod);
                }
            }
        }
    
    2.1、获取订阅者中方法有 @Subscribe 修饰 且 参数有且仅有一个的列表。

    有两种实现方式:反射 和 APT

    反射

    // 为了说明这里列出最核心的代码,省率了很多实现细节。
    Method[] methods = clazz.getDeclaredMethods();
    for (Method method : methods) {
         Class<?>[] parameterTypes = method.getParameterTypes();
         if (parameterTypes.length == 1) {
            Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
            Class<?> eventType = parameterTypes[0];
            ThreadMode threadMode = subscribeAnnotation.threadMode();    
            new SubscriberMethod(method, eventType, threadMode,subscribeAnnotation.priority(), subscribeAnnotation.sticky())
         }
    }
    

    这些常规方法没有太特别的实现,也很好理解。然后把这个方法相关的信息全部封装到了一个叫 SubscriberMethod的类中,请记住这个类,它有订阅者事件处理方法的所有信息。

    public class SubscriberMethod {
        final Method method;      // 方法
        final ThreadMode threadMode;  // 线程模式
        final Class<?> eventType;  // 事件
        final int priority;  // 优先级
        final boolean sticky;  // 是否粘性的
        ...
    }
    

    APT
    这种方式具体实现先不讲,先知道有这么一种实现方式,具体的放到后面讲。

    2.2、把这些信息记录起来,以供后续发送者发送消息时通知使用。

    上一步骤得到的这些信息是如何存的呢,这里要讲个数据结构:
    subscriptionsByEventType: 一个Map
    key: 事件类型,如本例 MsgEvent.class
    value: 一个按照订阅方法优先级排序的订阅者的列表集合。

    subscriptionsByEventType 这个集合很重要,存储了 订阅事件所有该事件订阅者 的一个映射。之后发送消息的时候,会直接从这里取出所有订阅了此事件的订阅者,依次通知,就完成事件的分发,具体细节后面会讲。

    为了更好的理解 subscriptionsByEventType 这里画了一个示意图辅助理解。理解这个数据结构很重要,所以要记好了。

    示例代码中的 subscriptionsByEventType 数据集合辅助示意图

    这里有还有一个类需要提一下,Subscription 代表了订阅者。包含订阅者对象 和 订阅者的事件订阅方法。

    final class Subscription {
        final Object subscriber;    // 订阅者对象
        final SubscriberMethod subscriberMethod;  // 订阅者的某个方法
        ...
    }
    

    至此,整个订阅过程完毕。可以猜想一下 unregister过程,也很简单,就是删除 subscriptionsByEventType 中对应的订阅者即可。

    三、发布

    EventBus.getDefault().post(MsgEvent("Hello, EventBus", 22))

    刚才讲订阅数据结构 subscriptionsByEventType 时候也说到了,发布消息的时候,会从 subscriptionsByEventType 中找到所有的 Subscription,然后挨个通知。我们这里只看核心的通知方法 postToSubscription

        private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
            switch (subscription.subscriberMethod.threadMode) {
                case POSTING:
                    invokeSubscriber(subscription, event);
                    break;
                case MAIN:
                    if (isMainThread) {
                        invokeSubscriber(subscription, event);
                    } else {
                        mainThreadPoster.enqueue(subscription, event);
                    }
                    break;
                case MAIN_ORDERED:
                    if (mainThreadPoster != null) {
                        mainThreadPoster.enqueue(subscription, event);
                    } else {
                        // temporary: technically not correct as poster not decoupled from subscriber
                        invokeSubscriber(subscription, event);
                    }
                    break;
                case BACKGROUND:
                    if (isMainThread) {
                        backgroundPoster.enqueue(subscription, event);
                    } else {
                        invokeSubscriber(subscription, event);
                    }
                    break;
                case ASYNC:
                    asyncPoster.enqueue(subscription, event);
                    break;
                default:
                    throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
            }
        }
    

    invokeSubscriber() 就是简单的通过反射调用方法:

        void invokeSubscriber(Subscription subscription, Object event) {
            try {
                subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
            } catch (InvocationTargetException e) {
                handleSubscriberException(subscription, event, e.getCause());
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("Unexpected exception", e);
            }
        }
    

    讲到这里就涉及到一个 ThreadMode 的概念,每一个 订阅者的订阅方法「有@Subscribe修饰的方法」都有一个 ThreadMode 的概念。用于标识最终被EventBus调用时该方法所在的线程,默认是 POSTING

    ThreadMode 列表:

    • POSTING
    • MAIN
    • MAIN_ORDERED
    • BACKGROUND
    • ASYNC

    POSTING 默认的线程模式。订阅者的订阅方法将被调用的线程 和 post 事件时所在的线程一致,通过反射直接调用。这个方式会阻塞 posting thread,所以尽量避免做一些耗时的操作,因为有可能阻塞的是 主线程。

    MAIN 如果发送事件时在Android的主线程,则订阅者将被直接调用(blocking)。 如果发送事件时不在Android 主线程,则会把事件放入一个队列,等待挨个处理(not-blocking)。

    MAIN_ORDEREDMAIN 不一样的是,它总是通过 Android的 Handler机制把事件包装成消息,放入主线程的消息队列。它总是 not-blocing

    BACKGROUND 代表执行订阅者订阅方法总是在子线程中。如果 post 事件所在的线程是子线程,则就在当前线程执行 订阅者的订阅方法(blocking); 如果调用post 事件所在的线程是主线程,会开启一个线程,执行订阅者 订阅方法(有用到线程池)(not-blocking)。

    通过 BACKGROUND 会尝试尽量在开启的线程中多处理几次发送的事件,虽然是通过线程池开启的线程,可能想一次尽可能的使用线程的资源。如果在此线程从 事件队列里取事件分发时,一直有事件塞进事件队列的话,则它就会一直循环处理事件的分发。

    ASYNC 也是表示订阅者的订阅方法会在子线程中处理。 和 BACKGROUND 不一样的是,无论如何它每次都会把事件包装在一个新线程中去执行(not-blocking)(这句话有瑕疵,因为是通过线程池控制的,所以运行时难免是线程池中缓存的线程)

    看一个最简单 Poster 实现吧

    class AsyncPoster implements Runnable, Poster {
    
        private final PendingPostQueue queue;
        private final EventBus eventBus;
    
        AsyncPoster(EventBus eventBus) {
            this.eventBus = eventBus;
            queue = new PendingPostQueue();
        }
    
        public void enqueue(Subscription subscription, Object event) {
            PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
            queue.enqueue(pendingPost);
            eventBus.getExecutorService().execute(this);  // 线程池
        }
    
        @Override
        public void run() {
            PendingPost pendingPost = queue.poll();
            if(pendingPost == null) {
                throw new IllegalStateException("No pending post available");
            }
            eventBus.invokeSubscriber(pendingPost);
        }
    
    }
    

    AsyncPoster 是每当有事件发送到消息队列中时,都会使用线程池开启一个子线程,去处理这段耗时的操作。

    保存事件的 事件队列 的设计还算简单。

    final class PendingPostQueue {
        private PendingPost head;
        private PendingPost tail;
    
       //入队列
      synchronized void enqueue(PendingPost pendingPost) {}
      //出队列
      synchronized PendingPost poll() {}
    }
    

    PendingPostQueue 是有链表组成的队列,保存了 headtail 引用方便入队、出队的操作。 此队列中的数据元素是 PendingPost 类型。封装了 event 实体 和 Subscription订阅者实体

    final class PendingPost {
        // 是一个静态的List,保存了回收的 `PendingPost`
        private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();
        Object event;      // 事件
        Subscription subscription;  // 订阅者
    
        PendingPost next;    // 构成队列的关键
        ...
    }
    

    看了 PendingPost 就知道 它和 Android Handler机制中的 Message 设计几乎一样 都使用了一个容器缓存使用过的元素,可以节省元素的创建时间,针对这种频繁创建的对象,使用这种方式应该挺不错的。 (提升性能、避免内存移除,没错就是享元模式)

    至此,发送事件的过程干得事情也就分析完了。

    小结: 我们阐述了 订阅者注册自己,发送者发送事件,之后一步步通知 订阅者的大致流程。强烈建议大家自己看代码。

    四、APT提升效率

    订阅阶段是通过反射找到所有符合条件的订阅者方法,在上述第2小节 订阅 中有过描述,当时提了一下 APT的方式,APT是注解处理器的缩写,注解处理器作用于 编译阶段(javac)。即我们可以在编译阶段就知道所有的订阅者方法(@Subscribe修饰),并且可以在编译阶段对于不符合规定的写法提示错误,把错误留在编译期间。另一个好处是不需要到运行时通过反射获取,这样还可以提升程序运行效率。

    下面是一个编译期间提示用户方法格式错误的例子:
    @Subscribe 修饰的方法只能有一个参数。

    > Task :app:kaptDebugKotlin FAILED
    /Users/aaa/workspace/EventBusDemo/app/build/tmp/kapt3/stubs/debug/com/daddyno1/eventbusdemo/MainActivity.java:21: 错误: Subscriber method must have exactly 1 parameter
        public final void onHandlerMsg(@org.jetbrains.annotations.NotNull()
                          ^
    FAILURE: Build failed with an exception.
    
    * What went wrong:
    Execution failed for task ':app:kaptDebugKotlin'.
    > A failure occurred while executing org.jetbrains.kotlin.gradle.internal.KaptExecution
       > java.lang.reflect.InvocationTargetException (no error message)
    

    具体的APT使用不是本文重点,有兴趣的可以去自己试一试,相信你会收获不少。

    我们直接看一下APT生成的辅助文件。

    /** This class is generated by EventBus, do not edit. */
    public class MyEventBusIndex implements SubscriberInfoIndex {
        private static final Map<Class<?>, SubscriberInfo> SUBSCRIBER_INDEX;
    
        static {
            SUBSCRIBER_INDEX = new HashMap<Class<?>, SubscriberInfo>();
            
            putIndex(new SimpleSubscriberInfo(com.daddyno1.eventbusdemo.MainActivity.class, true,
                    new SubscriberMethodInfo[] {
                new SubscriberMethodInfo("onHandlerMsg", com.daddyno1.eventbusdemo.MsgEvent.class, ThreadMode.POSTING, 2, false),
                new SubscriberMethodInfo("onHandlerMsg2", com.daddyno1.eventbusdemo.MsgEvent.class, ThreadMode.POSTING, 4,
                        false),
            }));
    
            putIndex(new SimpleSubscriberInfo(com.daddyno1.eventbusdemo.BaseActivity.class, true,
                    new SubscriberMethodInfo[] {
                new SubscriberMethodInfo("onHandlerMsg3", com.daddyno1.eventbusdemo.MsgEvent.class, ThreadMode.POSTING, 6,
                        false),
            }));
    
        }
    
        private static void putIndex(SubscriberInfo info) {
            SUBSCRIBER_INDEX.put(info.getSubscriberClass(), info);
        }
    
        @Override
        public SubscriberInfo getSubscriberInfo(Class<?> subscriberClass) {
            SubscriberInfo info = SUBSCRIBER_INDEX.get(subscriberClass);
            if (info != null) {
                return info;
            } else {
                return null;
            }
        }
    }
    

    然后使用生成的 MyEventBusIndex 辅助文件:

    EventBus.builder().addIndex(MyEventBusIndex()).installDefaultEventBus()
    

    这时候在 register 订阅者的时候,就会 优先 使用 MyEventBusIndex 中的信息去遍历 订阅者的方法。

    最后,就到这吧。文章写得比较赶,可以结合源码走一遍。像EventBus这种规模的优秀库,特别适合深入学习一下,如果能再手写一个简陋的版本,那必然收获多多,对编码能力的提升也是很有好处的。

    我看介绍EventBus是一个 Android上的事件订阅分发的异步处理库。比较奇怪的一点是,多数情况下,我们是在Activity、Fragment上去订阅(当然任意类型都可以使用),其实可以可以对于订阅者的引用使用弱引用,当发音订阅者已经销毁,我们就可以把订阅表中的订阅者删除掉。这样一来可以既不用主动调用 unregister,也不用担心内存泄漏,岂不是更方便。

    相关文章

      网友评论

        本文标题:一篇讲明白EventBus

        本文链接:https://www.haomeiwen.com/subject/okhbrktx.html