美文网首页
EventBus 3.1.1源码分析

EventBus 3.1.1源码分析

作者: 君莫看 | 来源:发表于2018-01-22 18:55 被阅读0次

    EventBus地址:https://github.com/greenrobot/EventBus

    一、eventbus使用

    EventBus是用于在Android和java中 发布/订阅 的事件总线

    模式图
    使用EventBus三部曲
    1、定义事件Event
    public static class MessageEvent { /* Additional fields if needed */ }
    

    2、声明并注解一个订阅者

    @Subscribe(threadMode = ThreadMode.MAIN)  
    public void onMessageEvent(MessageEvent event) {/* Do something */};
    

    其中@Subscribe(threadMode = ThreadMode.MAIN)的threadMode 可以指定此方法运行的线程模式,threadMode = ThreadMode.MAIN运行在UI线程,即onMessageEvent不能做耗时操作。

    注册和注销订阅者,如果你是在做android开发,那么应该在activity/fragment的生命周期中进行注册/注销。

     @Override
     public void onStart() {
         super.onStart();
         EventBus.getDefault().register(this);
     }
    
     @Override
     public void onStop() {
         super.onStop();
         EventBus.getDefault().unregister(this);
     }
    

    3、发布事件

    EventBus.getDefault().post(new MessageEvent());
    

    二、分析EventBus源码

    1、EventBus的注册 EventBus.getDefault().register(this);

    1-1、EventBus.getDefault()源码

        public static EventBus getDefault() {
            if (defaultInstance == null) {
                synchronized (EventBus.class) {
                    if (defaultInstance == null) {
                        defaultInstance = new EventBus();
                    }
                }
            }
            return defaultInstance;
        }
    

    从源码看,EventBus的getDefault()是采用的是单例模式,并且是线程安全,注意EventBus并不是一个严格意义上的单例模式,因为它的构造方法并不是私有的,所以你可以创建多个EventBus。
    不了解单利模式可以看去之前写的文章《设计模式—单例》
    1-2、register(this);源码

    public void register(Object subscriber) {
            Class<?> subscriberClass = subscriber.getClass();
            List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
            synchronized (this) {
                for (SubscriberMethod subscriberMethod : subscriberMethods) {
                    subscribe(subscriber, subscriberMethod);
                }
            }
        }
    

    List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);

    先看这句List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);从命名可以看出是对订阅者方法的查找,并返回订阅者方法集合。再看下具体实现

        List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
            // 先从缓存中取出subscriberMethods,如果有则直接返回METHOD_CACHE的value
            List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
            if (subscriberMethods != null) {
                return subscriberMethods;
            }
    
            if (ignoreGeneratedIndex) {
                subscriberMethods = findUsingReflection(subscriberClass);
            } else {
                subscriberMethods = findUsingInfo(subscriberClass);
            }
            if (subscriberMethods.isEmpty()) {
                throw new EventBusException("Subscriber " + subscriberClass
                        + " and its super classes have no public methods with the @Subscribe annotation");
            } else {
                METHOD_CACHE.put(subscriberClass, subscriberMethods);
                return subscriberMethods;
            }
        }
    
    • List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);检查Map中是否缓存了此订阅者的方法集合,有直接返回;
    • ignoreGeneratedIndex是个标志位,true的情况下是通过反射来获取订阅者方法列表,false下是在编译期间生成SubscriberInfo,然后在运行时使用SubscriberInfo中保存的事件处理函数事件,减少了反射的耗时,会有运行速度上的提升,默认情况下ignoreGeneratedIndex值是false的
    • subscriberMethods = findUsingReflection(subscriberClass);看下源码
        private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
            FindState findState = prepareFindState();
            findState.initForSubscriber(subscriberClass);
            while (findState.clazz != null) {
                findUsingReflectionInSingleClass(findState);
                findState.moveToSuperclass(); // 移至父类
            }
            return getMethodsAndRelease(findState);
        }
    

    > FindState findState = prepareFindState();

        private static final int POOL_SIZE = 4;
        private static final FindState[] FIND_STATE_POOL = new FindState[POOL_SIZE];
    
        private FindState prepareFindState() {
            synchronized (FIND_STATE_POOL) {
                for (int i = 0; i < POOL_SIZE; i++) {
                    FindState state = FIND_STATE_POOL[i];
                    if (state != null) {
                        FIND_STATE_POOL[i] = null;
                        return state;
                    }
                }
            }
            return new FindState();
        }
    

    这里使用了缓存来避免对象的频繁创建所带来的开销,同时可以避免内存抖动;注意这里使用synchronized来避免出现两个或两个以上线程操作同一个FindState对象。[这个技巧可以用于出来摸个对象的频繁创建销毁,及内存抖动激烈的问题]
    >> FindState

        static class FindState {
            // 订阅者方法集合
            final List<SubscriberMethod> subscriberMethods = new ArrayList<>();
            // event为key,method为value
            final Map<Class, Object> anyMethodByEventType = new HashMap<>();
            // 用method的名字生成一个method为key,用订阅者类为value
            final Map<String, Class> subscriberClassByMethodKey = new HashMap<>();
            final StringBuilder methodKeyBuilder = new StringBuilder(128);
    
            Class<?> subscriberClass;
            Class<?> clazz;
            // 跳过父类,默认false-不跳过
            boolean skipSuperClasses;
            SubscriberInfo subscriberInfo;
    
            void initForSubscriber(Class<?> subscriberClass) {
                this.subscriberClass = clazz = subscriberClass;
                skipSuperClasses = false;
                subscriberInfo = null;
            }
    
            // 释放资源,并恢复默认设置
            void recycle() {
                subscriberMethods.clear();
                anyMethodByEventType.clear();
                subscriberClassByMethodKey.clear();
                methodKeyBuilder.setLength(0);
                subscriberClass = null;
                clazz = null;
                skipSuperClasses = false;
                subscriberInfo = null;
            }
    
            ......
    
            // 移至父类【给clazz赋值父类】
            // 如果clazz是java包和android包里的,赋值null,并结束
            void moveToSuperclass() {
                if (skipSuperClasses) {
                    clazz = null;
                } else {
                    clazz = clazz.getSuperclass();
                    String clazzName = clazz.getName();
                    /** Skip system classes, this just degrades performance. */
                    if (clazzName.startsWith("java.") || clazzName.startsWith("javax.") || clazzName.startsWith("android.")) {
                        clazz = null;
                    }
                }
            }
        }
    

    主要说明已经放到注释上。

            // 将满足条件的方法及参数类型添加到anyMethodByEventType中
            boolean checkAdd(Method method, Class<?> eventType) {
                // 2 level check: 1st level with event type only (fast), 2nd level with complete signature when required.
                // Usually a subscriber doesn't have methods listening to the same event type.
                // 这个涉及到两层检查
                // 第一层判断有无method监听此eventType,如果没有则可直接把找到的method加到subscriberMethods中。
                // 第二层判断是方法签名,这里的方法签名其实是methodName+eventType
                Object existing = anyMethodByEventType.put(eventType, method);
                if (existing == null) {
                    return true;
                } else {
                    if (existing instanceof Method) {
                        if (!checkAddWithMethodSignature((Method) existing, eventType)) {
                            // Paranoia check
                            throw new IllegalStateException();
                        }
                        // Put any non-Method object to "consume" the existing Method
                        anyMethodByEventType.put(eventType, this);
                    }
                    return checkAddWithMethodSignature(method, eventType);
                }
            }
        
            // 方法签名
            private boolean checkAddWithMethodSignature(Method method, Class<?> eventType) {
                // 拼接所谓的方法签名
                methodKeyBuilder.setLength(0);
                methodKeyBuilder.append(method.getName());
                methodKeyBuilder.append('>').append(eventType.getName());
        
                String methodKey = methodKeyBuilder.toString();
                Class<?> methodClass = method.getDeclaringClass(); // 获得声明此method的类
                Class<?> methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass);
                // “methodClassOld.isAssignableFrom(methodClass)” methodClassOld是否是methodClass的父类或者同一个类
                if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) {
                    // Only add if not already found in a sub class
                    return true;
                } else {
                    // Revert the put, old class is further down the class hierarchy
                    subscriberClassByMethodKey.put(methodKey, methodClassOld);
                    return false;
                }
            }
            
    

    这两个方法是FindState最重要的两个方法,这里单独举例说明下:
    第一种情况:比如一个类有多个订阅方法,方法名不同,但它们的参数类型都是相同的(虽然一般不这样写,但不排除这样的可能),那么遍历这些方法的时候,会多次调用到checkAdd方法,由于existing不为null,那么会进而调用checkAddWithMethodSignature方法,但是由于每个方法的名字都不同,因此methodClassOld会一直为null,因此都会返回true。也就是说,允许一个类有多个参数相同的订阅方法。
    第二种情况:类B继承自类A,而每个类都是有相同订阅方法,换句话说,类B的订阅方法继承并重写自类A,它们都有着一样的方法签名。方法的遍历会从子类开始,即B类,在checkAddWithMethodSignature方法中,methodClassOld为null,那么B类的订阅方法会被添加到列表中。接着,向上找到类A的订阅方法,由于methodClassOld不为null而且显然类B不是类A的父类,methodClassOld.isAssignableFrom(methodClass)也会返回false,那么会返回false。也就是说,子类继承并重写了父类的订阅方法,那么只会把子类的订阅方法添加到订阅者列表,父类的方法会忽略。

    >回到private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass),来看这个句findUsingReflectionInSingleClass(findState);

        private void findUsingReflectionInSingleClass(FindState findState) {
            Method[] methods;
            try {
                // This is faster than getMethods, especially when subscribers are fat classes like Activities
                // 获取的是类【自身】声明的所有方法,包含public、protected和private方法
                methods = findState.clazz.getDeclaredMethods();
            } catch (Throwable th) {
                // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
                // 获取的是类的所有共有方法,**这就包括自身的所有【public】方法,和从基类继承的、从接口实现的所有public方法**
                methods = findState.clazz.getMethods();
                findState.skipSuperClasses = true;
            }
            for (Method method : methods) {
                int modifiers = method.getModifiers(); // 获取字段的修饰符
                if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
                    // 获得一个方法参数数组(getparameterTypes用于返回一个描述参数类型的Class对象数组)
                    Class<?>[] parameterTypes = method.getParameterTypes(); 
                    if (parameterTypes.length == 1) {
                        Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                        if (subscribeAnnotation != null) {
                            Class<?> eventType = parameterTypes[0];
                            if (findState.checkAdd(method, eventType)) {
                                ThreadMode threadMode = subscribeAnnotation.threadMode();
                                findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                        subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                            }
                        }
                    } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                        String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                        // 不合法的注解方法 (方法只能有一个参数)  
                        throw new EventBusException("@Subscribe method " + methodName +
                                "must have exactly 1 parameter but has " + parameterTypes.length);
                    }
                } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                    String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                    // 不合法的注解方法 (必须为public,非static、非abstract)  
                    throw new EventBusException(methodName +
                            " is a illegal @Subscribe method: must be public, non-static, and non-abstract");
                }
            }
        }
    

    getDeclaredMethods()获取的是类自身声明的所有方法,包含public、protected和private方法。

    getMethods()获取的是类的所有共有方法,这就包括自身的所有public方法,和从基类继承的、从接口实现的所有public方法。而使用此方法自然不需要再遍历父类的方法,所以findState.skipSuperClasses = true;来跳过父类的遍历。

    int modifiers = method.getModifiers();获取字段的修饰符
    对应如下: PUBLIC: 1 PRIVATE: 2 PROTECTED: 4 STATIC: 8 FINAL: 16 SYNCHRONIZED: 32 VOLATILE: 64 TRANSIENT: 128 NATIVE: 256 INTERFACE: 512 ABSTRACT: 1024 STRICT: 2048

    if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0)
    private static final int MODIFIERS_IGNORE = Modifier.ABSTRACT | Modifier.STATIC | BRIDGE | SYNTHETIC;只获取public类型的方法

    Class<?>[] parameterTypes = method.getParameterTypes(); 
    if (parameterTypes.length == 1) {
        Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
        if (subscribeAnnotation != null) {
            Class<?> eventType = parameterTypes[0];
            if (findState.checkAdd(method, eventType)) {
                ThreadMode threadMode = subscribeAnnotation.threadMode();
                findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                        subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
            }
        }
    }
    

    当方法参数个数为1,并且此方法有Subscribe类型的注解时,通过findState.checkAdd(method, eventType)将方法和参数类型保存起来,如果保存成功,则构造一个SubscriberMethod对象把数据保存,并添加到findState.subscriberMethods集合中。

    public class SubscriberMethod {
        final Method method; // 方法
        final ThreadMode threadMode; // 方法运行线程类型
        final Class<?> eventType; // 参数类型
        final int priority; // 优先级
        final boolean sticky; // 是否粘性
        /** Used for efficient comparison */
        String methodString; // 
    
        public SubscriberMethod(Method method, Class<?> eventType, ThreadMode threadMode, int priority, boolean sticky) {
            this.method = method;
            this.threadMode = threadMode;
            this.eventType = eventType;
            this.priority = priority;
            this.sticky = sticky;
        }
    

    else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class))strictMethodVerification表示是否进行精细检查,默认值是false。如果精细检查【即strictMethodVerification=true】,并且method含有Subscribe对象的注解,则抛出异常

    • subscriberMethods = findUsingInfo(subscriberClass);总算分析完了findUsingReflection,我们接着分析findUsingInfo,先发下源码
        private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
            FindState findState = prepareFindState();
            findState.initForSubscriber(subscriberClass);
            while (findState.clazz != null) {
                // 获取订阅者信息,没有配置MyEventBusIndex返回null
                findState.subscriberInfo = getSubscriberInfo(findState);
                if (findState.subscriberInfo != null) {
                    SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
                    for (SubscriberMethod subscriberMethod : array) {
                        if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
                            findState.subscriberMethods.add(subscriberMethod);
                        }
                    }
                } else {
                    // 通过反射来查找订阅方法
                    findUsingReflectionInSingleClass(findState);
                }
                findState.moveToSuperclass();
            }
            return getMethodsAndRelease(findState);
        }
    
        private SubscriberInfo getSubscriberInfo(FindState findState) {
            if (findState.subscriberInfo != null && findState.subscriberInfo.getSuperSubscriberInfo() != null) {
                SubscriberInfo superclassInfo = findState.subscriberInfo.getSuperSubscriberInfo();
                if (findState.clazz == superclassInfo.getSubscriberClass()) {
                    return superclassInfo;
                }
            }
            if (subscriberInfoIndexes != null) {
                for (SubscriberInfoIndex index : subscriberInfoIndexes) {
                    SubscriberInfo info = index.getSubscriberInfo(findState.clazz);
                    if (info != null) {
                        return info;
                    }
                }
            }
            return null;
        }
    

    > findState.subscriberInfo = getSubscriberInfo(findState);这里涉及到一个EventBus的高级用法,也就是通过注解在编译期动态生成一个MyEventBusIndex.java类,从而避免通过反射来查找满足条件的方法。下面给出官方文档及两篇这个高级用法的文章:

    官方总文档地址:http://greenrobot.org/eventbus/documentation/
    官方地址:https://github.com/greenrobot/EventBus
    博客一:EventBus高级使用姿势
    博客二:EventBus3.0新特性之Subscriber Index

    如果没有配置这个高级用法,findState.subscriberInfo值便会是null,然后通过反射去获取方法并筛选。
    MyEventBusIndex.java文件生成位置【我是用的是kotlin方案,不同的方案位置会有小的区别,但大致位置都是在source下】

    MyEventBusIndex文件位置.png

    注意在匿名类中是用EventBus时,这种高级用法是不会在MyEventBusIndex.java生成相关信息的(原因请自行百度),但会执行反射的逻辑来完成对方法的筛选。

    subscribe(subscriber, subscriberMethod);

    再来分析下register(this);最后一句代码subscribe(subscriber, subscriberMethod);,源码如下

        // Must be called in synchronized block
        private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
            // 从订阅方法中拿到订阅事件的类型  
            Class<?> eventType = subscriberMethod.eventType;
            // 创建一个新的订阅
            Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
            // 通过订阅事件类型,找到所有的订阅(Subscription),订阅中包含了订阅者,订阅方法
            // CopyOnWriteArrayList是一个ArrayList的线程安全的变体,具体原理,使用请自行百度
            CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
            if (subscriptions == null) {
                // 如果该订阅事件类型 没有订阅列表,那么创建一个该事件类型的订阅列表
                subscriptions = new CopyOnWriteArrayList<>();
                subscriptionsByEventType.put(eventType, subscriptions);
            } else {
                if (subscriptions.contains(newSubscription)) {
                    throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                            + eventType);
                }
            }
            
            int size = subscriptions.size();
            // 根据订阅方法的优先级,添加到订阅列表
            for (int i = 0; i <= size; i++) {
                if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
                    subscriptions.add(i, newSubscription);
                    break;
                }
            }
            
            // 将这个订阅事件加入到订阅者的订阅事件列表中
            List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
            if (subscribedEvents == null) {
                subscribedEvents = new ArrayList<>();
                typesBySubscriber.put(subscriber, subscribedEvents);
            }
            subscribedEvents.add(eventType);
    
            if (subscriberMethod.sticky) {
                // eventInheritance=true时(默认为true)
                // EventBus会考虑事件的超类,即事件如果继承自超类,那么该超类也会作为事件发送给订阅者。
                // 比如 A extends B implements C  发布者post(A),那么找订阅者的时候不仅要找订阅了事件A的订阅者,还要找订阅了B和C的订阅者
                if (eventInheritance) {
                    // Existing sticky events of all subclasses of eventType have to be considered.
                    // Note: Iterating over all events may be inefficient with lots of sticky events,
                    // thus data structure should be changed to allow a more efficient lookup
                    // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
                    Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
                    for (Map.Entry<Class<?>, Object> entry : entries) {
                        Class<?> candidateEventType = entry.getKey();
                        // eventType是否是candidateEventType的父类或本身
                        if (eventType.isAssignableFrom(candidateEventType)) {
                            Object stickyEvent = entry.getValue();
                            checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                        }
                    }
                } else {
                    Object stickyEvent = stickyEvents.get(eventType);
                    checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                }
            }
        }
    

    > 大部分代码的理解已放到代码的注释中了,这里我单独拉出下面这段代码分析下

    if (eventInheritance) {
        // Existing sticky events of all subclasses of eventType have to be considered.
        // Note: Iterating over all events may be inefficient with lots of sticky events,
        // thus data structure should be changed to allow a more efficient lookup
        // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
        Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
        for (Map.Entry<Class<?>, Object> entry : entries) {
            Class<?> candidateEventType = entry.getKey();
            if (eventType.isAssignableFrom(candidateEventType)) {
                Object stickyEvent = entry.getValue();
                checkPostStickyEventToSubscription(newSubscription, stickyEvent);
            }
        }
    }
    

    先简单写一个列子,方法讲解

    public class ClazzA {
        public ClazzA() { EventBus.getDefault().register(this); }     
    
        @Subscribe(threadMode = ThreadMode.MAIN, sticky = true)
        public void eventbusMain(BaseEvent event) { }
    }
    
    public class BaseEvent { }
    public class EventA extends BaseEvent { }
    

    ClazzA构造的时候会执行EventBus的注册流程,又因为ClazzA中的订阅方法eventbusMainsticky所以会执行subscribe(subscriber, subscriberMethod);方法中的subscriberMethod.sticky模块内代码,而我们的EventBus是默认构造EventBus.getDefault(),也就是说eventInheritancetrue会执行eventInheritance代码块,如果这个时候stickyEvents.entrySet()中有两个Event事件对象 【a是EventA类型,b是BaseEvent类型】,通过for循环后,这两个对象都会交给eventbusMain(BaseEvent event)执行,也就是说eventType.isAssignableFrom(candidateEventType)这两个对象都会通过,【第一次循环,BaseEvent是不是BaseEvent的父类或本身? 第二次循环,BaseEvent是不是ClazzA的父类或本身?------ 因为Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();是个集合,顺序是不确定,可能是反的,不要纠结】

    • checkPostStickyEventToSubscription(newSubscription, stickyEvent);发送订阅事件类给订阅者
        private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
            if (stickyEvent != null) {
                // If the subscriber is trying to abort the event, it will fail (event is not tracked in posting state)
                // --> Strange corner case, which we don't take care of here.
                postToSubscription(newSubscription, stickyEvent, isMainThread());
            }
        }
    
        private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
            switch (subscription.subscriberMethod.threadMode) {
                case POSTING:
                    invokeSubscriber(subscription, event);
                    break;
                case MAIN:
                    if (isMainThread) {
                        invokeSubscriber(subscription, event);
                    } else {
                        mainThreadPoster.enqueue(subscription, event);
                    }
                    break;
                case MAIN_ORDERED:
                    if (mainThreadPoster != null) {
                        mainThreadPoster.enqueue(subscription, event);
                    } else {
                        // temporary: technically not correct as poster not decoupled from subscriber
                        invokeSubscriber(subscription, event);
                    }
                    break;
                case BACKGROUND:
                    if (isMainThread) {
                        backgroundPoster.enqueue(subscription, event);
                    } else {
                        invokeSubscriber(subscription, event);
                    }
                    break;
                case ASYNC:
                    asyncPoster.enqueue(subscription, event);
                    break;
                default:
                    throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
            }
        }
    
        void invokeSubscriber(Subscription subscription, Object event) {
            try {
                // 执行订阅方法
                subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
            } catch (InvocationTargetException e) {
                handleSubscriberException(subscription, event, e.getCause());
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("Unexpected exception", e);
            }
        }
    

    ThreadMode 用法及说明

    • PostThread:默认的 ThreadMode,表示在执行 Post 操作的线程直接调用订阅者的事件响应方法,不论该线程是否为主线程(UI 线程)。当该线程为主线程时,响应方法中不能有耗时操作,否则有卡主线程的风险。适用场景:对于是否在主线程执行无要求,但若 Post 线程为主线程,不能耗时的操作;
    • MainThread:在主线程中执行响应方法。如果发布线程就是主线程,则直接调用订阅者的事件响应方法,否则通过主线程的 Handler 发送消息在主线程中处理——调用订阅者的事件响应函数。显然,MainThread类的方法也不能有耗时操作,以避免卡主线程。适用场景:必须在主线程执行的操作;
    • BackgroundThread:在后台线程中执行响应方法。如果发布线程不是主线程,则直接调用订阅者的事件响应函数,否则启动唯一的后台线程去处理。由于后台线程是唯一的,当事件超过一个的时候,它们会被放在队列中依次执行,因此该类响应方法虽然没有PostThread类和MainThread类方法对性能敏感,但最好不要有重度耗时的操作或太频繁的轻度耗时操作,以造成其他操作等待。适用场景:操作轻微耗时且不会过于频繁,即一般的耗时操作都可以放在这里;
    • Async:不论发布线程是否为主线程,都使用一个空闲线程来处理。和BackgroundThread不同的是,Async类的所有线程是相互独立的,因此不会出现卡线程的问题。适用场景:长耗时操作,例如网络访问。

    mainThreadPoster.enqueue(subscription, event);主线程执行订阅方法。

    private final Poster mainThreadPoster;
    
    EventBus(EventBusBuilder builder) {
        ......
        mainThreadSupport = builder.getMainThreadSupport();
        mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
        ......
    }
    
        MainThreadSupport getMainThreadSupport() {
            if (mainThreadSupport != null) {
                return mainThreadSupport;
            } else if (Logger.AndroidLogger.isAndroidLogAvailable()) {
                // android的Ui线程
                Object looperOrNull = getAndroidMainLooperOrNull();
                return looperOrNull == null ? null :
                        new MainThreadSupport.AndroidHandlerMainThreadSupport((Looper) looperOrNull);
            } else {
                return null;
            }
        }
    

    其中Logger.AndroidLogger.isAndroidLogAvailable()其实是通过反射android中的"android.util.Log"包是否存在来实现的,代码如下:

    public interface Logger {
    
        ........
        public static class AndroidLogger implements Logger {
            static final boolean ANDROID_LOG_AVAILABLE;
    
            static {
                boolean android = false;
                try {
                    android = Class.forName("android.util.Log") != null;
                } catch (ClassNotFoundException e) {
                    // OK
                }
                ANDROID_LOG_AVAILABLE = android;
            }
    
            public static boolean isAndroidLogAvailable() {
                return ANDROID_LOG_AVAILABLE;
            }
           ........
        }
        .......
    }
    

    在静态代码块中做的反射【只会执行一次】,静态代码块的知识偏离本文重点,请自行百度。

        Object getAndroidMainLooperOrNull() {
            try {
                // 获得android主线程的looper
                return Looper.getMainLooper();
            } catch (RuntimeException e) {
                // Not really a functional Android (e.g. "Stub!" maven dependencies)
                return null;
            }
        }
    

    这个方法是用来获得android主线程的looper对象。

    public interface MainThreadSupport {
        boolean isMainThread();
        Poster createPoster(EventBus eventBus);
    
        class AndroidHandlerMainThreadSupport implements MainThreadSupport {
            private final Looper looper;
            public AndroidHandlerMainThreadSupport(Looper looper) {
                this.looper = looper;
            }
            @Override
            public boolean isMainThread() {
                return looper == Looper.myLooper();
            }
            @Override
            public Poster createPoster(EventBus eventBus) {
                return new HandlerPoster(eventBus, looper, 10);
            }
        }
    }
    
    public class HandlerPoster extends Handler implements Poster {
        private final PendingPostQueue queue;
        private final int maxMillisInsideHandleMessage;
        private final EventBus eventBus;
        private boolean handlerActive;
    
        protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
            super(looper);
            this.eventBus = eventBus;
            this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
            queue = new PendingPostQueue();
        }
    
        public void enqueue(Subscription subscription, Object event) {
            // 构造一个post意图对象
            PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
            synchronized (this) {
                // 将构造的PendingPost 加入到队列中
                queue.enqueue(pendingPost);
                if (!handlerActive) {
                    // handler标记为活跃状态
                    handlerActive = true;
                    // 发送PendingPost对象
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                }
            }
        }
    
        @Override
        public void handleMessage(Message msg) {
            boolean rescheduled = false;
            try {
                long started = SystemClock.uptimeMillis();
                while (true) {
                    // 从队列中获得要执行的PendingPost 
                    PendingPost pendingPost = queue.poll();
                    // 双重加锁检查,如果队列中没有信息,则hanlder状态标记为不活跃状态,同事退出循环
                    if (pendingPost == null) {
                        synchronized (this) {
                            // Check again, this time in synchronized
                            pendingPost = queue.poll();
                            if (pendingPost == null) {
                                handlerActive = false;
                                return;
                            }
                        }
                    }
                    // 执行订阅者的订阅方法
                    eventBus.invokeSubscriber(pendingPost);
                    // 循环发送队列中的所有pendingPost对象
                    long timeInMethod = SystemClock.uptimeMillis() - started;
                    if (timeInMethod >= maxMillisInsideHandleMessage) {
                        if (!sendMessage(obtainMessage())) {
                            throw new EventBusException("Could not send handler message");
                        }
                        rescheduled = true;
                        return;
                    }
                }
            } finally {
                handlerActive = rescheduled;
            }
        }
    }
    

    可以看出MainThreadSupport是通过主线程的Looper构造一个主线程的handler对象,这个handler中维护了一个队列PendingPostQueue,也就是说mainThreadPoster.enqueue(subscription, event);是通过构造PendingPost对象并添加到队列中,然后激活队列来实现发送。

    backgroundPoster.enqueue(subscription, event);后台线程中执行响应方法

    final class BackgroundPoster implements Runnable, Poster {
        private final PendingPostQueue queue;
        private final EventBus eventBus;
        private volatile boolean executorRunning;
    
        BackgroundPoster(EventBus eventBus) {
            this.eventBus = eventBus;
            queue = new PendingPostQueue();
        }
        public void enqueue(Subscription subscription, Object event) {
            PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
            synchronized (this) {
                queue.enqueue(pendingPost);
                if (!executorRunning) {
                    executorRunning = true;
                    eventBus.getExecutorService().execute(this);
                }
            }
        }
        @Override
        public void run() {
            try {
                try {
                    while (true) {
                        PendingPost pendingPost = queue.poll(1000);
                        if (pendingPost == null) {
                            synchronized (this) {
                                // Check again, this time in synchronized
                                pendingPost = queue.poll();
                                if (pendingPost == null) {
                                    executorRunning = false;
                                    return;
                                }
                            }
                        }
                        eventBus.invokeSubscriber(pendingPost);
                    }
                } catch (InterruptedException e) {
                    eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
                }
            } finally {
                executorRunning = false;
            }
        }
    }
    
    // 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。 
    private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();
    

    主要代码就一句eventBus.getExecutorService().execute(this);交给线程池处理。

    asyncPoster.enqueue(subscription, event);不论发布线程是否为主线程,都使用一个空闲线程来处理。和BackgroundThread不同的是,Async类的所有线程是相互独立的,因此不会出现卡线程的问题。

    class AsyncPoster implements Runnable, Poster {
        private final PendingPostQueue queue;
        private final EventBus eventBus;
        AsyncPoster(EventBus eventBus) {
            this.eventBus = eventBus;
            queue = new PendingPostQueue();
        }
        public void enqueue(Subscription subscription, Object event) {
            PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
            queue.enqueue(pendingPost);
            eventBus.getExecutorService().execute(this);
        }
        @Override
        public void run() {
            PendingPost pendingPost = queue.poll();
            if(pendingPost == null) {
                throw new IllegalStateException("No pending post available");
            }
            eventBus.invokeSubscriber(pendingPost);
        }
    }
    

    AsyncPosterBackgroundPoster代码基本一样,这里就不分析。它们两个主要区别就是BackgroundPoster把一堆订阅方法放在一个线程中执行,而AsyncPoster是为每一个订阅方法都创建一个线程来单独执行。

    • 总结:到这终于分析完了EventBus的register()的整个流程。下面我绘制了一个流程图来方便理解整个注册流程。
      EventBus整个register流程

    2、EventBus的订阅方法声明

    @Subscribe(threadMode = ThreadMode.MAIN)  
    public void onMessageEvent(MessageEvent event) {/* Do something */};
    

    2-1、订阅方法说明
    在上面的EventBus.getDefault()流程分支中,我们得知EventBus的订阅方法必须是public,一个参数,并且要一定要有@Subscribe的注解,才能成功的将该方法添加到EventBus的订阅方法列表中,同时EventBus提供了两种方法来查找我们的订阅方法,一种是反射【效率低】,另一种是通过注解,在编译期动态生成订阅者方法列表【效率高】。
    2-2、Subscribe都有哪些注解参数

    @Documented
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.METHOD})
    public @interface Subscribe {
        ThreadMode threadMode() default ThreadMode.POSTING;
        /**
         * If true, delivers the most recent sticky event (posted with
         * {@link EventBus#postSticky(Object)}) to this subscriber (if event available).
         */
        boolean sticky() default false;
        /** Subscriber priority to influence the order of event delivery.
         * Within the same delivery thread ({@link ThreadMode}), higher priority subscribers will receive events before
         * others with a lower priority. The default priority is 0. Note: the priority does *NOT* affect the order of
         * delivery among subscribers with different {@link ThreadMode}s! */
        int priority() default 0;
    }
    

    源码中看出就三个属性参数,分别是线程模式、是否粘性、优先级。

    3、EventBus发布事件EventBus.getDefault().post(new MessageEvent());

    3-1、void post(Object event)

        public void post(Object event) {
            // 获得当前线程的PostingThreadState封装对象
            PostingThreadState postingState = currentPostingThreadState.get();
            List<Object> eventQueue = postingState.eventQueue;
            eventQueue.add(event);
            
            if (!postingState.isPosting) {  // 为发送状态
                postingState.isMainThread = isMainThread(); // 是否主线程
                postingState.isPosting = true; // 设置为正在发送状态
                if (postingState.canceled) {
                    throw new EventBusException("Internal error. Abort state was not reset");
                }
                try {
                    while (!eventQueue.isEmpty()) {
                        postSingleEvent(eventQueue.remove(0), postingState); // 发送事件
                    }
                } finally {
                    postingState.isPosting = false;
                    postingState.isMainThread = false;
                }
            }
        }
    

    第一行源码是从currentPostingThreadState中获取PostingThreadState对象,来下看currentPostingThreadState的定义

        private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
            @Override
            protected PostingThreadState initialValue() {
                return new PostingThreadState();
            }
        };
    

    看来currentPostingThreadState是一个ThreadLocal<T>对象,ThreadLocal是一个线程内部的数据存储类,通过它可以在指定的线程中存储数据,而这段数据是不会与其他线程共享的。【下面是一段跑题代码】

    public class ThreadLocal<T> {
        public T get() {
            Thread t = Thread.currentThread();
            ThreadLocalMap map = getMap(t);
            if (map != null) {
                ThreadLocalMap.Entry e = map.getEntry(this);
                if (e != null) {
                    @SuppressWarnings("unchecked")
                    T result = (T)e.value;
                    return result;
                }
            }
            return setInitialValue();
        }
    }
    // 其内部原理是通过生成一个它包裹的泛型对象的数组,在不同的线程会有不同的数组索引值,
    // 通过这样就可以做到每个线程通过 get() 方法获取的时候,取到的只能是自己线程所对应的数据。 
    

    回到正题继续分析

        /** For ThreadLocal, much faster to set (and get multiple values). */
        final static class PostingThreadState {
            final List<Object> eventQueue = new ArrayList<>();    // 事件队列
            boolean isPosting;    // 是否在执行postSingleEvent()方法
            boolean isMainThread;  // 是否是UI线程
            Subscription subscription;
            Object event;
            boolean canceled;
        }
    

    PostingThreadState主要是封装了一些post时的参数。

    post(Object event)的核心代码就是这个句,postSingleEvent(eventQueue.remove(0), postingState);

       private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
            Class<?> eventClass = event.getClass();
            boolean subscriptionFound = false;
            if (eventInheritance) {
                // 获取到eventClass所有父类的集合
                List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
                int countTypes = eventTypes.size();
                for (int h = 0; h < countTypes; h++) {
                    Class<?> clazz = eventTypes.get(h);
                    // 有一个为true,即真个运算结果为true
                    subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
                }
            } else {
                subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
            }
            if (!subscriptionFound) {
                // 没有找到此事件类型的订阅者逻辑
                if (logNoSubscriberMessages) {
                    logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
                }
                // sendNoSubscriberEvent=true发送没有此事件类型的订阅者的事件
                if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                        eventClass != SubscriberExceptionEvent.class) {
                    post(new NoSubscriberEvent(this, event));
                }
            }
        }
    

    List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);是用来获取eventClass所继承的父类,及所有接口。

        /** Looks up all Class objects including super classes and interfaces. Should also work for interfaces. */
        private static List<Class<?>> lookupAllEventTypes(Class<?> eventClass) {
            synchronized (eventTypesCache) {
                // eventTypesCache一个map的缓存对象,通过缓存,提高运行效率
                List<Class<?>> eventTypes = eventTypesCache.get(eventClass);
                if (eventTypes == null) {
                    eventTypes = new ArrayList<>();
                    Class<?> clazz = eventClass;
                    while (clazz != null) {
                        eventTypes.add(clazz);
                        addInterfaces(eventTypes, clazz.getInterfaces());
                        clazz = clazz.getSuperclass();
                    }
                    eventTypesCache.put(eventClass, eventTypes);
                }
                return eventTypes;
            }
        }
    
        /** Recurses through super interfaces. */
        //   递归方式查找出所有接口
        static void addInterfaces(List<Class<?>> eventTypes, Class<?>[] interfaces) {
            for (Class<?> interfaceClass : interfaces) {
                if (!eventTypes.contains(interfaceClass)) {
                    eventTypes.add(interfaceClass);
                    addInterfaces(eventTypes, interfaceClass.getInterfaces());
                }
            }
        }
    

    这里通过递归来查找出所有的接口,并添加到eventTypes中;

        private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
            CopyOnWriteArrayList<Subscription> subscriptions;
            synchronized (this) {
                // 所有订阅了eventClass的事件集合
                subscriptions = subscriptionsByEventType.get(eventClass);
            }
            if (subscriptions != null && !subscriptions.isEmpty()) {
                for (Subscription subscription : subscriptions) {
                    postingState.event = event;
                    postingState.subscription = subscription;
                    boolean aborted = false;
                    try {
                        // 调用所有订阅了此类型的,订阅方法
                        postToSubscription(subscription, event, postingState.isMainThread);
                        aborted = postingState.canceled;
                    } finally {
                        postingState.event = null;
                        postingState.subscription = null;
                        postingState.canceled = false;
                    }
                    if (aborted) {
                        break;
                    }
                }
                return true;
            }
            return false;
        }
    

    postToSubscription(subscription, event, postingState.isMainThread);上面register()中已有相关分析,这里不赘述了;

    3-2、void postSticky(Object event)

        private final Map<Class<?>, Object> stickyEvents;
    
        public void postSticky(Object event) {
            synchronized (stickyEvents) {
                stickyEvents.put(event.getClass(), event);
            }
            // Should be posted after it is putted, in case the subscriber wants to remove immediately
            post(event);
        }
    

    如果是粘性事件,将此事件添加到stickyEvents中,然后调用void post(Object event),就和上面的分析一样了,这里把event添加到stickyEvents目的是,当后续有register(this);时执行sticky订阅方法。

    4、EventBus的反注册 EventBus.getDefault().unregister(this);

    1-1、unregister(this)源码

        /** Unregisters the given subscriber from all event classes. */
        public synchronized void unregister(Object subscriber) {
            List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
            if (subscribedTypes != null) {
                for (Class<?> eventType : subscribedTypes) {
                    unsubscribeByEventType(subscriber, eventType);
                }
                typesBySubscriber.remove(subscriber);
            } else {
                logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
            }
        }
    
        /** Only updates subscriptionsByEventType, not typesBySubscriber! Caller must update typesBySubscriber. */
        private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
            List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
            if (subscriptions != null) {
                int size = subscriptions.size();
                for (int i = 0; i < size; i++) {
                    Subscription subscription = subscriptions.get(i);
                    if (subscription.subscriber == subscriber) {
                        subscription.active = false;
                        subscriptions.remove(i);
                        i--;
                        size--;
                    }
                }
            }
        }
    

    unregister就是进行一些数据从集合中移除,资源回收的操作和重置,看看就可以了。

    PS:终于算分析完了,一贯只看不写我的,第一次写这么长的源码分析文章,写到最后自己都想放弃,但抱着做事要有始有终,还是坚持写完了@@@@@@~~~~~!!!!

    相关文章

      网友评论

          本文标题:EventBus 3.1.1源码分析

          本文链接:https://www.haomeiwen.com/subject/airdoxtx.html