EventBus(3.1.1)源码浅析

作者: Gillben | 来源:发表于2018-05-06 09:56 被阅读0次

    前言

    EventBus是Android的发布/订阅事件总线,简化了Android的事件传递。提高了代码的简洁性。

    官方图.png
    1、EventBus的简单使用
    public class MainActivity extends BaseActivity {
    
        @Override
        protected void onCreate(@Nullable Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            //注册
            EventBus.getDefault().register(this);
        }
        
        @Subscribe(threadMode = ThreadMode.MAIN)
        public void receiveEventBusMessage(EventBusMessage message){
            Log.e("receive", "receiveEventBusMessage: "+message.getText());
        }
    
        @Override
        protected void onDestroy() {
            super.onDestroy();
            //解注册
            EventBus.getDefault().unregister(this);     
        }   
    }
    
    //可以在其它组件、fragment或工作线程中发送事件
    private void sendMessage(){
            EventBus.getDefault().post(new EventBusMessage("消息:"+ new Random().nextInt(50)));
        }
    

    从上面代码可以发现,EventBus实现事件传递有多简洁,无任何花哨。首先,通过register()进行注册,这是使用EventBus的前提;然后声明一个订阅方法用于事件的接收(利用@Subscribe注解),threadMode指定线程模式(在这里指定于主线程),其中EventBusMessage是自己定义一个事件类;再然后通过在其它组件、Fragment或工作线程中使用post()发送事件,这样就完成了事件的传递。当组件销毁或者EventBus不再使用时千万别忘记了unregister()解注册。这很重要!!!

    2、经过上面的一个简单分析,初步的了解了EventBus的使用。那么,看似简单的几个步骤,里面究竟蕴含了什么内功心法或者招式呢?(故事说起来就长了)前排准备瓜子顺带点小酒。接下来从EventBus.getDefault()开始:
     public static EventBus getDefault() {
            if (defaultInstance == null) {
                synchronized (EventBus.class) {
                    if (defaultInstance == null) {
                        defaultInstance = new EventBus();
                    }
                }
            }
            return defaultInstance;
        }
    
    //EventBusBuilder以建造者模式初始化一以下属性
    EventBus(EventBusBuilder builder) {
            logger = builder.getLogger();
            subscriptionsByEventType = new HashMap<>();    //订阅的事件类型
            typesBySubscriber = new HashMap<>();           //订阅类型
            stickyEvents = new ConcurrentHashMap<>();      //粘性事件
            mainThreadSupport = builder.getMainThreadSupport(); //主线程接口类
            mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
            backgroundPoster = new BackgroundPoster(this); //在后台提交的事件
            asyncPoster = new AsyncPoster(this);
            indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
            //查找订阅方法的类
            subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
                    builder.strictMethodVerification, builder.ignoreGeneratedIndex);
            logSubscriberExceptions = builder.logSubscriberExceptions;
            logNoSubscriberMessages = builder.logNoSubscriberMessages;
            sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
            sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
            throwSubscriberException = builder.throwSubscriberException;
            eventInheritance = builder.eventInheritance;
            //获取一个线程池
            executorService = builder.executorService;
        }
    

    很清晰,EventBus.getDefault()利用了单例模式中的双重校验方式获取一个EventBus实例。在构造函数最后一步通过Executors.newCachedThreadPool()创建一个线程池(主要在后台任务或异步任务时使用),对于线程池的创建可以了解这篇文章【探索Java 线程池

    3、EventBus的注册过程(register)。
    public void register(Object subscriber) {
            Class<?> subscriberClass = subscriber.getClass();
            //获取一个订阅方法List,包含了所订阅的类声明的订阅方法
            List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
            synchronized (this) {
                for (SubscriberMethod subscriberMethod : subscriberMethods) {
                    subscribe(subscriber, subscriberMethod);
                }
            }
        }
    

    在上面的代码中可以知道,做了两件事,
    1、查找订阅的方法,SubscriberMethodFinder # findSubscriberMethods()
    2、进行订阅。subscribe()

    3.1、是如何查找已声明的订阅方法?进入SubscriberMethodFinder类的indSubscriberMethods()方法

    List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
           //1
            List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
            if (subscriberMethods != null) {
                return subscriberMethods;
            }
    
            if (ignoreGeneratedIndex) { //2
                subscriberMethods = findUsingReflection(subscriberClass);
            } else {
                subscriberMethods = findUsingInfo(subscriberClass);
            }
            if (subscriberMethods.isEmpty()) {
                throw new EventBusException("Subscriber " + subscriberClass
                        + " and its super classes have no public methods with the @Subscribe annotation");
            } else { //3
                METHOD_CACHE.put(subscriberClass, subscriberMethods);
                return subscriberMethods;
            }
        }
    

    在上面代码1处,首先会从缓存中查找,subscriberClass正是调用register()注册的订阅者,METHOD_CACHE是一个ConcurrentHashMap的实例,从这里可以知道,subscriberClass作为ConcurrentHashMap的key,而ConcurrentHashMap的value(List<SubscriberMethod>)存放的是当前订阅者中声明的订阅方法。注释2处表示是否忽略索引的位置,默认为false,因此会在通过findUsingInfo()查找,注释3处把找到的方法保存到METHOD_CACHE中。一起研究findUsingInfo()方法:

     private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
            //从对象池中寻找FindState实例(数组的形式实现对象池)
            FindState findState = prepareFindState();
            //初始化需要寻找的方法的所属类
            findState.initForSubscriber(subscriberClass);
            while (findState.clazz != null) {
              //找到后,获取订阅者的信息 
                findState.subscriberInfo = getSubscriberInfo(findState);
               //开始保存获取的订阅方法
                if (findState.subscriberInfo != null) {
                    SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
                    for (SubscriberMethod subscriberMethod : array) {
                        if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
                            findState.subscriberMethods.add(subscriberMethod);
                        }
                    }
                } else {
                    //如果上面没有找到,则通过反射、注解的方式寻找
                    findUsingReflectionInSingleClass(findState);
                }
                //移动到父类
                findState.moveToSuperclass();
            }
            return getMethodsAndRelease(findState); //找到方法后释放findState
        }
    

    经过上面代码注释,对寻找订阅方法的步骤已经很清晰了,下面贴出getSubscriberInfo()和findUsingReflectionInSingleClass()两个方法:

     private SubscriberInfo getSubscriberInfo(FindState findState) {
            if (findState.subscriberInfo != null && findState.subscriberInfo.getSuperSubscriberInfo() != null) {
                SubscriberInfo superclassInfo = findState.subscriberInfo.getSuperSubscriberInfo();
                //判断是否是需要寻找的具体类
                if (findState.clazz == superclassInfo.getSubscriberClass()) {
                    return superclassInfo;
                }
            }
            //通过订阅信息索引进行查找
            if (subscriberInfoIndexes != null) {
                for (SubscriberInfoIndex index : subscriberInfoIndexes) {
                    SubscriberInfo info = index.getSubscriberInfo(findState.clazz);
                    if (info != null) {
                        return info;
                    }
                }
            }
            return null;
        }
    
    private void findUsingReflectionInSingleClass(FindState findState) {
            Method[] methods;
            try {
                // This is faster than getMethods, especially when subscribers are fat classes like Activities
                methods = findState.clazz.getDeclaredMethods();
            } catch (Throwable th) {
                // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
                methods = findState.clazz.getMethods();
                findState.skipSuperClasses = true;
            }
          //遍历订阅者中的所有方法
            for (Method method : methods) {
                int modifiers = method.getModifiers();
                //对订阅方法的作用域进行判断
                if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
                    Class<?>[] parameterTypes = method.getParameterTypes();
                    if (parameterTypes.length == 1) {
                        Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                        if (subscribeAnnotation != null) {
                            Class<?> eventType = parameterTypes[0];
                            if (findState.checkAdd(method, eventType)) {
                                ThreadMode threadMode = subscribeAnnotation.threadMode();
                                findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                        subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                            }
                        }
                    } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                        String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                        throw new EventBusException("@Subscribe method " + methodName +
                                "must have exactly 1 parameter but has " + parameterTypes.length);
                    }
                } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                    String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                    throw new EventBusException(methodName +
                            " is a illegal @Subscribe method: must be public, non-static, and non-abstract");
                }
            }
        }
    

    3.2、是如何进行订阅的?subscribe()

    private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
            Class<?> eventType = subscriberMethod.eventType;
    
            //通过订阅者和订阅方法创建一个订阅事件
            Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
    
            //根据eventType从缓存中获取subscriptions
            CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
    
            //若缓存中没有,则添加到缓存
            if (subscriptions == null) {
                subscriptions = new CopyOnWriteArrayList<>();
                subscriptionsByEventType.put(eventType, subscriptions);
            } else {
                if (subscriptions.contains(newSubscription)) {
                    throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                            + eventType);
                }
            }
    
             //遍历订阅事件,根据优先级重新排序
            int size = subscriptions.size();
            for (int i = 0; i <= size; i++) {
                if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
                    subscriptions.add(i, newSubscription);
                    break;
                }
            }
          
            //这里根据订阅者查找EventType缓存
            List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
            if (subscribedEvents == null) {
                subscribedEvents = new ArrayList<>();
                typesBySubscriber.put(subscriber, subscribedEvents);
            }
            subscribedEvents.add(eventType);
            
            //对粘性进行处理
            if (subscriberMethod.sticky) {
                if (eventInheritance) {      //同时考虑子类
                    // Existing sticky events of all subclasses of eventType have to be considered.
                    // Note: Iterating over all events may be inefficient with lots of sticky events,
                    // thus data structure should be changed to allow a more efficient lookup
                    // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
                    Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
                    for (Map.Entry<Class<?>, Object> entry : entries) {
                        Class<?> candidateEventType = entry.getKey();
                        if (eventType.isAssignableFrom(candidateEventType)) {
                            Object stickyEvent = entry.getValue();
                            checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                        }
                    }
                } else {
                    Object stickyEvent = stickyEvents.get(eventType);
                    checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                }
            }
        }
    

    上面的代码,就是整个的订阅过程了,比较关键的两个东西subscriptionsByEventType和typesBySubscriber,首先把订阅者和订阅方法封装进subscriptionsByEventType,当post事件时根据eventType查找具体的订阅者,然后进行处理。而typesBySubscriber存放的订阅者,在解注册时根据订阅者查找对应的eventType。在最后,会判断是否是粘性事件,如果是,则立刻进行处理。

      private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
            if (stickyEvent != null) {    
                //post事件是最终也调用这个方法,所以在下一步进行分析
                postToSubscription(newSubscription, stickyEvent, isMainThread());
            }
        }
    
    4、发送事件 post()
    public void post(Object event) {
            //每个线程都有一个提交事件的状态
            PostingThreadState postingState = currentPostingThreadState.get();
            List<Object> eventQueue = postingState.eventQueue;
            eventQueue.add(event);    //把提交的事件对象添加到eventQueue
    
            if (!postingState.isPosting) {
                postingState.isMainThread = isMainThread();
                postingState.isPosting = true;
                if (postingState.canceled) {
                    throw new EventBusException("Internal error. Abort state was not reset");
                }
                // 2
                try {
                    while (!eventQueue.isEmpty()) {
                        postSingleEvent(eventQueue.remove(0), postingState);
                    }
                } finally {
                    postingState.isPosting = false;
                    postingState.isMainThread = false;
                }
            }
        }
    

    从上面代码可以发现,首先根据PostingThreadState确定当前线程的提交状态,从PostingThreadState获取eventQueue,用于存放提交的事件;然后会在注释2处,对事件进行遍历,直到eventQueue内部元素为空。

    private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
            Class<?> eventClass = event.getClass();
            boolean subscriptionFound = false;
            //考虑子类
            if (eventInheritance) {
                //查找所有的EventType
                List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
                int countTypes = eventTypes.size();
    
                //遍历,对eventTypes内部逐一处理
                for (int h = 0; h < countTypes; h++) {
                    Class<?> clazz = eventTypes.get(h);
                    subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
                }
            } else {
                subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
            }
            if (!subscriptionFound) {
                if (logNoSubscriberMessages) {
                    logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
                }
                if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                        eventClass != SubscriberExceptionEvent.class) {
                    post(new NoSubscriberEvent(this, event));
                }
            }
        }
    

    在上面的代码中,先查找出所有的EventType,然后通过postSingleEventForEventType()逐个逐个的处理,postSingleEventForEventType()内部又调用了postToSubscription()。

    private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
            switch (subscription.subscriberMethod.threadMode) {
                case POSTING:
                    invokeSubscriber(subscription, event);
                    break;
                case MAIN:
                    if (isMainThread) {
                        invokeSubscriber(subscription, event);
                    } else {
                        mainThreadPoster.enqueue(subscription, event);
                    }
                    break;
                case MAIN_ORDERED:
                    if (mainThreadPoster != null) {
                        mainThreadPoster.enqueue(subscription, event);
                    } else {
                        // temporary: technically not correct as poster not decoupled from subscriber
                        invokeSubscriber(subscription, event);
                    }
                    break;
                case BACKGROUND:
                    if (isMainThread) {
                        backgroundPoster.enqueue(subscription, event);
                    } else {
                        invokeSubscriber(subscription, event);
                    }
                    break;
                case ASYNC:
                    asyncPoster.enqueue(subscription, event);
                    break;
                default:
                    throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
            }
        }
    

    分析到这里,可以知道,会根据在最初声明的订阅方法中标识的threadMode进行对应处理,这里就选择文章第一步中的MAIN标识进行分析。
    1、isMainThread为true,表示在主线程中执行,直接调用invokeSubscriber()。
    2、若不在主线程执行,调用mainThreadPoster.enqueue(),mainThreadPoster实际是HandlerPoster(继承了Handler)类对象的引用。

    //HandlerPoster # enqueue()
     public void enqueue(Subscription subscription, Object event) {
            //把订阅者和提交的事件封装在pendingPost
            PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
            synchronized (this) {
                //内部利用了单向链表的方式进行摆放
                queue.enqueue(pendingPost);
                if (!handlerActive) {
                    handlerActive = true;
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                }
            }
        }
    
        //HandlerPoster # handleMessage()
        @Override
        public void handleMessage(Message msg) {
            boolean rescheduled = false;
            try {
                long started = SystemClock.uptimeMillis();
                while (true) {
                    PendingPost pendingPost = queue.poll();
                    if (pendingPost == null) {
                        synchronized (this) {
                            // Check again, this time in synchronized
                            pendingPost = queue.poll();
                            if (pendingPost == null) {
                                handlerActive = false;
                                return;
                            }
                        }
                    }
                    eventBus.invokeSubscriber(pendingPost);
                    long timeInMethod = SystemClock.uptimeMillis() - started;
                    if (timeInMethod >= maxMillisInsideHandleMessage) {
                        if (!sendMessage(obtainMessage())) {
                            throw new EventBusException("Could not send handler message");
                        }
                        rescheduled = true;
                        return;
                    }
                }
            } finally {
                handlerActive = rescheduled;
            }
        }
    

    根据上面HandlerPoster中的两段代码发现,最终是以Handler消息机制的方式进行处理事件,其中HandlerPoster的Looper是在初始化EventBus的时候获取了主线程中的Looper对象,在MainThreadSupport的实现类AndroidHandlerMainThreadSupport中。最后在handleMessage()中轮询queue,queue是PendingPostQueue的一个实例。调用poll()获取pendingPost,再调用eventBus.invokeSubscriber()进行处理。

     synchronized PendingPost poll() {
            PendingPost pendingPost = head;
            //从链表头部head开始,取完后就移除,下一个链节点作为head,如此循环
            if (head != null) {
                head = head.next;
                if (head == null) {
                    tail = null;
                }
            }
            return pendingPost;
        }
    
    //从链表中获取消息后的处理
    void invokeSubscriber(PendingPost pendingPost) {
            Object event = pendingPost.event;
            Subscription subscription = pendingPost.subscription;
            PendingPost.releasePendingPost(pendingPost);  //释放PendingPost
            if (subscription.active) {
                invokeSubscriber(subscription, event);
            }
        }
    

    在(threadMode == MAIN)时,经过上面的分析可以知道,不管是在主线程调用,还是工作线程中(通过Handler机制处理消息),都是调用了EventBus # invokeSubscriber(Subscription subscription, Object event) 这个方法。

        void invokeSubscriber(Subscription subscription, Object event) {
            try {
                //通过反射进行处理
                subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
            } catch (InvocationTargetException e) {
                handleSubscriberException(subscription, event, e.getCause());
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("Unexpected exception", e);
            }
        }
    
    5、解注册unregister()
    public synchronized void unregister(Object subscriber) {
            List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
            if (subscribedTypes != null) {
                for (Class<?> eventType : subscribedTypes) {
                    unsubscribeByEventType(subscriber, eventType);
                }
                typesBySubscriber.remove(subscriber);
            } else {
                logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
            }
        }
    
    private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
            List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
            if (subscriptions != null) {
                int size = subscriptions.size();
                for (int i = 0; i < size; i++) {
                    Subscription subscription = subscriptions.get(i);
                    if (subscription.subscriber == subscriber) {
                        subscription.active = false;
                        subscriptions.remove(i);
                        i--;
                        size--;
                    }
                }
            }
        }
    

    解注册的过程就比较简单了,还记得提交事件时讲到的typesBySubscriber吗?里面以键值对的形式存储subscriber和eventType的List。因此,在这根据订阅者subscriber找到eventType的List,然后遍历,进行释放。
    完篇

    相关文章

      网友评论

        本文标题:EventBus(3.1.1)源码浅析

        本文链接:https://www.haomeiwen.com/subject/psujrftx.html