美文网首页
EventBus 3.0 源码解析

EventBus 3.0 源码解析

作者: __hgb | 来源:发表于2018-01-07 17:47 被阅读0次

    前言

    EventBus是一个发布/订阅框架。用两个字来概括它,解耦。它简化了组件间事件传递,也可用于线程间事件传递。

    创建

    我们先从向eventbus注册成为订阅者这行代码开始,点进getDefault(),

    EventBus.getDefault().register(this);
    

    这里我们看到了double check的单例模式,这意味着我们在任意地方调用EventBus.getDefault()得到的都是同一个实例。

    public static EventBus getDefault() {
            if (defaultInstance == null) {
                synchronized (EventBus.class) {
                    if (defaultInstance == null) {
                        defaultInstance = new EventBus();
                    }
                }
            }
            return defaultInstance;
        }
    

    再来看看EventBus的构造方法,可以看到构造方法是public的,再看看第一个参数,
    EventBusBuilder ,在这里可以推测出我们可以通过构建者模式来构造不同的EventBus对象。而不同的EventBus对象之间是相互独立,相互隔离的。你在A对象中发布事件,向B对象注册成为订阅者是不可能收到事件的。

     public EventBus() {
            this(DEFAULT_BUILDER);
        }
    
      EventBus(EventBusBuilder builder) {
            logger = builder.getLogger();
            subscriptionsByEventType = new HashMap<>();
            typesBySubscriber = new HashMap<>();
            stickyEvents = new ConcurrentHashMap<>();
            mainThreadSupport = builder.getMainThreadSupport();
            mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
            backgroundPoster = new BackgroundPoster(this);
            asyncPoster = new AsyncPoster(this);
         //代码省略
    
    }
    

    我们来解释一下几个重要的成员变量
    subscriptionsByEventType:从命名来看,以EventType为key,subscriptions为
    value。subscription是一个包装类,包装了订阅者和订阅方法。注意CopyOnWriteArrayList是一个线程安全的容器。
    typesBySubscriber:从命名来看,以Subscriber为key,EventType为value。
    stickyEvents :存储了所有的粘性事件。
    这里实例化了三个 poster ,用于线程调度,分别是mainThreadPoster、 backgroundPoster、asyncPoster等。

    注册

    下面我们来分析注册的过程

    public void register(Object subscriber) {
        Class<?> subscriberClass = subscriber.getClass();
        //找到所有的订阅者方法  
        List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
        synchronized (this) {
        //  遍历订阅者方法,订阅者逐一订阅方法
            for (SubscriberMethod subscriberMethod : subscriberMethods) {
                subscribe(subscriber, subscriberMethod);
            }
        }
    }
    

    这里我们可以看到通过subscriberMethodFinde这个对象找到该类中所有的订阅方法,然后放入一个集合中,之后遍历集合,订阅者订阅每一个找到的方法.先来看看
    SubscriberMethodFinder#findSubscriberMethods方法。

    List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
        //判断缓存中有无subscriberMethods,有则直接返回
        List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
        if (subscriberMethods != null) {
            return subscriberMethods;
        }
        //是否忽略注解器生成的类
        if (ignoreGeneratedIndex) {
            subscriberMethods = findUsingReflection(subscriberClass);
        } else {
                   //从注解器生成的类中读取订阅类的订阅方法信息
                 //1. findUsingInfo
            subscriberMethods = findUsingInfo(subscriberClass);
        }
        if (subscriberMethods.isEmpty()) {
            throw new EventBusException("Subscriber " + subscriberClass
                    + " and its super classes have no public methods with the @Subscribe annotation");
        } else {
            //将找到的subscriberMethods放入缓存中
            METHOD_CACHE.put(subscriberClass, subscriberMethods);
            return subscriberMethods;
        }
    }
    

    从这里直接跟进SubscriberMethodFinder#finduserinfo

    private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
        //FindState保存了订阅者类的信息
        FindState findState = prepareFindState();
        findState.initForSubscriber(subscriberClass);
        while (findState.clazz != null) {
            findState.subscriberInfo = getSubscriberInfo(findState);
            if (findState.subscriberInfo != null) {
                SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
                for (SubscriberMethod subscriberMethod : array) {
                    if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
                        findState.subscriberMethods.add(subscriberMethod);
                    }
                }
            } else {
                //2、到了这里
                findUsingReflectionInSingleClass(findState);
            }
            //移动到父类继续查找
            findState.moveToSuperclass();
        }
        //3. getMethodsAndRelease(findState)
        return getMethodsAndRelease(findState);
    }
    

    findstate 保存了 订阅者类的信息,我们来看看Findstate类

    static class FindState {
        //所有订阅方法
        final List<SubscriberMethod> subscriberMethods = new ArrayList<>();
        //以event为key,method为value
        final Map<Class, Object> anyMethodByEventType = new HashMap<>();
        //以method的名字生成一个methodkey,以订阅者类为value
        final Map<String, Class> subscriberClassByMethodKey = new HashMap<>();
        final StringBuilder methodKeyBuilder = new StringBuilder(128);
    
    }
    

    下面跟进SubscriberMethodFinder#findUsingReflectionInSingleClass(findstate)

        private void findUsingReflectionInSingleClass(FindState findState) {
       //省略代码
            for (Method method : methods) {
                int modifiers = method.getModifiers();
                if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
                    Class<?>[] parameterTypes = method.getParameterTypes();
                     //参数是否为1
                    if (parameterTypes.length == 1) {
                        Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                        //是否包含Subscribe注解
                        if (subscribeAnnotation != null) {
                            Class<?> eventType = parameterTypes[0];
                            //3.checkadd方法
                            if (findState.checkAdd(method, eventType)) {
                                ThreadMode threadMode = subscribeAnnotation.threadMode();
                         //把SubscriberMethod放入findsate的subscriberMethods成员变量中
                                findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                        subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                            }
                        }
                    } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                        String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                        throw new EventBusException("@Subscribe method " + methodName +
                                "must have exactly 1 parameter but has " + parameterTypes.length);
                    }
                } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                    String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                    throw new EventBusException(methodName +
                            " is a illegal @Subscribe method: must be public, non-static, and non-abstract");
                }
            }
        }
    

    注释里3.调用了SubscriberMethodFinder#checkadd方法,我们跟进来分析看看

    
       boolean checkAdd(Method method, Class<?> eventType) {
                // 2 level check: 1st level with event type only (fast), 2nd level with complete signature when required.
                // Usually a subscriber doesn't have methods listening to the same event type.
            //如果之前没有相同的EventType,existing 为null,则直接返回true
                Object existing = anyMethodByEventType.put(eventType, method);
                if (existing == null) {
                    return true;
                } else {
                    if (existing instanceof Method) {
                        if (!checkAddWithMethodSignature((Method) existing, eventType)) {
                            // Paranoia check
                            throw new IllegalStateException();
                        }
                        // Put any non-Method object to "consume" the existing Method
                        anyMethodByEventType.put(eventType, this);
                    }
                    return checkAddWithMethodSignature(method, eventType);
                }
            }
    
      private boolean checkAddWithMethodSignature(Method method, Class<?> eventType) {
                methodKeyBuilder.setLength(0);
                methodKeyBuilder.append(method.getName());
                methodKeyBuilder.append('>').append(eventType.getName());
                 //method由方法名和eventtype共同决定
                String methodKey = methodKeyBuilder.toString();
                Class<?> methodClass = method.getDeclaringClass();
                Class<?> methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass);
                if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) {
                    // Only add if not already found in a sub class
                    return true;
                } else {
                    // Revert the put, old class is further down the class hierarchy
                    //4.methodClassOld不为空
                    subscriberClassByMethodKey.put(methodKey, methodClassOld);
                    return false;
                }
            }
    

    这里我们看到注释4,methodClassOld不为空,说明集合中已经存在methodKey相同,即方法名和eventtype都相同的订阅方法。就把旧的订阅方法放入集合中。这说明如果子类重写了父类的订阅方法,则父类的订阅方法无效。

    我们在回到上面,在findUsingInfo方法的最后一行调用了SubscriberMethodFinder# getMethodsAndRelease方法

       private List<SubscriberMethod> getMethodsAndRelease(FindState findState) {
             //把findState.subscriberMethods赋值给subscriberMethods ,并回收findState,最后返回subscriberMethods
            List<SubscriberMethod> subscriberMethods = new ArrayList<>(findState.subscriberMethods);
            findState.recycle();
            synchronized (FIND_STATE_POOL) {
                for (int i = 0; i < POOL_SIZE; i++) {
                    if (FIND_STATE_POOL[i] == null) {
                        FIND_STATE_POOL[i] = findState;
                        break;
                    }
                }
            }
            return subscriberMethods;
        }
    

    subscribe

     // Must be called in synchronized block
        private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
            Class<?> eventType = subscriberMethod.eventType;
           //将订阅者,订阅方法封装进Subscription
            Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
            CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
            if (subscriptions == null) {
                subscriptions = new CopyOnWriteArrayList<>();
               //将eventType, subscriptions保存进 subscriptionsByEventType集合中
                subscriptionsByEventType.put(eventType, subscriptions);
            } else {
                if (subscriptions.contains(newSubscription)) {
                    throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                            + eventType);
                }
            }
    
            int size = subscriptions.size();
            for (int i = 0; i <= size; i++) {
                //将优先级高的方法放在前面
                if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
                    subscriptions.add(i, newSubscription);
                    break;
                }
            }
    
            List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
            if (subscribedEvents == null) {
                subscribedEvents = new ArrayList<>();
              //将subscriber, subscribedEvents保存进 typesBySubscriber方法中
                typesBySubscriber.put(subscriber, subscribedEvents);
            }
            subscribedEvents.add(eventType);
            //省略代码
         
        }
    

    unregister

      public synchronized void unregister(Object subscriber) {
              //通过subscriber找到所有的订阅事件
            List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
            if (subscribedTypes != null) {
                for (Class<?> eventType : subscribedTypes) {
                    //遍历
                    unsubscribeByEventType(subscriber, eventType);
                }
                typesBySubscriber.remove(subscriber);
            } else {
                logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
            }
        }
    
    
        private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
           //在subscriptionsByEventType中,找到所有的subscriptions
            List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
            if (subscriptions != null) {
                int size = subscriptions.size();
                for (int i = 0; i < size; i++) {
                    Subscription subscription = subscriptions.get(i);
                    //如果当前的subscriber与subscription中的subscriber相等,则移除集合中的元素
                    if (subscription.subscriber == subscriber) {
                        subscription.active = false;
                        subscriptions.remove(i);
                        i--;
                        size--;
                    }
                }
            }
        }
    

    发送事件

     EventBus.getDefault().post(new Message(message));
    

    先来看EventBus#post方法,

        public void post(Object event) {
            //1. PostingThreadState 
            PostingThreadState postingState = currentPostingThreadState.get();
            List<Object> eventQueue = postingState.eventQueue;
            eventQueue.add(event);
    
            if (!postingState.isPosting) {
                postingState.isMainThread = isMainThread();
                postingState.isPosting = true;
                if (postingState.canceled) {
                    throw new EventBusException("Internal error. Abort state was not reset");
                }
                try {
                    while (!eventQueue.isEmpty()) {
                        //2. postSingleEvent
                        postSingleEvent(eventQueue.remove(0), postingState);
                    }
                } finally {
                    postingState.isPosting = false;
                    postingState.isMainThread = false;
                }
            }
        }
    

    可以看到PostingThreadState 封装了当前信息,subscription,event等

      final static class PostingThreadState {
            final List<Object> eventQueue = new ArrayList<>();
            boolean isPosting;
            boolean isMainThread;
            Subscription subscription;
            Object event;
            boolean canceled;
        }
    

    而currentPostingThreadState又是什么呢?

     private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
            @Override
            protected PostingThreadState initialValue() {
                return new PostingThreadState();
            }
        };
    

    可以看到currentPostingThreadState是ThreadLocal类型的。ThreadLocal是一个线程内部的数据存储类,通过它在指定的线程中存储数据,只能在指定线程中获取。
    下面看 EventBus#postSingleEvent

     private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
            Class<?> eventClass = event.getClass();
            boolean subscriptionFound = false;
           //是否要考虑事件的父类
            if (eventInheritance) {
             //查找事件的所有父类
                List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
                int countTypes = eventTypes.size();
                for (int h = 0; h < countTypes; h++) {
                    Class<?> clazz = eventTypes.get(h);
                 //调用postSingleEventForEventType
                    subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
                }
            } else {
                subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
            }
            if (!subscriptionFound) {
                if (logNoSubscriberMessages) {
                    logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
                }
                if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                        eventClass != SubscriberExceptionEvent.class) {
                    post(new NoSubscriberEvent(this, event));
                }
            }
        }
    
     private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
            CopyOnWriteArrayList<Subscription> subscriptions;
            synchronized (this) {
            //在subscriptionsByEventType集合中找到所有的subscriptions
                subscriptions = subscriptionsByEventType.get(eventClass);
            }
            if (subscriptions != null && !subscriptions.isEmpty()) {
                for (Subscription subscription : subscriptions) {
                    postingState.event = event;
                    postingState.subscription = subscription;
                    boolean aborted = false;
                    try {
                       //调用postToSubscription
                        postToSubscription(subscription, event, postingState.isMainThread);
                        aborted = postingState.canceled;
                    } finally {
                        postingState.event = null;
                        postingState.subscription = null;
                        postingState.canceled = false;
                    }
                    if (aborted) {
                        break;
                    }
                }
                return true;
            }
            return false;
        }
    
        private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
          //根据threadMode来判断
            switch (subscription.subscriberMethod.threadMode) {
                case POSTING:
                    invokeSubscriber(subscription, event);
                    break;
                case MAIN:
                 //当前是否是主线程,
                    if (isMainThread) {
                        invokeSubscriber(subscription, event);
                    } else {
                        mainThreadPoster.enqueue(subscription, event);
                    }
                    break;
                case MAIN_ORDERED:
                    if (mainThreadPoster != null) {
                        mainThreadPoster.enqueue(subscription, event);
                    } else {
                        // temporary: technically not correct as poster not decoupled from subscriber
                        invokeSubscriber(subscription, event);
                    }
                    break;
                case BACKGROUND:
                    if (isMainThread) {
                         //如果发布事件是在主线程,就用 backgroundPoster切换到后台
                        backgroundPoster.enqueue(subscription, event);
                    } else {
                        invokeSubscriber(subscription, event);
                    }
                    break;
                case ASYNC:
                    //不管当前发布事件在哪个进程,都用asyncPoster新建一个线程来处理
                    asyncPoster.enqueue(subscription, event);
                    break;
                default:
                    throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
            }
        }
    

    这里来讲解一下这三个Poster.

    1. handlerPoster继承了Handler,获取主线程的Looper。所以handleMessage方法在主线程中执行。在handleMessage方法中,开启了循环从队列中获取数据,调用 invokeSubscriber方法分发事件
      2.BackGroundPoster实现了Runnable接口。从队列中获取数据进行分发,直到取完为止。
      3.AsycPoster实现了Runnable接口。与BackGroundPoster不一样的是它只获取队列中的一个数据,然后进行分发。

    相关文章

      网友评论

          本文标题:EventBus 3.0 源码解析

          本文链接:https://www.haomeiwen.com/subject/wixunxtx.html