EventBus源码分析

作者: KuTear | 来源:发表于2016-08-22 23:12 被阅读113次

    本文发表于KuTear's Blog,转载请注明

    最简单的例子说起

    先从一个简单的栗子出发,看看EventBus的功能是什么。

    @Override
    protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.activity_main);
            EventBus.getDefault().register(this);
            findViewById(R.id.say_hello).setOnClickListener(new View.OnClickListener() {
                @Override
                public void onClick(View view) {
                    EventBus.getDefault().post(new EventBean(1,"Hello"));
                }
            });
    }
    
    @Subscribe(threadMode = ThreadMode.MAIN) //在ui线程执行
    public void onEvent(EventBean event) {
            System.out.println(Thread.currentThread().getName());
    }
    
    @Override
    protected void onDestroy() {
            super.onDestroy();
            EventBus.getDefault().unregister(this);
    }
    

    上面的代码是最简单的一个事件,当点击按钮之后回调onEvent()方法。下面就着重看看这个过程的实现。类似的代码我们见得很多,比如App存在一个UserManager,有一个用户状态的分发,很多类在这里注册了用户状态的监听回调,当用户登陆,所有的注册了监听的类都会收到这个消息。其实EventBus的实现也是类似的,只是不存在接口。
    看看上面的代码,我们可能会对是怎样回调onEvent()感到一丝的困惑。下面进入源码的世界。

    EventBus 源码分析

    先看看一些有用的字段

        //K->方法参数的类型   V->K的所有父类的结合(包括本身) 用作缓存
        private static final Map<Class<?>, List<Class<?>>> eventTypesCache = new HashMap<>();
        //K->方法参数的类型   V->所有参数类型的K的订阅函数的集合  主要是消息发送使用
        private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
        //K->注册的类,如Activity  V-> 注册类的注册函数的参数的集合  主要是注册/解绑使用
        private final Map<Object, List<Class<?>>> typesBySubscriber;
        //粘性事件 K->发出的事件的参数类型  V->事件的值
        private final Map<Class<?>, Object> stickyEvents;
    

    一切的开始-事件的订阅

    //EventBus
    public void register(Object subscriber) {
            Class<?> subscriberClass = subscriber.getClass();
            //获取该类的所有的能接受事件的函数,也就是上面说的`onEvent(...)`
            List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
            synchronized (this) {
                for (SubscriberMethod subscriberMethod : subscriberMethods) {
                    subscribe(subscriber, subscriberMethod);
                }
            }
    }
    

    如何才能找到注册的方法呢,这就要看看SubscriberMethodFinder的具体实现了.

    //SubscriberMethodFinder
    List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
            //提升速度,优先重缓存中取,支持并发操作
            //声明为 Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();
            List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
            if (subscriberMethods != null) {
                return subscriberMethods;
            }
            if (ignoreGeneratedIndex) { //默认为false
                subscriberMethods = findUsingReflection(subscriberClass);
            } else {
                subscriberMethods = findUsingInfo(subscriberClass);
            }
    
            if (subscriberMethods.isEmpty()) {
                throw new EventBusException("Subscriber " + subscriberClass
                        + " and its super classes have no public methods with the @Subscribe annotation");
            } else {
                METHOD_CACHE.put(subscriberClass, subscriberMethods);
                return subscriberMethods;
            }
    }
    
    private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
            //从复用池中取回一个FindState
            FindState findState = prepareFindState();
            //为findState设置clazz等参数
            findState.initForSubscriber(subscriberClass);
            while (findState.clazz != null) {
                findState.subscriberInfo = getSubscriberInfo(findState);
                //通常为NULL
                if (findState.subscriberInfo != null) {
                    SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
                    for (SubscriberMethod subscriberMethod : array) {
                        if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
                            findState.subscriberMethods.add(subscriberMethod);
                        }
                    }
                } else {
                    findUsingReflectionInSingleClass(findState);
                }
                //设置findState.clazz为刚刚clazz的父类
                findState.moveToSuperclass();
            }
            //获取findState.subscriberMethods
            return getMethodsAndRelease(findState);
    }
    

    再看findUsingReflectionInSingleClass()之前,线看看FindState的一部分实现

    //FindState
    static class FindState {
        //订阅者的方法的列表
        final List<SubscriberMethod> subscriberMethods = new ArrayList<>();
        //以EventType为key,method为value
        final Map<Class, Object> anyMethodByEventType = new HashMap<>();
        //以method的名字生成一个methodKey为key,该method的类(订阅者)为value
            final Map<String, Class> subscriberClassByMethodKey = new HashMap<>();
            //构建methodKey的StringBuilder
        final StringBuilder methodKeyBuilder = new StringBuilder(128);
        //订阅者
        Class<?> subscriberClass;
            //当前类
        Class<?> clazz;
            //是否跳过父类
        boolean skipSuperClasses;
           //SubscriberInfo
        SubscriberInfo subscriberInfo;
    
        void initForSubscriber(Class<?> subscriberClass) {
            //clazz为当前类
            this.subscriberClass = clazz = subscriberClass;
            skipSuperClasses = false;
            subscriberInfo = null;
        }
        
            boolean checkAdd(Method method, Class<?> eventType) { //带检测方法,和他的参数类型
                // 2 level check: 1st level with event type only (fast), 2nd level with complete signature when required.
                // Usually a subscriber doesn't have methods listening to the same event type.
                Object existing = anyMethodByEventType.put(eventType, method);
                if (existing == null) {
                    return true;
                } else {
                    if (existing instanceof Method) {
                        if (!checkAddWithMethodSignature((Method) existing, eventType)) { //检测函数的签名
                            // Paranoia check
                            throw new IllegalStateException();
                        }
                        // Put any non-Method object to "consume" the existing Method
                        anyMethodByEventType.put(eventType, this);
                    }
                    return checkAddWithMethodSignature(method, eventType);
                }
           }
        
           private boolean checkAddWithMethodSignature(Method method, Class<?> eventType) {
                methodKeyBuilder.setLength(0);
                methodKeyBuilder.append(method.getName());
                methodKeyBuilder.append('>').append(eventType.getName());
    
                String methodKey = methodKeyBuilder.toString();
                Class<?> methodClass = method.getDeclaringClass();  //找到声明该方法的类
                Class<?> methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass);
                if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) {
                    //判断old是否为methodClass的父类
                    // Only add if not already found in a sub class
                    return true;
                } else {
                    // Revert the put, old class is further down the class hierarchy
                    subscriberClassByMethodKey.put(methodKey, methodClassOld);
                    return false;
                }
            }
    }
    

    Ok,下面看看findUsingReflectionInSingleClass()的实现吧.

    private void findUsingReflectionInSingleClass(FindState findState) {
            Method[] methods;
            try {
                // This is faster than getMethods, especially when subscribers are fat classes like Activities
                methods = findState.clazz.getDeclaredMethods();//返回该类的所有方法,但是不包括继承得来的
            } catch (Throwable th) {
                // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
                methods = findState.clazz.getMethods();
                findState.skipSuperClasses = true;
            }
            for (Method method : methods) {
                int modifiers = method.getModifiers(); //方法的修饰符 public|protected|private|default(package)
                if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) { //大体上就是要求public,非static|abstract
                    Class<?>[] parameterTypes = method.getParameterTypes(); //获取参数类型数组
                    if (parameterTypes.length == 1) {  //只允许有一个参数
                        Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                        if (subscribeAnnotation != null) { //函数必须包含注解
                            Class<?> eventType = parameterTypes[0];
                            if (findState.checkAdd(method, eventType)) {
                                ThreadMode threadMode = subscribeAnnotation.threadMode();
                                findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                        subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                            }
                        }
                    } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                        String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                        throw new EventBusException("@Subscribe method " + methodName +
                                "must have exactly 1 parameter but has " + parameterTypes.length);
                    }
                } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                    String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                    throw new EventBusException(methodName +
                            " is a illegal @Subscribe method: must be public, non-static, and non-abstract");
                }
            }
    }
    
    

    现在函数调用栈退回到了最开始的register(),接着看subscribe(...)方法。

    // Must be called in synchronized block
    private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
            Class<?> eventType = subscriberMethod.eventType; //参数类型
            //将注册类和方法打包为Subscription
            Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
            CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
            if (subscriptions == null) {
                subscriptions = new CopyOnWriteArrayList<>();
                subscriptionsByEventType.put(eventType, subscriptions);
            } else {
                //同一个事件不再多次注册,所以每次使用后一定要解绑,不解绑还会内存泄露
                if (subscriptions.contains(newSubscription)) {
                    throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                            + eventType);
                }
            }
    
            int size = subscriptions.size();
            for (int i = 0; i <= size; i++) {
                //根据优先级放入合适的位置
                if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
                    subscriptions.add(i, newSubscription);
                    break;
                }
            }
            //typesBySubscriber  K->注册的类,如Activity  V-> 注册类的注册函数的参数的集合
            List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
            if (subscribedEvents == null) {
                subscribedEvents = new ArrayList<>();
                typesBySubscriber.put(subscriber, subscribedEvents);
            }
            subscribedEvents.add(eventType);
            //注册的时候判断是否有粘性事件,如有就执行咯
            if (subscriberMethod.sticky) {
                if (eventInheritance) {  //Default true
                    // Existing sticky events of all subclasses of eventType have to be considered.
                    // Note: Iterating over all events may be inefficient with lots of sticky events,
                    // thus data structure should be changed to allow a more efficient lookup
                    // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
                    Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
                    for (Map.Entry<Class<?>, Object> entry : entries) {
                        Class<?> candidateEventType = entry.getKey();
                        //判断eventType是否为candidateEventType的父类,即所有的子类都能收到消息
                        if (eventType.isAssignableFrom(candidateEventType)) {
                            Object stickyEvent = entry.getValue();
                            //发送消息,这里先不讲,在后面也会说的
                            checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                        }
                    }
                } else {
                    Object stickyEvent = stickyEvents.get(eventType);
                    checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                }
            }
    }
    

    上面注册的过程基本已经说完,下面将讲事件的发送过程。根据最上面的栗子,我们知道入口是post()函数,下面就入手post()函数。

    事件的发送

    //保证每个线程取得的PostingThreadState不同,但是相同线程取得的相同,本质就是HashMap<Thread,PostingThreadState>
    private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
            @Override
            protected PostingThreadState initialValue() {
                return new PostingThreadState();
            }
    };
    
    final static class PostingThreadState {
            final List<Object> eventQueue = new ArrayList<Object>();
            boolean isPosting;
            boolean isMainThread;
            Subscription subscription;
            Object event;
            boolean canceled;
    }
    
    public void post(Object event) {
            PostingThreadState postingState = currentPostingThreadState.get();
            List<Object> eventQueue = postingState.eventQueue;
            //添加到队列
            eventQueue.add(event);
            //如果队列没有在分发事件就开始分发
            if (!postingState.isPosting) {
                postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
                postingState.isPosting = true;
                if (postingState.canceled) {
                    throw new EventBusException("Internal error. Abort state was not reset");
                }
                try {
                    //循环执行分发
                    while (!eventQueue.isEmpty()) {
                        postSingleEvent(eventQueue.remove(0), postingState);
                    }
                } finally {
                    postingState.isPosting = false;
                    postingState.isMainThread = false;
                }
            }
    }
    
    private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
            Class<?> eventClass = event.getClass(); //事件参数类型
            boolean subscriptionFound = false;
            if (eventInheritance) {  //default true
                List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
                int countTypes = eventTypes.size();
                for (int h = 0; h < countTypes; h++) {
                    Class<?> clazz = eventTypes.get(h);
                    //只要有一个成功就返回成功
                    subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
                }
            } else {
                subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
            }
            //对错误的处理,可以自己注册`NoSubscriberEvent`来捕获
            if (!subscriptionFound) {
                if (logNoSubscriberMessages) {
                    Log.d(TAG, "No subscribers registered for event " + eventClass);
                }
                if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                        eventClass != SubscriberExceptionEvent.class) {
                    post(new NoSubscriberEvent(this, event));
                }
            }
    }
    
    private static List<Class<?>> lookupAllEventTypes(Class<?> eventClass) {
            //eventTypesCache  K->事件参数的类型   V->K的所有父类的结合(包括本身)
            //发送一个子类型的事件,父类型的也要求收到该事件
            synchronized (eventTypesCache) {
                List<Class<?>> eventTypes = eventTypesCache.get(eventClass);
                if (eventTypes == null) {
                    eventTypes = new ArrayList<>();
                    Class<?> clazz = eventClass;
                    while (clazz != null) {
                        eventTypes.add(clazz);
                        //递归添加所有的父类/接口
                        addInterfaces(eventTypes, clazz.getInterfaces());
                        clazz = clazz.getSuperclass();
                    }
                    eventTypesCache.put(eventClass, eventTypes);
                }
                return eventTypes;
            }
    }
    

    根据上面的代码查看,知道所有的事件发送都是通过函数postSingleEventForEventType()发送。下面看看具体的实现。

    /**
     * 
     * @param event  事件数据
     * @param postingState
     * @param eventClass  事件参数类型
     * @return
     */
    private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
            CopyOnWriteArrayList<Subscription> subscriptions;
            synchronized (this) {
            //subscriptionsByEventType K->方法参数的类型   V->所有参数类型的K的订阅函数的集合
                subscriptions = subscriptionsByEventType.get(eventClass);
            }
            if (subscriptions != null && !subscriptions.isEmpty()) {
                for (Subscription subscription : subscriptions) {
                    postingState.event = event;
                    postingState.subscription = subscription;
                    boolean aborted = false;
                    try {
                        postToSubscription(subscription, event, postingState.isMainThread);
                        aborted = postingState.canceled;
                    } finally {
                        postingState.event = null;
                        postingState.subscription = null;
                        postingState.canceled = false;
                    }
                    if (aborted) {
                        break;
                    }
                }
                return true;
            }
            return false;
    }
    
    /**
     * @param subscription 订阅者
     * @param event        数据
     * @param isMainThread 当前线程是否为主线程
     */
    private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
            switch (subscription.subscriberMethod.threadMode) {
                case POSTING:
                    invokeSubscriber(subscription, event);
                    break;
                case MAIN:
                    if (isMainThread) {
                        invokeSubscriber(subscription, event);
                    } else {
                        //mainThreadPoster = new HandlerPoster(this/*eventBus*/, Looper.getMainLooper(), 10);
                        // HandlerPoster extends Handler
                        mainThreadPoster.enqueue(subscription, event);
                    }
                    break;
                case BACKGROUND:
                    if (isMainThread) {
                        backgroundPoster.enqueue(subscription, event);
                    } else {
                        invokeSubscriber(subscription, event);
                    }
                    break;
                case ASYNC:
                    asyncPoster.enqueue(subscription, event);
                    break;
                default:
                    throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
            }
    }
    
    

    这里就是EventBus对四种线程模式的不同处理,这里只拿出其中一个来讲。对于MAIN,如果当前POST线程就是主线程,那么当然就是直接调对应的函数就OK,如果当前POST不是主线程,那么就要用Handler发送到主线程。下面看看实现。

    void enqueue(Subscription subscription, Object event) {
            //类似android源码中的Message的获取方式
            PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
            synchronized (this) {
                //待发送的消息加入队列
                queue.enqueue(pendingPost);
                if (!handlerActive) {
                    handlerActive = true;
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                }
            }
    }
    
    @Override
    public void handleMessage(Message msg) {
            boolean rescheduled = false;
            try {
                long started = SystemClock.uptimeMillis();
                while (true) {
                    PendingPost pendingPost = queue.poll();
                    if (pendingPost == null) {
                        synchronized (this) {
                            // Check again, this time in synchronized
                            pendingPost = queue.poll();
                            if (pendingPost == null) {
                                handlerActive = false;
                                return;
                            }
                        }
                    }
                    eventBus.invokeSubscriber(pendingPost); //使用EventBus回调方法
                    long timeInMethod = SystemClock.uptimeMillis() - started;
                    if (timeInMethod >= maxMillisInsideHandleMessage) {
                        if (!sendMessage(obtainMessage())) {
                            throw new EventBusException("Could not send handler message");
                        }
                        rescheduled = true;
                        return;
                    }
                }
            } finally {
                handlerActive = rescheduled;
            }
    }
    

    EventBus中,调用注册的方法。

    void invokeSubscriber(PendingPost pendingPost) {
            Object event = pendingPost.event;
            Subscription subscription = pendingPost.subscription;
            PendingPost.releasePendingPost(pendingPost);
            if (subscription.active) {
                invokeSubscriber(subscription, event);
            }
    }
    
    void invokeSubscriber(Subscription subscription, Object event) {
            try {
                subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
            } catch (InvocationTargetException e) {
                handleSubscriberException(subscription, event, e.getCause());
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("Unexpected exception", e);
            }
    }
    

    可以看出,只是很简单用反射调用了要调了方法,到此对普通事件的分析就完了,下面看看粘性事件。

    //EventBus
    public void postSticky(Object event) {
            synchronized (stickyEvents) {
                stickyEvents.put(event.getClass(), event);
            }
            // Should be posted after it is putted, in case the subscriber wants to remove immediately
            post(event);
    }
    

    结合上面注册的时候的代码分析,我们知道postSticky()的事件会在postSticky()的时候发送一次,并在有新注册粘性事件的时候会再次匹配,最后就是看看事件的解绑。

    事件的解绑

    /**
     * Unregisters the given subscriber from all event classes.
     */
     public synchronized void unregister(Object subscriber) {
            //注册类的注册函数的参数的集合
            List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
            if (subscribedTypes != null) {
                for (Class<?> eventType : subscribedTypes) {
                    unsubscribeByEventType(subscriber, eventType);
                }
                typesBySubscriber.remove(subscriber);
            } else {
                Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass());
            }
    }
    
    /**
     * Only updates subscriptionsByEventType, not typesBySubscriber! Caller must update typesBySubscriber.
     *
     * @param subscriber 注册类
     * @param eventType  参数Type
     */
    private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
            //取得参数类型为eventType的所有注册函数的集合
            List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
            if (subscriptions != null) {
                int size = subscriptions.size();
                for (int i = 0; i < size; i++) {
                    Subscription subscription = subscriptions.get(i);
                    if (subscription.subscriber == subscriber) {
                        subscription.active = false;
                        subscriptions.remove(i);
                        i--;
                        size--;
                    }
                }
            }
    }
    

    通过上面的代码,可以看出,其实事件的移除就是把它重List/HashMapremove掉。

    参考

    相关文章

      网友评论

        本文标题:EventBus源码分析

        本文链接:https://www.haomeiwen.com/subject/zhmgsttx.html