EventBus详解及源码分析

作者: 仰简 | 来源:发表于2019-04-27 21:19 被阅读3次

    一、前言

    从 EventBus 的介绍中,EventBus 给的定位是:

    Event bus for Android and Java that simplifies communication between Activities, Fragments, Threads, Services, etc. Less code, better quality

    简单理解一下就是 Event bus 给 Android 及 Java (当然主要是 Android)的 Activity,Fragment,Threads,Services 之间提供一个简单的通信方式,从而能让我们用质量高且少的代码来完成我们的功能。

    二、EventBus 详解之官文解读

    2.1 架构概述

    EventBus 是一个基于发布/订阅的设计模式的事件总线,其架构图如下。

    EventBus 架构图

    从架构图上来看,还是很简单的,跟我们所最熟悉的观察者模式是类似的。大概工作过程就是:
    (1) Subscriber 也就是订阅者,订阅一个 Event,其中 Event 是自己定义的,符合 POJO 的规范即可。
    (2) 在需要时,Publisher 也就是发布者,就可将事件通过 EventBus 分发布相应的订阅者 Subscriber。

    恩,就是如此简单。当然,其实现远不会这么简单了,还是干了很多脏活和累活的。

    2.2 特性概述

    • 简化了组件之间的通信
      • 事件发送者和接收者分离
      • 在Activity,Fragment和后台线程中都可以很好的进行事件的分发
      • 避免复杂且容易出错的依赖关系和生命周期问题
    • 简化代码
    • 快,多快呢?
    • 小(约50k)
    • 已经通过100,000,000+安装的应用程序在实践中得到证实
    • 具有指定分发线程,优先级等高级功能。

    2.3 功能概述

    • 事件订阅 / 分发
      这是其最核心的功能,也是要在源码分析中需要深入的。
    • 指定分发线程 Delivery Threads (ThreadMode)
      即指定订阅者将在哪个线程中响应事件的处理。恩,这主要用于如网络或者耗时的线程处理,需要切换到主线程来更新 UI 的场景。当然,如果用 RxJava 的话,这个就再熟悉不过了。
    • 配置 Configuration
      通过 EventBusBuilder 类对 EventBus 进行各方面的配置,如配置可允许事件的分布没有订阅者处理,而不报异常。
    • 粘性事件 Sticky Events
      将用户发送过的事件暂时保存在内存中,当订阅者一旦订阅了该事件,就可以立即收到该事件。恩,这个就与粘性广播的概念是一致的了。
    • 优先级以及取消事件
      主要是指定 Subscriber 的优先级,其默认优先级为 0,指定了较高的优先级就可以优先收到事件进行处理。而收到事件的 Subcriber 中提前取消事件的继续分发,从而中断事件的继续分发。恩,这个也跟广播里的 abortBroadcast 概念一样。
    • Subscriber Index
      用中文不知道咋说。这个是 3.0 的版本加的,主要是将事件处理中的对事件的反射处理,提前到编译时来处理了。从而提高性能。当然,主要也是推荐在 Android 下使用。
    • 异步执行器 AsyncExecutor
      主要就是一个线程池,可以理解就是一个帮助类。

    三、源码解析

    3.1 开始向导

    EventBus 的官方文档为我们提供了经典的使用 EventBus 的 3 个过程。

    3.1.1定义事件

    普通的 POJO 即可

    public class MessageEvent {
     
        public final String message;
     
        public MessageEvent(String message) {
            this.message = message;
        }
    }
    

    3.1.2 准备订阅者

    文章是基于 EventBus 3 进行分析,对于订阅者的方法命名可以自由命名了,而不需要采用约定命名。

    @Subscribe(threadMode = ThreadMode.MAIN)
    public void onMessageEvent(MessageEvent event) {
        Toast.makeText(getActivity(), event.message, Toast.LENGTH_SHORT).show();
    }
     
    // This method will be called when a SomeOtherEvent is posted
    @Subscribe
    public void handleSomethingElse(SomeOtherEvent event) {
        doSomethingWith(event);
    }
    

    准备好了订阅者之后,我们还需要向总线注册这个订阅者,这个我们的订阅者方法才能接收到相应的事件。

    @Override
    public void onStart() {
        super.onStart();
        EventBus.getDefault().register(this);
    }
     
    @Override
    public void onStop() {
        EventBus.getDefault().unregister(this);
        super.onStop();
    }
    

    3.1.3 发送事件

    我们可以在任何地方发送事件。

    EventBus.getDefault().post(new MessageEvent("Hello everyone!"));
    

    3.2 源码解析

    根据前文的 3 步经典使用方法,源码的分析大概也分成如下 4 步来进行。

    3.2.1 注册订阅者

    EventBus Register.jpg

    如上面的时序图,事件的订阅大概可以分成 6 个步骤,下面来一一分析。

    方法getDefault()

    public static EventBus getDefault() {
            if (defaultInstance == null) {
                synchronized (EventBus.class) {
                    if (defaultInstance == null) {
                        defaultInstance = new EventBus();
                    }
                }
            }
            return defaultInstance;
        }
    

    getDefault() 方法就是一个“懒汉”的单例模式,目的就是让我们能在全局对 EventBus 的引用进行使用。值得关注的是,这个懒汉的单例模式实现并不会有多线程的安全问题。因为对于 defaultInstance 的定义是 volatile 的。

    static volatile EventBus defaultInstance;
    

    接下来继续看看构造方法。

    构造方法EventBus()

    public EventBus() {
            this(DEFAULT_BUILDER);
        }
    
        EventBus(EventBusBuilder builder) {
            logger = builder.getLogger();
            subscriptionsByEventType = new HashMap<>();
            typesBySubscriber = new HashMap<>();
            stickyEvents = new ConcurrentHashMap<>();
            mainThreadSupport = builder.getMainThreadSupport();
            mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
            backgroundPoster = new BackgroundPoster(this);
            asyncPoster = new AsyncPoster(this);
            indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
            subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
                    builder.strictMethodVerification, builder.ignoreGeneratedIndex);
            logSubscriberExceptions = builder.logSubscriberExceptions;
            logNoSubscriberMessages = builder.logNoSubscriberMessages;
            sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
            sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
            throwSubscriberException = builder.throwSubscriberException;
            eventInheritance = builder.eventInheritance;
            executorService = builder.executorService;
        }
    

    构造方法使用了常见的 Builder 模式对 EventBus 中的各个属性进行了初始化。这里使用的是默认配置。一般情况下,我们也都是使用 getDefault() 及其默认配置来获取实例的。我们也可以通过配置 EventBusBuilder 来 build 出一个自己的实例,但是要注意的是,后面的注册、注销以及发送事件都要基于此实例来进行,否则就会发生事件错乱发错的问题。

    方法register()
    如果不看方法的实现,根据经验判断,我想当 register 的发生,应该是将订阅者中用于接收事件的方法与该事件关联起来。那是不是这样呢?

    public void register(Object subscriber) {
            Class<?> subscriberClass = subscriber.getClass();
            // 1.通过订阅者类找到其订阅方法
            List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
            synchronized (this) {
               // 2. 然后进行逐个订阅
                for (SubscriberMethod subscriberMethod : subscriberMethods) {
                    subscribe(subscriber, subscriberMethod);
                }
            }
        }
    

    首行来看看找订阅方法。这里封装了一个专门的类 SubscriberMethodFinder,并且通过其方法 findSubscriberMethods() 来进行查找,这里就不贴 findSubscriberMethods() 的代码,而是看看其内部实际用于寻找订阅方法的 findUsingReflection()。

    private void findUsingReflectionInSingleClass(FindState findState) {
            Method[] methods;
            try {
               ......
                methods = findState.clazz.getDeclaredMethods();
            } catch (Throwable th) {
                ......
            }
            for (Method method : methods) {
                int modifiers = method.getModifiers();
                if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
                    Class<?>[] parameterTypes = method.getParameterTypes();
                    if (parameterTypes.length == 1) {
                        Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                        if (subscribeAnnotation != null) {
                            Class<?> eventType = parameterTypes[0];
                            if (findState.checkAdd(method, eventType)) {
                                ThreadMode threadMode = subscribeAnnotation.threadMode();
                                findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                        subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                            }
                        }
                    } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                        ......
                    }
                } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                    ......
                }
            }
        }
    

    总结一下,该方法主要就是寻找订阅者类中的公有方法,且其参数唯一的以及含有注解声明@Subscribe 的方法。就是我们在写代码时所定义的订阅者方法了。找到订阅者方法后,会将其 method(方法的反射类 Method) 、event、thread mode 以及优先级等封装成一个 SubscribeMethod 然后添加到 FindState 中。

    这个 FindState 是 SubscriberMethodFinder 的一个内部类。其用了大小只有 4 的 FIND_STATE_POOL 来进行管理,这样避免了 FindState 对象的重复创建。

    最后在 find 中会将所找到订阅者方法添加到 “METHOD_CACHE” 中,这是一个 ConcurrentHashMap 的结构。

    Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();
    

    方法subscribe()

    private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
            Class<?> eventType = subscriberMethod.eventType;
            Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
            CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
            if (subscriptions == null) {
                subscriptions = new CopyOnWriteArrayList<>();
                subscriptionsByEventType.put(eventType, subscriptions);
            } else {
                ......
                }
            }
    
            // 按优先级进行插入
            int size = subscriptions.size();
            for (int i = 0; i <= size; i++) {
                if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
                    subscriptions.add(i, newSubscription);
                    break;
                }
            }
    
            List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
            if (subscribedEvents == null) {
                subscribedEvents = new ArrayList<>();
                typesBySubscriber.put(subscriber, subscribedEvents);
            }
            subscribedEvents.add(eventType);
    
            // 粘性事件,注册就会发送
            if (subscriberMethod.sticky) {
                    .....
                    checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                }
            }
        }
    

    subscribe() 方法完成事件与 subscribe 、 subscribeMethod 的关联,具体怎么关联的呢,请看图。


    subscriptionsByEventType.jpg typesBySubscriber.jpg

    这里分别用了 2 个 HashMap 来描述。subscriptionsByEventType 记录了一个事件应该调用哪个订阅者的订阅方法来处理。而 typesBySubscriber 记录了订阅者订阅了哪些事件。

    至此,订阅者的注册完成之后,也就相当于是 EventBus 的初始化完成了,那么接下来就可以愉快发送事件了。

    3.2.2 事件发送

    EventBus post.jpg

    事件的发送看起来好像也比较简单。

    public void post(Object event) {
            // 获取发送者线程的状态管理器
            PostingThreadState postingState = currentPostingThreadState.get();
            // 从状态管理器中取出事件队列,并将事件添加到队列中
            List<Object> eventQueue = postingState.eventQueue;
            eventQueue.add(event);
            // 如果当前不是发送状态,那就进入发送状态
            if (!postingState.isPosting) {
                postingState.isMainThread = isMainThread();
                postingState.isPosting = true;
                if (postingState.canceled) {
                   ......
                }
                try {
                   // 在发送状态下,通过遍历事件队列逐个发送事件
                    while (!eventQueue.isEmpty()) {
                        postSingleEvent(eventQueue.remove(0), postingState);
                    }
                } finally {
                    postingState.isPosting = false;
                    postingState.isMainThread = false;
                }
            }
        }
    

    post() 方法的主要过程都写在代码注释里了。这里主要关注下 currentPostingThreadState,它的定义如下:

    private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
            @Override
            protected PostingThreadState initialValue() {
                return new PostingThreadState();
            }
        };
    

    而 PostingThreadState 的定义如下。

    final static class PostingThreadState {
            final List<Object> eventQueue = new ArrayList<>();
            boolean isPosting;
            boolean isMainThread;
            Subscription subscription;
            Object event;
            boolean canceled;
        }
    

    这里用了 ThreadLocal 来保存 PostingThreadState。ThreadLocal 的主要作用是使得所定义的资源是线程私有的。那么,也就是说,对于每一个发送事件的线程其都有一个唯一的 PostingThreadState 来记录事件发送的队列以及状态。

    下图是 ThreadLocal 在 Thread 中的数据结构描述,而这里的 PostingThreadState 就是下图中的 Value。


    ThreadLocal.jpg
    private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
            Class<?> eventClass = event.getClass();
            boolean subscriptionFound = false;
            if (eventInheritance) {
                List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
                int countTypes = eventTypes.size();
                for (int h = 0; h < countTypes; h++) {
                    Class<?> clazz = eventTypes.get(h);
                    subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
                }
            } else {
                subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
            }
            ......
    

    postSingleEvent 所做的事情主要就是根据要发送的事情收集事件,然后逐个发送。收集的什么事情呢,很简单,就是收集它的父类事件。也就是说,当我们发送一个事件时,它的所有父类事件也同时会被发送,这里的父类还包括了 Object 在内。所以,这里需要开发者们非常注意了:

    事件是具有向上传递性质的

    接下来的 postSingleEventForEventType 就是从 subscriptionsByEventType 找出事件所订阅的订阅者以及订阅者方法,即 CopyOnWriteArrayList<Subscription>。
    然后再通过方法 postToSubscription() 将事件逐个逐个向订阅者进行发送。在 postToSubscription() 方法中会根据不同的 ThreadMode 来决定不同的线程调度策略,具体在下面讲。而不管采用何种策略,最终都将会通过invokeSubscriber() 进行方法的反射调用来进行订阅方法的调用。

    void invokeSubscriber(Subscription subscription, Object event) {
           ......
           subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
           ......
        }
    
    

    至此,事件的发送就完成了。很简单,其真实的面目就是一个方法的反射调用。而对于事件的发送,这里还应该关注一下线程的调度策略。

    3.2.3 线程调度策略。

    上面说到线程的调度策略在 postToSubscription() 方法中,那么就来看一看这个方法。

    private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
            switch (subscription.subscriberMethod.threadMode) {
                case POSTING:
                    invokeSubscriber(subscription, event);
                    break;
                case MAIN:
                    if (isMainThread) {
                        invokeSubscriber(subscription, event);
                    } else {
                        mainThreadPoster.enqueue(subscription, event);
                    }
                    break;
                case MAIN_ORDERED:
                    if (mainThreadPoster != null) {
                        mainThreadPoster.enqueue(subscription, event);
                    } else {
                        // temporary: technically not correct as poster not decoupled from subscriber
                        invokeSubscriber(subscription, event);
                    }
                    break;
                case BACKGROUND:
                    if (isMainThread) {
                        backgroundPoster.enqueue(subscription, event);
                    } else {
                        invokeSubscriber(subscription, event);
                    }
                    break;
                case ASYNC:
                    asyncPoster.enqueue(subscription, event);
                    break;
                default:
                    throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
            }
        }
    

    针对每个不同的策略,下面来简单看一看。
    POSTING
    发送者线程直接发送,也是 ThreadMode 的默认值。
    MAIN
    如果发送者是主线程,则直接发送。如果不是则交给 mainThreadPoster 来发送。mainThreadPoster 是 HandlerPoster 类型,该类继承了 Handler 并实现了 Poster 接口,其实例是由 AndroidHandlerMainThreadSupport 所创建的。AndroidHandlerMainThreadSupport 包含了主线程的 Looper ,而 HandlerPoster 包含了一个链表结构的队列 PendingPostQueue,事件也将插入到 PendingPostQueue 中。每一个事件都会以一个空 Message 发送到主线程的消息队列,消息处理完再取出下一个事件,同样以 Message 的形式来执行,如此反复执行。直到队列为空。
    MAIN_ORDERED
    一般来说也是通过 mainThreadPoster 来发送,异常情况下才通过发送者线程来发送。从代码侧来看,它主要就是属于 MAIN 策略的第二种情况。
    BACKGROUND
    如果当前为主线程,则将其投递到 backgroundPoster,否则直接发送。这里的 backgroundPoster 就是一个子线程。
    ASYNC
    异步模式下,最终就是通过 ExecutorService ,也即线程池来发送的。asyncPoster 即 AsyncPoster,其关于线程调度的关键代码为:

    eventBus.getExecutorService().execute(this);
    

    3.2.4 注销订阅者

    EventBus unregister.jpg

    注销就比较简单了,如下代码,主要就是从 typesBySubscriber 这个 map 中将订阅者移除掉,并且解决订阅者和事件的关联。

    public synchronized void unregister(Object subscriber) {
            List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
            if (subscribedTypes != null) {
                for (Class<?> eventType : subscribedTypes) {
                    unsubscribeByEventType(subscriber, eventType);
                }
                typesBySubscriber.remove(subscriber);
            } else {
                logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
            }
        }
    

    四、总结

    从整个分析过程来看,EventBus 是一个比较简单的框架,其涉及到的核心知识是 ThreadLocal 以及方法的反射调用,而基于此为我们封装了一套完整的事件 分发-传递-响应 机制,当然也是一个非常优秀的框架。总结一下,其主要的特点:

    1. 事件的定义是由用户自己来定义的,只要是 POJO 就可以。且需要注意的是,事件具有向上传递的特性,即发送一个事件会连同它的父类事件一起发送。
    2. 关于 ThreadMode,默认则发送者线程发送,并且是同步的。对于主线程则通过主线程的 MainLooper 来实现,异步事件则由线程池来实现。
    3. EventBus 的实例可以通过 getDefault() 的单例来获取,也可以通过 EventBusBuilder 的 builder() 方法来获取,但要注意它们不是同一个 EventBus 的实例,也不能互相发送事件。

    最后,感谢你能读到并读完此文章,如果分析的过程中存在错误或者疑问都欢迎留言讨论。如果我的分享能够帮助到你,还请记得帮忙点个赞吧,谢谢。

    相关文章

      网友评论

        本文标题:EventBus详解及源码分析

        本文链接:https://www.haomeiwen.com/subject/ahvfgqtx.html