Android EventBus 源码解析

作者: 没有颜色的菜 | 来源:发表于2018-08-19 21:09 被阅读3次

    基于最新的 3.1.1 分析

    前言

    之前分析的都是官方库的一些源码,现在打算尝试分析一些比较优秀的第三方开源库,选择分析EventBus,一方面是因为他的库不大,容易理解,这样我们也容易接受,如果一开始就项分析很大的库,会比较难吧。

    是什么东西?

    EventBus 是一个基于Android 或者Java 的发布订阅的消息总线,从使用上来说,它允许我们通过注解,就能在Android 中自由的切换线程,发布,接受消息,极大的减轻了我们使用Handler的负担,做了很好的解耦。
    官方架构图

    EventBus-Publish-Subscribe (copy).png

    先手写一个小例子吧

    在onCreate里边注册

        @Override
        public void onCreate(Bundle savedInstanceState) {
              EventBus.getDefault().register(this);
        }
    

    在onDestroy里边取消

        @Override
        public void onDestroy() {
              EventBus.getDefault().unregister(this);
        }
    

    在Activity 订阅消息

        @Subscribe(threadMode = ThreadMode.MAIN)
        public void subscribe(Object object) {
            Log.i(TAG, "Recevied: " + object);
        }
    

    在其他地方发布消息

        new Thread(() -> EventBus.getDefault().post(new Object())).start();
    

    这样就完成了一次完整的事件发布订阅流程,是不是要比Handler更简单,很方便呢,更清晰?

    前世今生

        public class EventBus {}
    

    源码分析

    属性

    defaultInstance 是默认的实例,使用Singleton实现,饿汉式,注意volatile的使用,很多同学在写恶汉容易遗漏。DEFAULT_BUILDER 是默认的,唯一。eventTypesCache是事件类型集合,subscriptionsByEventType是根据事件类型分组的订阅集合,typesBySubscriber 是根据订阅者分组的事件类型集合

        static volatile EventBus defaultInstance;
    
        private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();
        private static final Map<Class<?>, List<Class<?>>> eventTypesCache = new HashMap<>();
    
        private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
        private final Map<Object, List<Class<?>>> typesBySubscriber;
        private final Map<Class<?>, Object> stickyEvents;
    

    没看懂

        private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
            @Override
            protected PostingThreadState initialValue() {
                return new PostingThreadState();
            }
        };
    
        /** For ThreadLocal, much faster to set (and get multiple values). */
        final static class PostingThreadState {
            final List<Object> eventQueue = new ArrayList<>();
            boolean isPosting;
            boolean isMainThread;
            Subscription subscription;
            Object event;
            boolean canceled;
        }
    

    五种线程模型,主线程,后台线程,异步线程,和当前线程一样,有顺序的

        // @Nullable
        private final MainThreadSupport mainThreadSupport;
        // @Nullable
        private final Poster mainThreadPoster;
        private final BackgroundPoster backgroundPoster;
        private final AsyncPoster asyncPoster;
    

    一个是订阅发现者,主要是为了获取订阅的数据,另外就是线程池

        private final SubscriberMethodFinder subscriberMethodFinder;
        private final ExecutorService executorService;
    

    一些final变量

        private final boolean throwSubscriberException;
        private final boolean logSubscriberExceptions;
        private final boolean logNoSubscriberMessages;
        private final boolean sendSubscriberExceptionEvent;
        private final boolean sendNoSubscriberEvent;
        private final boolean eventInheritance;
    
        private final int indexCount;
        private final Logger logger;
    

    构造函数

    使用默认的建造者去初始化EventBus,主要是对之前属性的初始化

        /**
         * Creates a new EventBus instance; each instance is a separate scope in which events are delivered. To use a
         * central bus, consider {@link #getDefault()}.
         */
        public EventBus() {
            this(DEFAULT_BUILDER);
        }
    
        EventBus(EventBusBuilder builder) {
            logger = builder.getLogger();
            subscriptionsByEventType = new HashMap<>();
            typesBySubscriber = new HashMap<>();
            stickyEvents = new ConcurrentHashMap<>();
            mainThreadSupport = builder.getMainThreadSupport();
            mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
            backgroundPoster = new BackgroundPoster(this);
            asyncPoster = new AsyncPoster(this);
            indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
            subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
                    builder.strictMethodVerification, builder.ignoreGeneratedIndex);
            logSubscriberExceptions = builder.logSubscriberExceptions;
            logNoSubscriberMessages = builder.logNoSubscriberMessages;
            sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
            sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
            throwSubscriberException = builder.throwSubscriberException;
            eventInheritance = builder.eventInheritance;
            executorService = builder.executorService;
        }
    

    getDefault() 使用单例模式,DoubleCheck

        /** Convenience singleton for apps using a process-wide EventBus instance. */
        public static EventBus getDefault() {
            if (defaultInstance == null) {
                synchronized (EventBus.class) {
                    if (defaultInstance == null) {
                        defaultInstance = new EventBus();
                    }
                }
            }
            return defaultInstance;
        }
    

    顺便看一下EventBusBuilder
    线程池使用的是默认的CachedThreadPool,核心线程数为0,其余变量都为ture,之后再分析用途

        private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();
    
        boolean logSubscriberExceptions = true;
        boolean logNoSubscriberMessages = true;
        boolean sendSubscriberExceptionEvent = true;
        boolean sendNoSubscriberEvent = true;
        boolean throwSubscriberException;
        boolean eventInheritance = true;
        boolean ignoreGeneratedIndex;
        boolean strictMethodVerification;
        ExecutorService executorService = DEFAULT_EXECUTOR_SERVICE;
        List<Class<?>> skipMethodVerificationForClasses;
        List<SubscriberInfoIndex> subscriberInfoIndexes;
        Logger logger;
        MainThreadSupport mainThreadSupport;
    
        EventBusBuilder() {
        }
    

    在重点看一下这几个参数的赋值,首先对mainThreadSupport赋值,调用EventBusBuilder的getMainThreadSupport方法,该方法第一次是通过getAndroidMainLooperOrNull() 获取一个MainLooper,也就是ActivityThread的main中初始化的MainLooper对象

        mainThreadSupport = builder.getMainThreadSupport(); 
    
        // class evnetbusbuilder
        MainThreadSupport getMainThreadSupport() {
            if (mainThreadSupport != null) {
                return mainThreadSupport;
            } else if (Logger.AndroidLogger.isAndroidLogAvailable()) {
                Object looperOrNull = getAndroidMainLooperOrNull();
                return looperOrNull == null ? null :
                        new MainThreadSupport.AndroidHandlerMainThreadSupport((Looper) looperOrNull);
            } else {
                return null;
            }
        }
    
        Object getAndroidMainLooperOrNull() {
            try {
                return Looper.getMainLooper();
            } catch (RuntimeException e) {
                // Not really a functional Android (e.g. "Stub!" maven dependencies)
                return null;
            }
        }
    

    上一步获取到了一个MainThreadSupport,其中的Looper为MainLooper

    public interface MainThreadSupport {
    
        boolean isMainThread();
    
        Poster createPoster(EventBus eventBus);
    
        class AndroidHandlerMainThreadSupport implements MainThreadSupport {
    
            private final Looper looper;
    
            public AndroidHandlerMainThreadSupport(Looper looper) {
                this.looper = looper;
            }
    
            @Override
            public boolean isMainThread() {
                return looper == Looper.myLooper();
            }
    
            @Override
            public Poster createPoster(EventBus eventBus) {
                return new HandlerPoster(eventBus, looper, 10);
            }
        }
    
    }
    

    给mainThreadPoster 赋值,创建了一个HandlerPoster,参数为this,MainLooper, 10,10指的是最大消息处理延时

          mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
    

    看一下HandlerPoster的实现,继承自Handler,实现了 Poster的enqueue方法

    public class HandlerPoster extends Handler implements Poster {
    
        private final PendingPostQueue queue;
        private final int maxMillisInsideHandleMessage;
        private final EventBus eventBus;
        private boolean handlerActive;
    
        protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
            super(looper);
            this.eventBus = eventBus;
            this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
            queue = new PendingPostQueue();
        }
    
        public void enqueue(Subscription subscription, Object event) {
            PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
            synchronized (this) {
                queue.enqueue(pendingPost);
                if (!handlerActive) {
                    handlerActive = true;
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                }
            }
        }
    
        @Override
        public void handleMessage(Message msg) {
            boolean rescheduled = false;
            try {
                long started = SystemClock.uptimeMillis();
                while (true) {
                    PendingPost pendingPost = queue.poll();
                    if (pendingPost == null) {
                        synchronized (this) {
                            // Check again, this time in synchronized
                            pendingPost = queue.poll();
                            if (pendingPost == null) {
                                handlerActive = false;
                                return;
                            }
                        }
                    }
                    eventBus.invokeSubscriber(pendingPost);
                    long timeInMethod = SystemClock.uptimeMillis() - started;
                    if (timeInMethod >= maxMillisInsideHandleMessage) {
                        if (!sendMessage(obtainMessage())) {
                            throw new EventBusException("Could not send handler message");
                        }
                        rescheduled = true;
                        return;
                    }
                }
            } finally {
                handlerActive = rescheduled;
            }
        }
    }
    
    interface Poster {
    
        /**
         * Enqueue an event to be posted for a particular subscription.
         *
         * @param subscription Subscription which will receive the event.
         * @param event        Event that will be posted to subscribers.
         */
        void enqueue(Subscription subscription, Object event);
    }
    

    再来看一下他自己实现的队列,两个方法,入队和出队,线程安全

    final class PendingPostQueue {
        private PendingPost head;
        private PendingPost tail;
    
        synchronized void enqueue(PendingPost pendingPost) {
            if (pendingPost == null) {
                throw new NullPointerException("null cannot be enqueued");
            }
            if (tail != null) {
                tail.next = pendingPost;
                tail = pendingPost;
            } else if (head == null) {
                head = tail = pendingPost;
            } else {
                throw new IllegalStateException("Head present, but no tail");
            }
            notifyAll();
        }
    
        synchronized PendingPost poll() {
            PendingPost pendingPost = head;
            if (head != null) {
                head = head.next;
                if (head == null) {
                    tail = null;
                }
            }
            return pendingPost;
        }
    
        synchronized PendingPost poll(int maxMillisToWait) throws InterruptedException {
            if (head == null) {
                wait(maxMillisToWait);
            }
            return poll();
        }
    
    }
    

    队列的元素,设计的还不错,使用池的方式获取消PendingPost,获取PendingPost的时候,先检查PendingPost池大小,如果大于零,则从PendingPost池里面取最后一个PendingPost,并且删除,否则,新建一个,当释放PendingPost的时候,将PendingPost的值初始化,并且会受到PendingPost池里面,当然,这个值不能无限大,作者设计的还是挺不错的,这与我们常用的Handler中的Message设计思路差不多,大家也可以学一学

    final class PendingPost {
        private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();
    
        Object event;s
        Subscription subscription;
        PendingPost next;
    
        private PendingPost(Object event, Subscription subscription) {
            this.event = event;
            this.subscription = subscription;
        }
    
        static PendingPost obtainPendingPost(Subscription subscription, Object event) {
            synchronized (pendingPostPool) {
                int size = pendingPostPool.size();
                if (size > 0) {
                    PendingPost pendingPost = pendingPostPool.remove(size - 1);
                    pendingPost.event = event;
                    pendingPost.subscription = subscription;
                    pendingPost.next = null;
                    return pendingPost;
                }
            }
            return new PendingPost(event, subscription);
        }
    
        static void releasePendingPost(PendingPost pendingPost) {
            pendingPost.event = null;
            pendingPost.subscription = null;
            pendingPost.next = null;
            synchronized (pendingPostPool) {
                // Don't let the pool grow indefinitely
                if (pendingPostPool.size() < 10000) {
                    pendingPostPool.add(pendingPost);
                }
            }
        }
    
    }
    

    然后是backgroundPoster,每一个模型都有一个queue,各自处理各自的事情。第一次入队是,使用线程池启动,之后就开启无线循环不断的读取任务,当1s内获取不到任务,再没有任务时则关闭线程,个人觉得这个实现不太好,如果这1s内有任务进来,也不能立刻运行,拜拜浪费了时间,如果使用notify的机制实现会好一些。

    final class BackgroundPoster implements Runnable, Poster {
    
        private final PendingPostQueue queue;
        private final EventBus eventBus;
    
        private volatile boolean executorRunning;
    
        BackgroundPoster(EventBus eventBus) {
            this.eventBus = eventBus;
            queue = new PendingPostQueue();
        }
    
        public void enqueue(Subscription subscription, Object event) {
            PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
            synchronized (this) {
                queue.enqueue(pendingPost);
                if (!executorRunning) {
                    executorRunning = true;
                    eventBus.getExecutorService().execute(this);
                }
            }
        }
    
        @Override
        public void run() {
            try {
                try {
                    while (true) {
                        PendingPost pendingPost = queue.poll(1000);
                        if (pendingPost == null) {
                            synchronized (this) {
                                // Check again, this time in synchronized
                                pendingPost = queue.poll();
                                if (pendingPost == null) {
                                    executorRunning = false;
                                    return;
                                }
                            }
                        }
                        eventBus.invokeSubscriber(pendingPost);
                    }
                } catch (InterruptedException e) {
                    eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
                }
            } finally {
                executorRunning = false;
            }
        }
    
    }
    

    AsyncPoster 这个主要是异步的,来一个执行一个

    class AsyncPoster implements Runnable, Poster {
    
        private final PendingPostQueue queue;
        private final EventBus eventBus;
    
        AsyncPoster(EventBus eventBus) {
            this.eventBus = eventBus;
            queue = new PendingPostQueue();
        }
    s
        public void enqueue(Subscription subscription, Object event) {
            PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
            queue.enqueue(pendingPost);
            eventBus.getExecutorService().execute(this);
        }
    
        @Override
        public void run() {
            PendingPost pendingPost = queue.poll();
            if(pendingPost == null) {
                throw new IllegalStateException("No pending post available");
            }
            eventBus.invokeSubscriber(pendingPost);
        }
    
    }
    

    接下来就是重要的 SubscriberMethodFinder 函数,

            class SubscriberMethodFinder {}
    
            SubscriberMethodFinder(List<SubscriberInfoIndex> subscriberInfoIndexes, boolean strictMethodVerification,
                               boolean ignoreGeneratedIndex) {
            this.subscriberInfoIndexes = subscriberInfoIndexes;
            this.strictMethodVerification = strictMethodVerification;
            this.ignoreGeneratedIndex = ignoreGeneratedIndex;
        }
    
    

    熟悉的register函数,首先获取到subscriber对象,一般我们在Activity里面,所以是Activity对象,然后调用subscriberMethodFinder.findSubscriberMethods(subscriberClass),找到所有订阅的Methods,然后绑定订阅

        /**
         * Registers the given subscriber to receive events. Subscribers must call {@link #unregister(Object)} once they
         * are no longer interested in receiving events.
         * <p/>
         * Subscribers have event handling methods that must be annotated by {@link Subscribe}.
         * The {@link Subscribe} annotation also allows configuration like {@link
         * ThreadMode} and priority.
         */
        public void register(Object subscriber) {
            Class<?> subscriberClass = subscriber.getClass();
            List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
            synchronized (this) {
                for (SubscriberMethod subscriberMethod : subscriberMethods) {
                    subscribe(subscriber, subscriberMethod);
                }
            }
        }
    

    找到某个类中,也就是注册的类中的所有的订阅方法,首先查看方法缓存中是否以及存在,存在则直接返回,不存在则使用反射或者Info去找,获取到所有方法后把它存入缓存,返回所有的方法列表。注意getMethods和getDeclaredMethods的区别,一个是能够获取包括父类多有的public方法,一个是获取本类所有的包括私有的方法,

        List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
            List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
            if (subscriberMethods != null) {
                return subscriberMethods;
            }
    
            if (ignoreGeneratedIndex) {
                subscriberMethods = findUsingReflection(subscriberClass);
            } else {
                subscriberMethods = findUsingInfo(subscriberClass);
            }
            if (subscriberMethods.isEmpty()) {
                throw new EventBusException("Subscriber " + subscriberClass
                        + " and its super classes have no public methods with the @Subscribe annotation");
            } else {
                METHOD_CACHE.put(subscriberClass, subscriberMethods);
                return subscriberMethods;
            }
        }
    

    查找函数,首先prepareFindState,初始化 FindState,用于存放各种订阅相关的数据

        private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
            FindState findState = prepareFindState();
            findState.initForSubscriber(subscriberClass);
            while (findState.clazz != null) {
                findUsingReflectionInSingleClass(findState);
                findState.moveToSuperclass();
            }
            return getMethodsAndRelease(findState);
        }
        // 
        private void findUsingReflectionInSingleClass(FindState findState) {
            Method[] methods;
            try {
                // This is faster than getMethods, especially when subscribers are fat classes like Activities
                methods = findState.clazz.getDeclaredMethods();
            } catch (Throwable th) {
                // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
                methods = findState.clazz.getMethods();
                findState.skipSuperClasses = true;
            }
            // 对于每一个方法,获取信息,首先
            // 校验修饰符,过滤掉非 Public 的方法
            // 以及 abstract,static,bridge or synthetic methods,
            for (Method method : methods) {
                int modifiers = method.getModifiers();
                if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
                    // 参数类型只能为一个
                    Class<?>[] parameterTypes = method.getParameterTypes();
                    if (parameterTypes.length == 1) {
                        // 获取注解
                        Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                        if (subscribeAnnotation != null) {
                            // 参数类型
                            Class<?> eventType = parameterTypes[0];
                            if (findState.checkAdd(method, eventType)) {
                                // 注解的值
                                ThreadMode threadMode = subscribeAnnotation.threadMode();
                                // 添加到findState
                                findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode, subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                            }
                        }
                    } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                        String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                        throw new EventBusException("@Subscribe method " + methodName +
                                "must have exactly 1 parameter but has " + parameterTypes.length);
                    }
                } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                    String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                    throw new EventBusException(methodName +
                            " is a illegal @Subscribe method: must be public, non-static, and non-abstract");
                }
            }
        }
    
        // 初始化 FindState
        private FindState prepareFindState() {
            synchronized (FIND_STATE_POOL) {
                for (int i = 0; i < POOL_SIZE; i++) {
                    FindState state = FIND_STATE_POOL[i];
                    if (state != null) {
                        FIND_STATE_POOL[i] = null;
                        return state;
                    }
                }
            }
            return new FindState();
        }
    
        static class FindState {
            final List<SubscriberMethod> subscriberMethods = new ArrayList<>();
            final Map<Class, Object> anyMethodByEventType = new HashMap<>();
            final Map<String, Class> subscriberClassByMethodKey = new HashMap<>();
            final StringBuilder methodKeyBuilder = new StringBuilder(128);
    
            Class<?> subscriberClass;
            Class<?> clazz;
            boolean skipSuperClasses;
            SubscriberInfo subscriberInfo;
    
            void initForSubscriber(Class<?> subscriberClass) {
                this.subscriberClass = clazz = subscriberClass;
                skipSuperClasses = false;
                subscriberInfo = null;
            }
    
            void recycle() {
                subscriberMethods.clear();
                anyMethodByEventType.clear();
                subscriberClassByMethodKey.clear();
                methodKeyBuilder.setLength(0);
                subscriberClass = null;
                clazz = null;
                skipSuperClasses = false;
                subscriberInfo = null;
            }
    
            boolean checkAdd(Method method, Class<?> eventType) {
                // 2 level check: 1st level with event type only (fast), 2nd level with complete signature when required.
                // Usually a subscriber doesn't have methods listening to the same event type.
                Object existing = anyMethodByEventType.put(eventType, method);
                if (existing == null) {
                    return true;
                } else {
                    if (existing instanceof Method) {
                        if (!checkAddWithMethodSignature((Method) existing, eventType)) {
                            // Paranoia check
                            throw new IllegalStateException();
                        }
                        // Put any non-Method object to "consume" the existing Method
                        anyMethodByEventType.put(eventType, this);
                    }
                    return checkAddWithMethodSignature(method, eventType);
                }
            }
    
            private boolean checkAddWithMethodSignature(Method method, Class<?> eventType) {
                methodKeyBuilder.setLength(0);
                methodKeyBuilder.append(method.getName());
                methodKeyBuilder.append('>').append(eventType.getName());
    
                String methodKey = methodKeyBuilder.toString();
                Class<?> methodClass = method.getDeclaringClass();
                Class<?> methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass);
                if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) {
                    // Only add if not already found in a sub class
                    return true;
                } else {
                    // Revert the put, old class is further down the class hierarchy
                    subscriberClassByMethodKey.put(methodKey, methodClassOld);
                    return false;
                }
            }
    
            void moveToSuperclass() {
                if (skipSuperClasses) {
                    clazz = null;
                } else {
                    clazz = clazz.getSuperclass();
                    String clazzName = clazz.getName();
                    /** Skip system classes, this just degrades performance. */
                    if (clazzName.startsWith("java.") || clazzName.startsWith("javax.") || clazzName.startsWith("android.")) {
                        clazz = null;
                    }
                }
            }
        }
    

    有必要看一下,注解的定义

    @Documented
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.METHOD})
    public @interface Subscribe {
        ThreadMode threadMode() default ThreadMode.POSTING;
    
        /**
         * If true, delivers the most recent sticky event (posted with
         * {@link EventBus#postSticky(Object)}) to this subscriber (if event available).
         */
        boolean sticky() default false;
    
        /** Subscriber priority to influence the order of event delivery.
         * Within the same delivery thread ({@link ThreadMode}), higher priority subscribers will receive events before
         * others with a lower priority. The default priority is 0. Note: the priority does *NOT* affect the order of
         * delivery among subscribers with different {@link ThreadMode}s! */
        int priority() default 0;
    }
    

    然后就是 subscribe,对于每一个方法,执行 subscribe 函数,将这些方法添加到subscriptionsByEventType消息类型分组的订阅清单,typesBySubscriber消息类型列表

        // Must be called in synchronized block
        private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
            Class<?> eventType = subscriberMethod.eventType;
            Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
            CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
            // 首次为空
            if (subscriptions == null) {
                subscriptions = new CopyOnWriteArrayList<>();
                subscriptionsByEventType.put(eventType, subscriptions);
            } else {
                if (subscriptions.contains(newSubscription)) {
                    throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                            + eventType);
                }
            }
            // 会根据优先级加入到订阅的列表中
            int size = subscriptions.size();
            for (int i = 0; i <= size; i++) {
                if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
                    subscriptions.add(i, newSubscription);
                    break;
                }
            }
    
            List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
            if (subscribedEvents == null) {
                subscribedEvents = new ArrayList<>();
                typesBySubscriber.put(subscriber, subscribedEvents);
            }
            // eventType 就是我们自定义的MeesageEvent,用于传递消息的
            subscribedEvents.add(eventType);
    
            if (subscriberMethod.sticky) {
                if (eventInheritance) {
                    // Existing sticky events of all subclasses of eventType have to be considered.
                    // Note: Iterating over all events may be inefficient with lots of sticky events,
                    // thus data structure should be changed to allow a more efficient lookup
                    // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
                    Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
                    for (Map.Entry<Class<?>, Object> entry : entries) {
                        Class<?> candidateEventType = entry.getKey();
                        if (eventType.isAssignableFrom(candidateEventType)) {
                            Object stickyEvent = entry.getValue();
                            checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                        }
                    }
                } else {
                    Object stickyEvent = stickyEvents.get(eventType);
                    checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                }
            }
        }
    

    Post 发送消息,首先从当前线程获取PostingThreadState,然后入队,再出队执行

        /** Posts the given event to the event bus. */
        public void post(Object event) {
            PostingThreadState postingState = currentPostingThreadState.get();
            List<Object> eventQueue = postingState.eventQueue;
            eventQueue.add(event);
    
            if (!postingState.isPosting) {
                postingState.isMainThread = isMainThread();
                postingState.isPosting = true;
                if (postingState.canceled) {
                    throw new EventBusException("Internal error. Abort state was not reset");
                }
                try {
                    while (!eventQueue.isEmpty()) {
                        postSingleEvent(eventQueue.remove(0), postingState);
                    }
                } finally {
                    postingState.isPosting = false;
                    postingState.isMainThread = false;
                }
            }
        }
    

    eventInheritance 默认为true,允许事件继承,首先获取EventClass所有的父类和接口,并调用postSingleEventForEventType执行

        private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
            Class<?> eventClass = event.getClass();
            boolean subscriptionFound = false;
            if (eventInheritance) {
                List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
                int countTypes = eventTypes.size();
                for (int h = 0; h < countTypes; h++) {
                    Class<?> clazz = eventTypes.get(h);
                    subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
                }
            } else {
                subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
            }
            if (!subscriptionFound) {
                if (logNoSubscriberMessages) {
                    logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
                }
                if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                        eventClass != SubscriberExceptionEvent.class) {
                    post(new NoSubscriberEvent(this, event));
                }
            }
        }
    

    获取所有的方法,包括父类和接口,加入到需要通知的事件类型

        /** Looks up all Class objects including super classes and interfaces. Should also work for interfaces. */
        private static List<Class<?>> lookupAllEventTypes(Class<?> eventClass) {
            synchronized (eventTypesCache) {
                List<Class<?>> eventTypes = eventTypesCache.get(eventClass);
                if (eventTypes == null) {
                    eventTypes = new ArrayList<>();
                    Class<?> clazz = eventClass;
                    while (clazz != null) {
                        eventTypes.add(clazz);
                        addInterfaces(eventTypes, clazz.getInterfaces());
                        clazz = clazz.getSuperclass();
                    }
                    eventTypesCache.put(eventClass, eventTypes);
                }
                return eventTypes;
            }
        }
    
        /** Recurses through super interfaces. */
        static void addInterfaces(List<Class<?>> eventTypes, Class<?>[] interfaces) {
            for (Class<?> interfaceClass : interfaces) {
                if (!eventTypes.contains(interfaceClass)) {
                    eventTypes.add(interfaceClass);
                    addInterfaces(eventTypes, interfaceClass.getInterfaces());
                }
            }
        }
    

    首先通过subscriptionsByEventType.get(eventClass)获取能够接受这个数据类型的所有方法,然后根据每个方法的参数类型进行调用

        private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
            CopyOnWriteArrayList<Subscription> subscriptions;
            synchronized (this) {
                subscriptions = subscriptionsByEventType.get(eventClass);
            }
            if (subscriptions != null && !subscriptions.isEmpty()) {
                for (Subscription subscription : subscriptions) {
                    postingState.event = event;
                    postingState.subscription = subscription;
                    boolean aborted = false;
                    try {
                        postToSubscription(subscription, event, postingState.isMainThread);
                        aborted = postingState.canceled;
                    } finally {
                        postingState.event = null;
                        postingState.subscription = null;
                        postingState.canceled = false;
                    }
                    if (aborted) {
                        break;
                    }
                }
                return true;
            }
            return false;
        }
    

    POSTING模式直接调用,与当前线程一致
    MAIN模式,在主线程调用,则需要获取当前线程是否是主线程,若不是则入主线程队列
    MAIN_ORDERED模式,如果mainThreadPoster不为空,则入队,否则直接调用
    BACKGROUND模式,若在子线程,则直接调用,否则如后台线程队列
    ASYNC模式,直接如异步队列

        private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
            switch (subscription.subscriberMethod.threadMode) {
                case POSTING:
                    invokeSubscriber(subscription, event);
                    break;
                case MAIN:
                    if (isMainThread) {
                        invokeSubscriber(subscription, event);
                    } else {
                        mainThreadPoster.enqueue(subscription, event);
                    }
                    break;
                case MAIN_ORDERED:
                    if (mainThreadPoster != null) {
                        mainThreadPoster.enqueue(subscription, event);
                    } else {
                        // temporary: technically not correct as poster not decoupled from subscriber
                        invokeSubscriber(subscription, event);
                    }
                    break;
                case BACKGROUND:
                    if (isMainThread) {
                        backgroundPoster.enqueue(subscription, event);
                    } else {
                        invokeSubscriber(subscription, event);
                    }
                    break;
                case ASYNC:
                    asyncPoster.enqueue(subscription, event);
                    break;
                default:
                    throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
            }
        }
    
    

    直接调用,在post的执行的那个线程,比如说你在子线程调用post,那么isMainThread就是false,则会到mainThreadPoster.enqueue()里面,让主线程去调用,否则直接调用

        void invokeSubscriber(Subscription subscription, Object event) {
            try {
                subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
            } catch (InvocationTargetException e) {
                handleSubscriberException(subscription, event, e.getCause());
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("Unexpected exception", e);
            }
        }
    

    注销,首先获取需要注销的事件类型,然后针对每一个类型,取消订阅者,最后移除

        /** Unregisters the given subscriber from all event classes. */
        public synchronized void unregister(Object subscriber) {
            List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
            if (subscribedTypes != null) {
                for (Class<?> eventType : subscribedTypes) {
                    unsubscribeByEventType(subscriber, eventType);
                }
                typesBySubscriber.remove(subscriber);
            } else {
                logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
            }
        }
        /** Only updates subscriptionsByEventType, not typesBySubscriber! Caller must update typesBySubscriber. */
        private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
            List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
            if (subscriptions != null) {
                int size = subscriptions.size();
                for (int i = 0; i < size; i++) {
                    Subscription subscription = subscriptions.get(i);
                    if (subscription.subscriber == subscriber) {
                        subscription.active = false;
                        subscriptions.remove(i);
                        i--;
                        size--;
                    }
                }
            }
        }
    
    

    在反射找方法的时候,如果使用索引加速的话,调用了这个方法

        private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
            FindState findState = prepareFindState();
            findState.initForSubscriber(subscriberClass);
            while (findState.clazz != null) {
                findState.subscriberInfo = getSubscriberInfo(findState);
                if (findState.subscriberInfo != null) {
                    SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
                    for (SubscriberMethod subscriberMethod : array) {
                        if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
                            findState.subscriberMethods.add(subscriberMethod);
                        }
                    }
                } else {
                    findUsingReflectionInSingleClass(findState);
                }
                findState.moveToSuperclass();
            }
            return getMethodsAndRelease(findState);
        }
        // 获取 Subscriber的信息
        private SubscriberInfo getSubscriberInfo(FindState findState) {
            if (findState.subscriberInfo != null && findState.subscriberInfo.getSuperSubscriberInfo() != null) {
                SubscriberInfo superclassInfo = findState.subscriberInfo.getSuperSubscriberInfo();
                if (findState.clazz == superclassInfo.getSubscriberClass()) {
                    return superclassInfo;
                }
            }
            if (subscriberInfoIndexes != null) {
                for (SubscriberInfoIndex index : subscriberInfoIndexes) {
                    SubscriberInfo info = index.getSubscriberInfo(findState.clazz);
                    if (info != null) {
                        return info;
                    }
                }
            }
            return null;
        }
    

    大概分析的差不多,但感觉还是一头雾水,对一些细节还云里雾里,先写一个小结吧

    小结

    使用了注解,反射,Handler,线程池,事件池,而且里面有很多优秀的设计,单例,建造者,工厂,要多读多练才能体味里面的精妙
    欢迎讨论

    相关文章

      网友评论

        本文标题:Android EventBus 源码解析

        本文链接:https://www.haomeiwen.com/subject/slzxiftx.html