EventBus源码解析

作者: 可乐游侠 | 来源:发表于2020-03-23 17:17 被阅读0次

    ​ EventBus是Android平台上一个发布/订阅事件总线,使用EventBus可以方便的在不同的组件中进行消息通信,避免不同组件之间的耦合。EventBus或者类似的事件总线基本上是各个项目中的标配。本文主要基于3.2.0版本介绍EventBus的实现方式。

    EventBus的基本流程

    EventBus-Publish-Subscribe.png

    ​ 如上图所示,EventBus的核心架构是通过post()方法把Event交给EventBus,由EventBus根据事件的类型,分发给Subscriber。Subscriber即订阅者,指的是通过EventBus的register()方法在EventBus中注册的对象,可以是Activity、Fragment也可以是其它对象。

    ​ EventBus的流程比较简单,主要分析几个问题。

    1. EventBus的注册流程
    2. EventBus分发事件的流程
    3. EventBus怎么切换线程

    EventBus如何存储订阅关系

    ​ 在开始之前,先介绍一下EventBus存储订阅关系的几个Map,了解几个Map存储的数据和作用后,更容易理解EventBus的流程。

    1. subscriptionsByEventType:
    private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
    

    变量声明如上所示,是一个保存Event类型和对应的订阅对象和方法的Map,key对应的是Event的Class对象,value是对应的Subscription的列表。subscriptionsByEventType的作用是根据Event的类型,拿到对应的订阅者列表及其方法,然后通过反射的形式进行调用。简单来说,subscriptionsByEventType保存的是某个Event有哪些订阅者。

    final class Subscription {
        final Object subscriber;
        final SubscriberMethod subscriberMethod;
            ...
    }
    public class SubscriberMethod {
        final Method method;
        final ThreadMode threadMode;
        final Class<?> eventType;
        final int priority;
        final boolean sticky;
        /** Used for efficient comparison */
        String methodString;
        ...
    }
    

    Subscription是表示订阅的实体类,从上面两个类的代码可以看出,Subscription用来保存订阅者以及对应的方法,包括方法的对象、线程、事件类型、优先级等等属性。

    1. typesBySubscriber:
    private final Map<Object, List<Class<?>>> typesBySubscriber;
    

    typesBySubscriber的key是Subscriber,value对应的是订阅的Event的列表。typesBySubscriber的作用是在注册成订阅者或者取消注册的时候,可以根据Subscriber拿到对应的Event列表,然后在subscriptionsByEventType中中方便的移除相应的订阅。与subscriptionsByEventType相反,typesBySubscriber保存的是某个订阅者,订阅了哪些Event。

    EventBus的注册流程

    ​ EventBus注册的方式很简单,通过register(this)和unregister(this)两个方法进行注册和反注册,通过注解的方式标识具体的方法及处理的线程等。

        public void register(Object subscriber) {
            Class<?> subscriberClass = subscriber.getClass();
            List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
            synchronized (this) {
                for (SubscriberMethod subscriberMethod : subscriberMethods) {
                    subscribe(subscriber, subscriberMethod);
                }
            }
        }
    

    register()方法比较好理解,就是找到subscriber的订阅的方法,然后挨个进行注册。问题就在于如何找到订阅方法。

    遍历订阅者的方法

    List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
            List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
            if (subscriberMethods != null) {
                return subscriberMethods;
            }
    
            if (ignoreGeneratedIndex) {
                subscriberMethods = findUsingReflection(subscriberClass);
            } else {
                subscriberMethods = findUsingInfo(subscriberClass);
            }
            if (subscriberMethods.isEmpty()) {
                throw new EventBusException("Subscriber " + subscriberClass
                        + " and its super classes have no public methods with the @Subscribe annotation");
            } else {
                METHOD_CACHE.put(subscriberClass, subscriberMethods);
                return subscriberMethods;
            }
        }
    

    在SubscriberMethodFinder中维护了缓存METHOD_CACHE,保存订阅者对应的订阅方法,提高再次订阅的效率。然后查找订阅者的方法。EventBus3.0中新增了索引机制,如果在gradle中配置了索引,则会在编译的时候就遍历订阅者的方法,降低EventBus注册占用的时间,提升EventBus的性能。

    private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
            FindState findState = prepareFindState();
            findState.initForSubscriber(subscriberClass);
            while (findState.clazz != null) {
                findState.subscriberInfo = getSubscriberInfo(findState);
                if (findState.subscriberInfo != null) {
                    SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
                    for (SubscriberMethod subscriberMethod : array) {
                        if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
                            findState.subscriberMethods.add(subscriberMethod);
                        }
                    }
                } else {
                    findUsingReflectionInSingleClass(findState);
                }
                findState.moveToSuperclass();
            }
            return getMethodsAndRelease(findState);
        }
    

    通过内部类FindState来遍历方法,getSubscriberInfo(findState)方法通过SubscriberInfoIndex获取订阅者信息,如果配置了索引,就会获取到subscriberInfo,调用subscriberInfo.getSubscriberMethods()可以获取到订阅的方法数组。如果没有配置索引功能则返回null,会通过反射的方式去遍历方法。最后从findState获取订阅方法的List返回,并重置findState。

    private void findUsingReflectionInSingleClass(FindState findState) {
            Method[] methods;
            try {
                // This is faster than getMethods, especially when subscribers are fat classes like Activities
                methods = findState.clazz.getDeclaredMethods();
            } catch (Throwable th) {
               ...
            }
            for (Method method : methods) {
                int modifiers = method.getModifiers();
                if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
                    Class<?>[] parameterTypes = method.getParameterTypes();
                    if (parameterTypes.length == 1) {
                        Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                        if (subscribeAnnotation != null) {
                            Class<?> eventType = parameterTypes[0];
                            if (findState.checkAdd(method, eventType)) {
                                ThreadMode threadMode = subscribeAnnotation.threadMode();
                                findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                        subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                            }
                        }
                    } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                        String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                        throw new EventBusException("@Subscribe method " + methodName +
                                "must have exactly 1 parameter but has " + parameterTypes.length);
                    }
                } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                    String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                    throw new EventBusException(methodName +
                            " is a illegal @Subscribe method: must be public, non-static, and non-abstract");
                }
            }
        }
    

    这里通过getDeclaredMethods拿到类的方法,根据方法的类型、参数、注解依次进行筛选,找到符合条件的方法,将方法的相关信息添加到findState中。

    遍历完订阅方法之后,对订阅方法进行注册。

    注册订阅方法

     private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
            Class<?> eventType = subscriberMethod.eventType;
            Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
            CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
            if (subscriptions == null) {
                subscriptions = new CopyOnWriteArrayList<>();
                subscriptionsByEventType.put(eventType, subscriptions);
            } else {
                if (subscriptions.contains(newSubscription)) {
                    throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                            + eventType);
                }
            }
    
            int size = subscriptions.size();
            for (int i = 0; i <= size; i++) {
                if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
                    subscriptions.add(i, newSubscription);
                    break;
                }
            }
    
            List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
            if (subscribedEvents == null) {
                subscribedEvents = new ArrayList<>();
                typesBySubscriber.put(subscriber, subscribedEvents);
            }
            subscribedEvents.add(eventType);
    
            if (subscriberMethod.sticky) {
                if (eventInheritance) {
                    // Existing sticky events of all subclasses of eventType have to be considered.
                    // Note: Iterating over all events may be inefficient with lots of sticky events,
                    // thus data structure should be changed to allow a more efficient lookup
                    // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
                    Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
                    for (Map.Entry<Class<?>, Object> entry : entries) {
                        Class<?> candidateEventType = entry.getKey();
                        if (eventType.isAssignableFrom(candidateEventType)) {
                            Object stickyEvent = entry.getValue();
                            checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                        }
                    }
                } else {
                    Object stickyEvent = stickyEvents.get(eventType);
                    checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                }
            }
        }
    

    订阅的方法中根据subscriber和subscriberMethod构造了一个Subscription对象,然后根据优先级保存到subscriptionsByEventType中,在发送事件的时候取出依次分发。把subscriber和事件类型添加到typesBySubscriber中,方便取消注册。如果是方法设置粘性事件属性的话,就立刻分发现存的对应的粘性事件。

    分发事件的流程

        public void post(Object event) {
            PostingThreadState postingState = currentPostingThreadState.get();
            List<Object> eventQueue = postingState.eventQueue;
            eventQueue.add(event);
    
            if (!postingState.isPosting) {
                postingState.isMainThread = isMainThread();
                postingState.isPosting = true;
                if (postingState.canceled) {
                    throw new EventBusException("Internal error. Abort state was not reset");
                }
                try {
                    while (!eventQueue.isEmpty()) {
                        postSingleEvent(eventQueue.remove(0), postingState);
                    }
                } finally {
                    postingState.isPosting = false;
                    postingState.isMainThread = false;
                }
            }
        }
    

    把Event放进队列中,然后调用postSingleEvent()方法逐个分发。

    private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
            Class<?> eventClass = event.getClass();
            boolean subscriptionFound = false;
            if (eventInheritance) {
                List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
                int countTypes = eventTypes.size();
                for (int h = 0; h < countTypes; h++) {
                    Class<?> clazz = eventTypes.get(h);
                    subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
                }
            } else {
                subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
            }
                    ...
        }
    

    调用lookupAllEventTypes()方法遍历Event的所有接口和父类,然后再调用postSingleEventForEventType()分发。通过subscriptionsByEventType拿到之前保存的事件的订阅关系列表,然后再调用postToSubscription()方法分发。

    private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
            switch (subscription.subscriberMethod.threadMode) {
                case POSTING:
                    invokeSubscriber(subscription, event);
                    break;
                case MAIN:
                    if (isMainThread) {
                        invokeSubscriber(subscription, event);
                    } else {
                        mainThreadPoster.enqueue(subscription, event);
                    }
                    break;
                case MAIN_ORDERED:
                    if (mainThreadPoster != null) {
                        mainThreadPoster.enqueue(subscription, event);
                    } else {
                        // temporary: technically not correct as poster not decoupled from subscriber
                        invokeSubscriber(subscription, event);
                    }
                    break;
                case BACKGROUND:
                    if (isMainThread) {
                        backgroundPoster.enqueue(subscription, event);
                    } else {
                        invokeSubscriber(subscription, event);
                    }
                    break;
                case ASYNC:
                    asyncPoster.enqueue(subscription, event);
                    break;
                default:
                    throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
            }
        }
    

    根据ThreadMode属性进行线程的切换,比如常用的MAIN模式,如果post的线程是主线程,就直接通过反射的方式调用订阅者的订阅方法。如果是在子线程中调用的post方法,就把这个订阅事件加入到mainThreadPoster的队列中等待处理,mainThreadPoster通过Handler的方式进行主线程切换。

    EventBus怎么切换线程

    1. 切换到主线程
    public class HandlerPoster extends Handler implements Poster {
    
        private final PendingPostQueue queue;
    
        public void enqueue(Subscription subscription, Object event) {
            PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
            synchronized (this) {
                queue.enqueue(pendingPost);
                if (!handlerActive) {
                    handlerActive = true;
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                }
            }
        }
    
        @Override
        public void handleMessage(Message msg) {
            boolean rescheduled = false;
            try {
                ...
                while (true) {
                    PendingPost pendingPost = queue.poll();
                    if (pendingPost == null) {
                        synchronized (this) {
                            // Check again, this time in synchronized
                            pendingPost = queue.poll();
                            if (pendingPost == null) {
                                handlerActive = false;
                                return;
                            }
                        }
                    }
                    eventBus.invokeSubscriber(pendingPost);
                    long timeInMethod = SystemClock.uptimeMillis() - started;
                    if (timeInMethod >= maxMillisInsideHandleMessage) {
                        if (!sendMessage(obtainMessage())) {
                            throw new EventBusException("Could not send handler message");
                        }
                        rescheduled = true;
                        return;
                    }
    }
    

    切换到主线程由HandlerPoster类实现,在enqueue()方法之后,调用sendMessage触发handleMessage方法,在handleMessage中反射调用订阅方法。为了避免主线程占用时间过长,超过10ms会重新通过sendMessage方法重新安排剩下的反射调用。

    1. 切换到子线程
    class AsyncPoster implements Runnable, Poster {
    
        private final PendingPostQueue queue;
        private final EventBus eventBus;
    
        AsyncPoster(EventBus eventBus) {
            this.eventBus = eventBus;
            queue = new PendingPostQueue();
        }
    
        public void enqueue(Subscription subscription, Object event) {
            PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
            queue.enqueue(pendingPost);
            eventBus.getExecutorService().execute(this);
        }
    
        @Override
        public void run() {
            PendingPost pendingPost = queue.poll();
            if(pendingPost == null) {
                throw new IllegalStateException("No pending post available");
            }
            eventBus.invokeSubscriber(pendingPost);
        }
    
    }
    

    切换到子线程通过AsyncPoster和BackGroudPoster实现,原理是一样的,继承Runnable类,通过线程池执行。在run方法中反射调用订阅方法。

    相关文章

      网友评论

        本文标题:EventBus源码解析

        本文链接:https://www.haomeiwen.com/subject/fznwyhtx.html