美文网首页
EventBus源码分析

EventBus源码分析

作者: BrotherTree | 来源:发表于2018-12-05 00:38 被阅读0次

    EventBus是在Android中使用到的发布-订阅事件总线框架,基于观察者模式,将事件的发送者和接收者解耦,简化了组件之间的通信。

    源码clone地址

    https://github.com/greenrobot/EventBus.git
    

    我们从以下几个方面着手分析EventBus源码流程:

    • 角色解释
    • 构造函数
    • Subscribe注解
    • 注册事件订阅方法
    • 取消注册
    • 发送事件
    • 粘性事件
    角色解释
    image.png
    Event:事件,又叫消息,其实就是一个对象,可以是网络请求返回的字符串,也可以是某个开关状态等等。事件类型EventType是指事件所属的Class。
    事件分为一般事件和Sticky事件,相对于一般事件,Sticky事件不同之处在于,当事件发布后,再有订阅者开始订阅该类型事件,依然能收到该类型事件的最近一个Sticky事件。

    Subscriber:事件订阅者。在EventBus 3.0以后,事件处理的方法可以随便取名,但是需要添加一个注解@Subscribe,并且要指定线程模式(默认为POSTING)

    Publisher:事件发布者。可以在任意线程任意位置,通过post(MessageEvent)方法发送事件。可以自己实例化EventBus对象,但一般使用EventBus.getDefault()就可以。

    构造函数

    我们看看new EventBus在做什么

    public EventBus() {
            this(DEFAULT_BUILDER);
        }
    

    这里DEFAULT_BUILDER是默认的EventBusBuilder,用来构造EventBus

    private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();
    

    this调用了EventBus另一个构造函数完成它相关属性的初始化

    EventBus(EventBusBuilder builder) {
            logger = builder.getLogger();
            // subscriptionsByEventType是一个HashMap,保存了以eventType为key,Subscription对象集合为value的键值对
            subscriptionsByEventType = new HashMap<>();
            // typesBySubscribere也是一个HashMap,保存了以当前要注册类的对象为key,注册类中订阅事件的方法的参数类型的集合为value的键值对
            typesBySubscriber = new HashMap<>();
            // stickyEvents就是发送粘性事件时,保存了事件类型和对应事件的集合
            stickyEvents = new ConcurrentHashMap<>();
            // 当前线程为主线程
            mainThreadSupport = builder.getMainThreadSupport();
            // 主线程
            mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
            // 后台线程
            backgroundPoster = new BackgroundPoster(this);
            // 异步线程
            asyncPoster = new AsyncPoster(this);
            // 如支持注解生成器,引入索引,则记录的是索引数
            indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
            // 订阅方法查找器
            subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,builder.strictMethodVerification, builder.ignoreGeneratedIndex);
            logSubscriberExceptions = builder.logSubscriberExceptions;
            logNoSubscriberMessages = builder.logNoSubscriberMessages;
            sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
            sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
            throwSubscriberException = builder.throwSubscriberException;
            eventInheritance = builder.eventInheritance;
            executorService = builder.executorService;
    }
    

    所以我们也可以通过如下方式进行事件注册

    EventBus.builder().eventInheritance(true).build().register(this);
    

    通常我们使用更简便的方式进行注册

    EventBus.getDefault().register(this);
    

    看一下getDefault()方法

    static volatile EventBus defaultInstance;
    ......
    public static EventBus getDefault() {
            EventBus instance = defaultInstance;
            if (instance == null) {
                synchronized (EventBus.class) {
                    instance = EventBus.defaultInstance;
                    if (instance == null) {
                        instance = EventBus.defaultInstance = new EventBus();
                    }
                }
            }
            return instance;
        }
    

    可以看出getDefault()是一个单例方法,而且使用了DCL双重检查模式

    DCL优点是资源利用率高,第一次执行getInstance时单例对象才被实例化,效率高。缺点是第一次加载时反应稍慢一些,在高并发环境下也有一定的缺陷,虽然发生的概率很小。DCL虽然在一定程度解决了资源的消耗和多余的同步,线程安全等问题,但是他还是在某些情况会出现失效的问题,也就是DCL失效,在《java并发编程实践》一书建议用静态内部类单例模式来替代DCL。

    附:静态内部类单例模式

    public class EventBus {
        private EventBus(){
        }
        public static EventBus getInstance(){
            return EventBusHolder.sInstance;
        }
        private static class EventBusHolder {
            private static final EventBus sInstance = new EventBus();
        }
    }
    

    第一次加载EventBus类时并不会初始化sInstance,只有第一次调用getInstance方法时虚拟机加载EventBusHolder 并初始化sInstance ,这样不仅能确保线程安全也能保证EventBus类的唯一性,所以推荐使用静态内部类单例模式。

    Subscribe注解
    @Documented
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.METHOD})
    public @interface Subscribe {
        // 指定事件订阅方法的线程模式,即在那个线程执行事件订阅方法处理事件,默认为POSTING
        ThreadMode threadMode() default ThreadMode.POSTING;
        // 是否支持粘性事件,默认为false
        boolean sticky() default false;
        // 指定事件订阅方法的优先级,默认为0,如果多个事件订阅方法可以接收相同事件的,则优先级高的先接收到事件
        int priority() default 0;
    }
    

    @Retention按生命周期来划分可分为3类

    • RetentionPolicy.SOURCE:注解只保留在源文件,当Java文件编译成class文件的时候,注解被遗弃;
    • RetentionPolicy.CLASS:注解被保留到class文件,但jvm加载class文件时候被遗弃,这是默认的生命周期;
    • RetentionPolicy.RUNTIME:注解不仅被保存到class文件中,jvm加载class文件之后,仍然存在;
      首先要明确生命周期长度 SOURCE < CLASS < RUNTIME ,所以前者能作用的地方后者一定也能作用。一般如果需要在运行时去动态获取注解信息,那只能用 RUNTIME 注解;如果要在编译时进行一些预处理操作,比如生成一些辅助代码(如 ButterKnife),就用 CLASS注解;如果只是做一些检查性的操作,比如 @Override 和 @SuppressWarnings,则可选用 SOURCE 注解

    @Target说明了Annotation所修饰的对象范围,具体包括:

    • CONSTRUCTOR:用于描述构造器
    • FIELD:用于描述域
    • LOCAL_VARIABLE:用于描述局部变量
    • METHOD:用于描述方法
    • PACKAGE:用于描述包
    • PARAMETER:用于描述参数
    • TYPE:用于描述类、接口(包括注解类型) 或enum声明

    在使用Subscribe注解时可以根据需求指定threadMode、sticky、priority三个属性。
    其中threadMode属性有如下几个可选值:

    POSTING
    如果使用事件处理函数指定了线程模型为POSTING,那么该事件在哪个线程发布出来的,事件处理函数就会在这个线程中运行,也就是说发布事件和接收事件在同一个线程。在线程模型为POSTING的事件处理函数中尽量避免执行耗时操作,因为它会阻塞事件的传递,甚至有可能会引起ANR。

    MAIN
    如在主线程(UI线程)发送事件,则直接在主线程处理事件;
    如果在子线程发送事件,则先将事件入队列,然后通过 Handler 切换到主线程,依次处理事件

    MAIN_ORDERED
    无论在哪个线程发送事件,都先将事件入队列,然后通过 Handler 切换到主线程,依次处理事件

    BACKGROUND
    如果在主线程发送事件,则先将事件入队列,然后通过线程池依次处理事件;
    如果在子线程发送事件,则直接在发送事件的线程处理事件;
    在此事件处理函数中禁止进行UI更新操作

    ASYNC
    无论事件在哪个线程发布,该事件处理函数都会在新建的子线程中执行(将事件入队列,然后通过线程池处理),同样,此事件处理函数中禁止进行UI更新操作

    注册事件订阅方法

    注册事件方法如下:

    EventBus.getDefault().register(this);
    

    EventBus.java类中的register方法:

    public void register(Object subscriber) {
            // 得到当前要注册类的Class对象
            Class<?> subscriberClass = subscriber.getClass();
            // 根据Class查找当前类中订阅了事件的方法集合,即使用了Subscribe注解、有public修饰符、一个参数的方法
            // SubscriberMethod类主要封装了符合条件方法的相关信息:
            // Method对象、线程模式、事件类型、优先级、是否是粘性事等
            List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
            synchronized (this) {
                // 循环遍历订阅了事件的方法集合,以完成注册
                for (SubscriberMethod subscriberMethod : subscriberMethods) {
                    subscribe(subscriber, subscriberMethod);
                }
            }
        }
    

    可以发现,register方法主要做了两件事情:

    • 查找当前类中订阅了事件的方法集合
    • 循环遍历方法集合,完成注册

    看下findSubscriberMethods方法做了什么

    List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
            // METHOD_CACHE是一个ConcurrentHashMap,直接保存了subscriberClass和对应SubscriberMethod的集合,以提高注册效率,赋值重复查找。
            // 从缓存中获取SubscriberMethod集合
            List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
            if (subscriberMethods != null) {
                return subscriberMethods;
            }
            // 由于使用了默认的EventBusBuilder,则ignoreGeneratedIndex属性默认为false,即是否忽略注解生成器
            if (ignoreGeneratedIndex) {
                subscriberMethods = findUsingReflection(subscriberClass);
            } else {
                subscriberMethods = findUsingInfo(subscriberClass);
            }
            //在获得subscriberMethods以后,如果订阅者中不存在@Subscribe注解并且为public的订阅方法,则会抛出异常。
            if (subscriberMethods.isEmpty()) {
                throw new EventBusException("Subscriber " + subscriberClass
                        + " and its super classes have no public methods with the @Subscribe annotation");
            } else {
                // 保存查找到的订阅事件的方法
                METHOD_CACHE.put(subscriberClass, subscriberMethods);
                return subscriberMethods;
            }
        }
    

    首先从缓存中查找,如果找到了就立马返回。如果缓存中没有的话,则根据 ignoreGeneratedIndex 选择如何查找订阅方法,ignoreGeneratedIndex表示是否忽略注解器生成MyEventBusIndex。
    最后,找到订阅方法后,放入缓存,以免下次继续查找。
    ignoreGeneratedIndex 默认就是false,可以通过EventBusBuilder来设置它的值。
    在项目中经常通过EventBus单例模式来获取默认的EventBus对象,也就是ignoreGeneratedIndex为false的情况,这种情况调用了findUsingInfo方法。

    private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
            FindState findState = prepareFindState();
            findState.initForSubscriber(subscriberClass);
            // 初始状态下findState.clazz就是subscriberClass
            while (findState.clazz != null) {
                // getSubscriberInfo方法获取订阅者信息,主要执行两个判断
                // 当前类没有继承父类的订阅方法,则去判断是否配置了索引,如没有则null
                findState.subscriberInfo = getSubscriberInfo(findState);
                if (findState.subscriberInfo != null) {
                    SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
                    for (SubscriberMethod subscriberMethod : array) {
                        // checkAdd()方法用来判断FindState的anyMethodByEventType map是否已经添加过以当前eventType为key的键值对,没添加过则返回true
                        if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
                            findState.subscriberMethods.add(subscriberMethod);
                        }
                    }
                } else {
                    //通过反射来查找订阅方法
                    findUsingReflectionInSingleClass(findState);
                }
                // 修改findState.clazz为subscriberClass的父类Class,即向上遍历父类
                findState.moveToSuperclass();
            }
            // 查找到的方法保存在了FindState实例的subscriberMethods集合中。
            // 使用subscriberMethods构建一个新的List<SubscriberMethod>
            // 释放掉findState
            return getMethodsAndRelease(findState);
        }
    

    通过getSubscriberInfo方法来获取订阅者信息。
    如果当前类集成了父类的订阅方法,则返回当前类的订阅方法。否则往下继续走。
    在我们开始查找订阅方法的时候并没有忽略注解器为我们生成的索引MyEventBusIndex。
    如果我们通过EventBusBuilder配置了MyEventBusIndex,便会获取到subscriberInfo,调用subscriberInfo的getSubscriberMethods方法便可以得到订阅方法相关的信息,这个时候就不在需要通过注解进行获取订阅方法。
    上面条件判断都false的情况下,getSubscriberInfo返回null,接下来便会执行findUsingReflectionInSingleClass方法,将订阅方法保存到findState中。
    最后再通过getMethodsAndRelease方法对findState做回收处理并反回订阅方法的List集合。
    这里出现了一个FindState类,它是SubscriberMethodFinder的内部类,用来辅助查找订阅事件的方法。

    FindState
    为什么要使用FindState呢?首先是面向对象封装的采用,那么看看它给我们提供了哪些方法?

    // 用来初始化传入订阅类
    void initForSubscriber(Class<?> subscriberClass) {
        ......
    }
    // 检查方法信息,保证获取的订阅方法都是合法
    boolean checkAdd(Method method, Class<?> eventType) {
        ......
    }
    // 检查方法信息,保证获取的订阅方法都是合法
    private boolean checkAddWithMethodSignature(Method method, Class<?> eventType) {
        ......
    }
    // 查看父类中的订阅方法
    void moveToSuperclass() {
        ......
    }
    

    缓存的使用
    使用java的,应该要知道频繁地创建对象,是非常消耗资源的,在jvm垃圾回收时候,会出现内存抖动的问题。所以,我们在这里,一定要注意缓存的使用。

    上文中提到的中间器FindState,就采用了缓存:

    private static final int POOL_SIZE = 4;
    private static final FindState[] FIND_STATE_POOL = new FindState[POOL_SIZE];
    

    指定了FindState的缓存大小为4,并使用一维的静态数组,所以这里需要注意线程同步的问题:

    private FindState prepareFindState() {
            synchronized (FIND_STATE_POOL) {
                for (int i = 0; i < POOL_SIZE; i++) {
                    FindState state = FIND_STATE_POOL[i];
                    if (state != null) {
                        FIND_STATE_POOL[i] = null;
                        return state;
                    }
                }
            }
            return new FindState();
        }
    

    这段是用来获取FindState, 可以看到的是对这段缓存的获取使用了synchronized关键字,来将缓存中FindState的获取,变为同步块。
    而在subscriberMethod的获取的同时,则对FindState的缓存做了添加的操作,同样是也必须是同步代码块:

    private List<SubscriberMethod> getMethodsAndRelease(FindState findState) {
            List<SubscriberMethod> subscriberMethods = new ArrayList<>(findState.subscriberMethods);
            findState.recycle();
            synchronized (FIND_STATE_POOL) {
                for (int i = 0; i < POOL_SIZE; i++) {
                    if (FIND_STATE_POOL[i] == null) {
                        FIND_STATE_POOL[i] = findState;
                        break;
                    }
                }
            }
            return subscriberMethods;
        }
    

    另外,EventBus也对subsciberMethod的获取,也做了缓存的操作,这样进行SubscriberMethod查找的时候,则优先进行缓存的查找:

        private static final Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();
    

    这里,使用的是数据结构是ConcurrentHashMap,就可以不必写大量的同步代码块了

    HashMap,Hashtable与ConcurrentHashMap都是实现的哈希表数据结构,在随机读取的时候效率很高。
    Hashtable实现同步是利用synchronized关键字进行锁定的,其是针对整张哈希表进行锁定的,即每次锁住整张表让线程独占,在线程安全的背后是巨大的浪费。
    ConcurrentHashMap和Hashtable主要区别就是围绕着锁的粒度进行区别以及如何区锁定。
    ConcurrentHashMap的实现方式,单独锁住每一个桶(segment).ConcurrentHashMap将哈希表分为16个桶(默认值),诸如get(),put(),remove()等常用操作只锁当前需要用到的桶,而size()才锁定整张表。
    原来只能一个线程进入,现在却能同时接受16个写线程并发进入。
    所以,才有了并发性的极大提升

    具体的查找过程在findUsingReflectionInSingleClass()方法,它主要通过反射查找订阅事件的方法

    private void findUsingReflectionInSingleClass(FindState findState) {
            Method[] methods;
            try {
                // This is faster than getMethods, especially when subscribers are fat classes like Activities
                methods = findState.clazz.getDeclaredMethods();
            } catch (Throwable th) {
                // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
                methods = findState.clazz.getMethods();
                findState.skipSuperClasses = true;
            }
            // 循环遍历当前类的方法,筛选出符合条件的
            for (Method method : methods) {
                // 获得方法的修饰符
                int modifiers = method.getModifiers();
                // 如果是public类型,但非abstract、static等
                if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
                    // 获得当前方法所有参数的类型
                    Class<?>[] parameterTypes = method.getParameterTypes();
                    // 如果当前方法只有一个参数
                    if (parameterTypes.length == 1) {
                        Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                        // 如果当前方法使用了Subscribe注解
                        if (subscribeAnnotation != null) {
                            // 得到该参数的类型
                            Class<?> eventType = parameterTypes[0];
                            // checkAdd()方法用来判断FindState的anyMethodByEventType map是否已经添加过以当前eventType为key的键值对,没添加过则返回true
                            if (findState.checkAdd(method, eventType)) {
                                // 得到Subscribe注解的threadMode属性值,即线程模式
                                ThreadMode threadMode = subscribeAnnotation.threadMode();
                                // 创建一个SubscriberMethod对象,并添加到subscriberMethods集合
                                findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                        subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                            }
                        }
                    } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                        String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                        throw new EventBusException("@Subscribe method " + methodName +
                                "must have exactly 1 parameter but has " + parameterTypes.length);
                    }
                } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                    String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                    throw new EventBusException(methodName +
                            " is a illegal @Subscribe method: must be public, non-static, and non-abstract");
                }
            }
        }
    

    至此,我们将register方法中的findSubscriberMethods流程分析完毕,并获取到了当前类中订阅了事件的方法集合。
    我们再看一下register方法内容:

    public void register(Object subscriber) {
            // 得到当前要注册类的Class对象
            Class<?> subscriberClass = subscriber.getClass();
            // 根据Class查找当前类中订阅了事件的方法集合,即使用了Subscribe注解、有public修饰符、一个参数的方法
            // SubscriberMethod类主要封装了符合条件方法的相关信息:
            // Method对象、线程模式、事件类型、优先级、是否是粘性事等
            List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
            synchronized (this) {
                // 循环遍历订阅了事件的方法集合,以完成注册
                for (SubscriberMethod subscriberMethod : subscriberMethods) {
                    subscribe(subscriber, subscriberMethod);
                }
            }
        }
    

    订阅者的订阅过程执行subscribe方法

    private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
            // 得到当前订阅了事件的方法的参数类型
            Class<?> eventType = subscriberMethod.eventType;
            // 根据订阅者和订阅方法构造一个订阅事件
            // Subscription类保存了要注册的类对象以及当前的subscriberMethod订阅方法
            Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
            // 获取当前订阅事件中Subscription的List集合
            // subscriptionsByEventType是一个HashMap,保存了以eventType为key,Subscription对象集合为value的键值对
            // 先查找subscriptionsByEventType是否存在以当前eventType为key的值
            CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
            // 如果不存在,则创建一个subscriptions,并保存到subscriptionsByEventType
            if (subscriptions == null) {
                subscriptions = new CopyOnWriteArrayList<>();
                subscriptionsByEventType.put(eventType, subscriptions);
            } else {
                // 订阅者已经注册则抛出EventBusException
                if (subscriptions.contains(newSubscription)) {
                    throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                            + eventType);
                }
            }
    
            int size = subscriptions.size();
            // 遍历订阅事件,找到优先级比subscriptions中订阅事件小的位置,然后插进去
            for (int i = 0; i <= size; i++) {
                if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
                    subscriptions.add(i, newSubscription);
                    break;
                }
            }
    
            // typesBySubscribere也是一个HashMap,保存了以当前要注册类的对象为key,注册类中订阅事件的方法的参数类型的集合为value的键值对
            // 通过订阅者获取该订阅者所订阅事件的集合
            List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
            if (subscribedEvents == null) {
                // 不存在则创建一个subscribedEvents,并保存到typesBySubscriber
                subscribedEvents = new ArrayList<>();
                typesBySubscriber.put(subscriber, subscribedEvents);
            }
            // 将当前的订阅事件添加到subscribedEvents中
            subscribedEvents.add(eventType);
    
            // 粘性事件处理
            if (subscriberMethod.sticky) {
                if (eventInheritance) {
                    // Existing sticky events of all subclasses of eventType have to be considered.
                    // Note: Iterating over all events may be inefficient with lots of sticky events,
                    // thus data structure should be changed to allow a more efficient lookup
                    // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
                    Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
                    for (Map.Entry<Class<?>, Object> entry : entries) {
                        Class<?> candidateEventType = entry.getKey();
                        if (eventType.isAssignableFrom(candidateEventType)) {
                            Object stickyEvent = entry.getValue();
                            checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                        }
                    }
                } else {
                    Object stickyEvent = stickyEvents.get(eventType);
                    checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                }
            }
        }
    

    订阅的代码主要就做了两件事
    第一件事是将我们的订阅方法和订阅者封装到subscriptionsByEventType和typesBySubscriber中,subscriptionsByEventType是我们投递订阅事件的时候,就是根据我们的EventType找到我们的订阅事件,从而去分发事件,处理事件的;typesBySubscriber在调用unregister(this)的时候,根据订阅者找到EventType,又根据EventType找到订阅事件,从而对订阅者进行解绑。
    第二件事,如果是粘性事件的话,就立马投递、执行。

    取消注册

    EventBus取消注册,在生命周期onDestroy方法中执行

    EventBus.getDefault().unregister(this);
    

    看一下unregister方法

    public synchronized void unregister(Object subscriber) {
            // 得到当前注册类对象 对应的 订阅事件方法的参数类型 的集合
            List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
            if (subscribedTypes != null) {
                // 遍历参数类型集合,释放之前缓存的当前类中的Subscription
                for (Class<?> eventType : subscribedTypes) {
                    unsubscribeByEventType(subscriber, eventType);
                }
                // 删除以subscriber为key的键值对
                typesBySubscriber.remove(subscriber);
            } else {
                logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
            }
        }
    

    继续看unsubscribeByEventType方法

    private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
            // 得到当前参数类型对应的Subscription集合
            List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
            if (subscriptions != null) {
                int size = subscriptions.size();
                // 遍历Subscription集合
                for (int i = 0; i < size; i++) {
                    Subscription subscription = subscriptions.get(i);
                    // 如果当前subscription对象对应的注册类对象 和 要取消注册的注册类对象相同,则删除当前subscription对象
                    if (subscription.subscriber == subscriber) {
                        subscription.active = false;
                        subscriptions.remove(i);
                        i--;
                        size--;
                    }
                }
            }
        }
    

    取消注册流程很简单,typesBySubscriber我们在订阅者注册的过程中讲到过这个属性,他根据订阅者找到EventType,然后根据EventType和订阅者来得到订阅事件来对订阅者进行解绑。
    所以在unregister()方法中,释放了typesBySubscriber、subscriptionsByEventType中缓存的资源

    发送事件

    当发送一个事件的时候,我们可以通过如下方式:

    EventBus.getDefault().post("Hello World!")
    

    可以看到,发送事件就是通过post()方法完成的:

    public void post(Object event) {
            // currentPostingThreadState是一个PostingThreadState类型的ThreadLocal
            // PostingThreadState保存着事件队列和线程状态信息
            PostingThreadState postingState = currentPostingThreadState.get();
            // 获取事件队列,并将当前要发送的事件插入到事件队列中
            List<Object> eventQueue = postingState.eventQueue;
            eventQueue.add(event);
            // isPosting默认为false
            if (!postingState.isPosting) {
                // 是否为主线程
                postingState.isMainThread = isMainThread();
                // isPosting设置为true
                postingState.isPosting = true;
                if (postingState.canceled) {
                    throw new EventBusException("Internal error. Abort state was not reset");
                }
                try {
                    // 遍历事件队列
                    while (!eventQueue.isEmpty()) {
                        // 发送单个事件
                        // eventQueue.remove(0),从事件队列移除事件
                        postSingleEvent(eventQueue.remove(0), postingState);
                    }
                } finally {
                    postingState.isPosting = false;
                    postingState.isMainThread = false;
                }
            }
        }
    

    首先从PostingThreadState对象中取出事件队列,然后再将当前的事件插入到事件队列当中。最后将队列中的事件依次交由postSingleEvent方法进行处理,并移除该事件。

    来看看postSingleEvent方法里做了什么:

    private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
            Class<?> eventClass = event.getClass();
            boolean subscriptionFound = false;
            // eventInheritance默认为true,表示是否向上查找事件的父类
            if (eventInheritance) {
                // 查找当前事件类型的Class,连同当前事件类型的Class保存到集合
                List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
                int countTypes = eventTypes.size();
                // 遍历Class集合,继续处理事件
                for (int h = 0; h < countTypes; h++) {
                    Class<?> clazz = eventTypes.get(h);
                    subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
                }
            } else {
                subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
            }
            // 找不到该事件时的异常处理
            if (!subscriptionFound) {
                if (logNoSubscriberMessages) {
                    logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
                }
                if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                        eventClass != SubscriberExceptionEvent.class) {
                    post(new NoSubscriberEvent(this, event));
                }
            }
        }
    

    eventInheritance表示是否向上查找事件的父类,它的默认值为true,可以通过在EventBusBuilder中来进行配置。
    当eventInheritance为true时,则通过lookupAllEventTypes找到所有的父类事件并存在List中,然后通过postSingleEventForEventType方法对事件逐一处理。
    如果eventInheritance为false,则直接交由postSingleEventForEventType方法对事件进行处理

    看下postSingleEventForEventType方法是怎么处理的:

    private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
            CopyOnWriteArrayList<Subscription> subscriptions;
            // 取出该事件对应的Subscription集合
            synchronized (this) {
                subscriptions = subscriptionsByEventType.get(eventClass);
            }
            // 如果已订阅了对应类型的事件
            if (subscriptions != null && !subscriptions.isEmpty()) {
                for (Subscription subscription : subscriptions) {
                    // 记录事件
                    postingState.event = event;
                    // 记录对应的subscription
                    postingState.subscription = subscription;
                    boolean aborted = false;
                    try {
                        // 最终的事件处理
                        postToSubscription(subscription, event, postingState.isMainThread);
                        aborted = postingState.canceled;
                    } finally {
                        postingState.event = null;
                        postingState.subscription = null;
                        postingState.canceled = false;
                    }
                    if (aborted) {
                        break;
                    }
                }
                return true;
            }
            return false;
        }
    

    同步取出该事件对应的Subscription集合并遍历该集合将事件event和对应Subscription传递给postingState并调用postToSubscription方法对事件进行处理。

    private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
            // 判断订阅事件方法的线程模式
            switch (subscription.subscriberMethod.threadMode) {
                // 默认的线程模式,在哪个线程发送事件就在哪个线程处理事件
                case POSTING:
                    invokeSubscriber(subscription, event);
                    break;
                // 在主线程处理事件
                case MAIN:
                    // 如果在主线程发送事件,则直接在主线程通过反射处理事件
                    if (isMainThread) {
                        invokeSubscriber(subscription, event);
                    } else {
                        // 如果是在子线程发送事件,则将事件入队列,通过Handler切换到主线程执行处理事件
                        // mainThreadPoster 不为空
                        mainThreadPoster.enqueue(subscription, event);
                    }
                    break;
                // 无论在那个线程发送事件,都先将事件入队列,然后通过 Handler 切换到主线程,依次处理事件。
                // mainThreadPoster 不为空
                case MAIN_ORDERED:
                    if (mainThreadPoster != null) {
                        mainThreadPoster.enqueue(subscription, event);
                    } else {
                        // temporary: technically not correct as poster not decoupled from subscriber
                        invokeSubscriber(subscription, event);
                    }
                    break;
                case BACKGROUND:
                    // 如果在主线程发送事件,则先将事件入队列,然后通过线程池依次处理事件
                    if (isMainThread) {
                        backgroundPoster.enqueue(subscription, event);
                    } else {
                        // 如果在子线程发送事件,则直接在发送事件的线程通过反射处理事件
                        invokeSubscriber(subscription, event);
                    }
                    break;
                // 无论在那个线程发送事件,都将事件入队列,然后通过线程池处理。
                case ASYNC:
                    asyncPoster.enqueue(subscription, event);
                    break;
                default:
                    throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
            }
        }
    

    可以看到,postToSubscription()方法就是根据订阅事件方法的线程模式、以及发送事件的线程来判断如何处理事件。
    处理方式主要有两种:
    第一种是在相应线程直接通过invokeSubscriber()方法,用反射来执行订阅事件的方法,这样发送出去的事件就被订阅者接收并做相应处理了
    第二种是先将事件入队列(其实底层是一个List),然后做进一步处理,我们以mainThreadPoster.enqueue(subscription, event)为例简单的分析下,其中mainThreadPoster是HandlerPoster类的一个实例

    public class HandlerPoster extends Handler implements Poster {
        private final PendingPostQueue queue;
        private boolean handlerActive;
        ......
        public void enqueue(Subscription subscription, Object event) {
            // 用subscription和event封装一个PendingPost对象
            PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
            synchronized (this) {
                // 入队列
                queue.enqueue(pendingPost);
                if (!handlerActive) {
                    handlerActive = true;
                    // 发送开始处理事件的消息,handleMessage()方法将被执行,完成从子线程到主线程的切换
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                }
            }
        }
    
        @Override
        public void handleMessage(Message msg) {
            boolean rescheduled = false;
            try {
                long started = SystemClock.uptimeMillis();
                // 死循环遍历队列
                while (true) {
                    // 出队列
                    PendingPost pendingPost = queue.poll();
                    ......
                    // 进一步处理pendingPost
                    eventBus.invokeSubscriber(pendingPost);
                    ......
                }
            } finally {
                handlerActive = rescheduled;
            }
        }
    }
    

    所以HandlerPoster的enqueue()方法主要就是将subscription、event对象封装成一个PendingPost对象,然后保存到队列里,之后通过Handler切换到主线程,在handleMessage()方法将中将PendingPost对象循环出队列,交给invokeSubscriber()方法进一步处理

    void invokeSubscriber(PendingPost pendingPost) {
            Object event = pendingPost.event;
            Subscription subscription = pendingPost.subscription;
            // 释放pendingPost引用的资源
            PendingPost.releasePendingPost(pendingPost);
            if (subscription.active) {
                // 用反射来执行订阅事件的方法
                invokeSubscriber(subscription, event);
            }
        }
    
    void invokeSubscriber(Subscription subscription, Object event) {
            try {
                subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
            } catch (InvocationTargetException e) {
                handleSubscriberException(subscription, event, e.getCause());
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("Unexpected exception", e);
            }
        }
    

    这个方法很简单,主要就是从pendingPost中取出之前保存的event、subscription,然后用反射来执行订阅事件的方法,又回到了第一种处理方式。所以mainThreadPoster.enqueue(subscription, event)的核心就是先将将事件入队列,然后通过Handler从子线程切换到主线程中去处理事件。
    backgroundPoster.enqueue()和asyncPoster.enqueue也类似,内部都是先将事件入队列,然后再出队列,但是会通过线程池去进一步处理事件

    粘性事件

    一般情况,我们使用 EventBus 都是准备好订阅事件的方法,然后注册事件,最后在发送事件,即要先有事件的接收者。但粘性事件却恰恰相反,我们可以先发送事件,后续再准备订阅事件的方法、注册事件。
    由于这种差异,我们分析粘性事件原理时,先从事件发送开始,发送一个粘性事件通过如下方式:

    EventBus.getDefault().postSticky("Hello World!");
    

    来看postSticky()方法是如何实现的:

    public void postSticky(Object event) {
            synchronized (stickyEvents) {
                stickyEvents.put(event.getClass(), event);
            }
            // Should be posted after it is putted, in case the subscriber wants to remove immediately
            post(event);
        }
    

    postSticky()方法主要做了两件事,先将事件类型和对应事件保存到stickyEvents中,方便后续使用;然后执行post(event)继续发送事件,这个post()方法就是之前发送的post()方法。所以,如果在发送粘性事件前,已经有了对应类型事件的订阅者,及时它是非粘性的,依然可以接收到发送出的粘性事件。
    发送完粘性事件后,再准备订阅粘性事件的方法,并完成注册。核心的注册事件流程还是我们之前的register()方法中的subscribe()方法,前边分析subscribe()方法时,有一段没有分析的代码,就是用来处理粘性事件的。

    private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
            // 得到当前订阅了事件的方法的参数类型
            Class<?> eventType = subscriberMethod.eventType;
            // 根据订阅者和订阅方法构造一个订阅事件
            // Subscription类保存了要注册的类对象以及当前的subscriberMethod订阅方法
            Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
    
            // 粘性事件处理
            // 如果当前订阅事件的方法的Subscribe注解的sticky属性为true,即该方法可接受粘性事件
            if (subscriberMethod.sticky) {
                // 默认为true,表示是否向上查找事件的父类
                if (eventInheritance) {
                    // stickyEvents就是发送粘性事件时,保存了事件类型和对应事件
                    Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
                    for (Map.Entry<Class<?>, Object> entry : entries) {
                        Class<?> candidateEventType = entry.getKey();
                        // 如果candidateEventType是eventType的子类或自身类
                        if (eventType.isAssignableFrom(candidateEventType)) {
                            // 获得对应的事件
                            Object stickyEvent = entry.getValue();
                            // 处理粘性事件
                            checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                        }
                    }
                } else {
                    // 获得对应的事件
                    Object stickyEvent = stickyEvents.get(eventType);
                    // 处理粘性事件
                    checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                }
            }
        }
    

    可以看到,处理粘性事件就是在 EventBus 注册时,遍历stickyEvents,如果当前要注册的事件订阅方法是粘性的,并且该方法接收的事件类型和stickyEvents中某个事件类型相同或者是其父类,则取出stickyEvents中对应事件类型的具体事件,做进一步处理。

    继续看checkPostStickyEventToSubscription()处理方法

     private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
            if (stickyEvent != null) {
                // If the subscriber is trying to abort the event, it will fail (event is not tracked in posting state)
                // --> Strange corner case, which we don't take care of here.
                postToSubscription(newSubscription, stickyEvent, isMainThread());
            }
        }
    

    最终还是通过postToSubscription()方法完成粘性事件的处理,这就是粘性事件的整个处理流程

    参考

    EventBus 官方
    EventBus 原理解析
    EventBus3.0源码解析
    老司机教你 “飙” EventBus 3

    相关文章

      网友评论

          本文标题:EventBus源码分析

          本文链接:https://www.haomeiwen.com/subject/bivbcqtx.html