EventBus源码学习随笔

作者: 黄俊彬 | 来源:发表于2018-08-17 11:20 被阅读112次

    EventBus是什么?

    简介

    EventBus是Android和Java的开源库,它使用发布者/订阅者模式进行松散耦合。EventBus使用了总线控制,能够用几行代码解耦类及简化代码,消除依赖关系,加速应用程序开发。

    下图为官方示例图:

    官方示例图

    官网网址:EventBus官网

    Github地址:Github

    特点

    • 简化组件之间的通信
    • 发送者和接收者高度解耦
    • 性能高效,社区活跃
    • 库文件很小(<50KB)
    • 具有简便配置线程、优先级等高级特性

    EventBus怎么用?

    1、gradle引入

    implementation 'org.greenrobot:eventbus:3.1.1'
    

    2、注册与取消注册

    //注册
    EventBus.getDefault().register(this);
    
    //取消注册
    EventBus.getDefault().unregister(this);
    
    

    3、事件定义与发送

    //定义
    public class MessageEvent {
    }
    //发送
    EventBus.getDefault().post(new MessageEvent());
    

    4、事件接收

     @Subscribe(threadMode = ThreadMode.POSTING)
        public void onEventMainThread(MessageEvent event) {
            // doSomeThing();
        };
    

    EventBus核心执行流程是怎样?

    EventBus的使用包含注册、取消注册、事件定义发送及事件接收。当用户进行注册时,会通过反射获取实例中定义事件方法集合,然后将事件方法集合及订阅者加入到Map中,当执行post时,会根据事件类型,从集合中获取对应的订阅集合,通过配置的threadMode,使用对应的Poster调用订阅者的事件,最后通过反射method的invoke执行事件。

    关键类功能说明

    说明
    EventBus 总线调度类,getDefault()为单例模式。内部持有订阅者及事件集合。还有各事件的发送器。eventTypesCache(缓存所有粘性事件的集合)、subscriptionsByEventType(key为事件,value为订阅者集合的Map)、typesBySubscriber(key为订阅者,value为事件集合的Map)、currentPostingThreadState(ThreadLocal,当前线程的事件集合)、mainThreadPoster(主线程的post)、backgroundPoster(后台线程的post)、asyncPoster(异步线程的post)、subscriberMethodFinder(获取订阅者中的事件)
    SubscriberMethod 订阅方法的类,包含Method、ThreadMode、priority等配置属性
    Subscription 订阅者类,包含subscriber的Object实例、subscriberMethod
    PostingThreadState 存储当前线程的事件集合
    SubscriberMethodFinder 用于获取订阅中的定义的事件接收方法
    PendingPost subscription与event的包装类,内部维护一个pendingPostPool,当池中有PendingPost实例,会进行复用
    PendingPostQueue 内部维护了一个head、tail的PendingPost实例,提供enqueue及poll操作
    HandlerPoster 用于处理主线程的事件执行
    BackgroundPoster 用于处理后台线程的事件执行
    AsyncPoster 用于执行异步线程的事件执行

    代码执行流程

    注册-register

    注册主要是通过反射获取订阅者中定义的事件方法集合,将订阅者和事件集合加入对应的Map,然后会判断是否支持粘性事件,将之前发送的粘性事件缓存发送给订阅者。

    1、 源码实现

    public void register(Object subscriber) {
            //获取注册的Object的Class
            Class<?> subscriberClass = subscriber.getClass();
            //通过subscriberMethodFinder获取该Object中所有的订阅方法(SubscriberMethod集合)
            List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
            synchronized (this) {
                //遍历事件集合
                for (SubscriberMethod subscriberMethod : subscriberMethods) {
                    subscribe(subscriber, subscriberMethod);
                }
            }
        }
        
      private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
            Class<?> eventType = subscriberMethod.eventType;
            //生成Subscription对象(SubscriberMethod的包装类)
            Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
            //获取subscriptionsByEventType集合(key为事情,value为订阅集合)
            CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
            if (subscriptions == null) {
                subscriptions = new CopyOnWriteArrayList<>();
                subscriptionsByEventType.put(eventType, subscriptions);
            } else {
                if (subscriptions.contains(newSubscription)) {
                    throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                            + eventType);
                }
            }
    
            int size = subscriptions.size();
            for (int i = 0; i <= size; i++) {
                if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
                    subscriptions.add(i, newSubscription);
                    break;
                }
            }
            //获取typesBySubscriber集合(key为订阅者,value为事件集合)
            List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
            if (subscribedEvents == null) {
                subscribedEvents = new ArrayList<>();
                typesBySubscriber.put(subscriber, subscribedEvents);
            }
            //将所有的事件,根据订阅者key加入到事件集合中
            subscribedEvents.add(eventType);
    
            //事件方法是否支持(粘性事情)
            if (subscriberMethod.sticky) {
                //事件方法是否支持(继承事件)
                if (eventInheritance) {
                
                    //遍历stickyEvents,找出所有继承于事件的父事件
                    Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
                    for (Map.Entry<Class<?>, Object> entry : entries) {
                        Class<?> candidateEventType = entry.getKey();
                        if (eventType.isAssignableFrom(candidateEventType)) {
                            Object stickyEvent = entry.getValue();
                            //给订阅者发送之前缓存的粘性事件
                            checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                        }
                    }
                } else {
                    Object stickyEvent = stickyEvents.get(eventType);
                            //给订阅者发送之前缓存的粘性事件
                    checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                }
            }
        }
    

    2、 流程图

    image

    发送-post

    1、 源码实现

     public void post(Object event) {
            //获取当前线程的PostingThreadState
            PostingThreadState postingState = currentPostingThreadState.get();
            //获取事件队列postingState.eventQueue
            List<Object> eventQueue = postingState.eventQueue;
            //添加事件
            eventQueue.add(event);
            //是否正在发送
            if (!postingState.isPosting) {
                postingState.isMainThread = isMainThread();
                postingState.isPosting = true;
                if (postingState.canceled) {
                    throw new EventBusException("Internal error. Abort state was not reset");
                }
                //循环获取eventQueue中的事件
                try {
                    while (!eventQueue.isEmpty()) {
                        //获取集合数据并移除,然后发送事件
                        postSingleEvent(eventQueue.remove(0), postingState);
                    }
                } finally {
                    postingState.isPosting = false;
                    postingState.isMainThread = false;
                }
            }
        }
        
    private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
            Class<?> eventClass = event.getClass();
            boolean subscriptionFound = false;
            //事件是否支持继承
            if (eventInheritance) {
                //找出所有继承事件
                List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
                int countTypes = eventTypes.size();
                //遍历集合
                for (int h = 0; h < countTypes; h++) {
                    Class<?> clazz = eventTypes.get(h);
                    //发送
                    subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
                }
            } else {
               //发送
                subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
            }
               //如果没有订阅者会发送一个NoSubscriberEvent事件
            if (!subscriptionFound) {
                if (logNoSubscriberMessages) {
                    logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
                }
                if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                        eventClass != SubscriberExceptionEvent.class) {
                    post(new NoSubscriberEvent(this, event));
                }
            }
        }
        
     private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
            CopyOnWriteArrayList<Subscription> subscriptions;
            synchronized (this) {
                //根据事件获取所有订阅者subscriptions
                subscriptions = subscriptionsByEventType.get(eventClass);
            }
            if (subscriptions != null && !subscriptions.isEmpty()) {
                //遍历所有订阅者
                for (Subscription subscription : subscriptions) {
                    postingState.event = event;
                    postingState.subscription = subscription;
                    boolean aborted = false;
                    try {
                        //发送事件
                        postToSubscription(subscription, event, postingState.isMainThread);
                        aborted = postingState.canceled;
                    } finally {
                        postingState.event = null;
                        postingState.subscription = null;
                        postingState.canceled = false;
                    }
                    if (aborted) {
                        break;
                    }
                }
                return true;
            }
            return false;
        }
        
    private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
            //根据对应的threadMode,使用对应的post进行事件的处理
            switch (subscription.subscriberMethod.threadMode) {
                case POSTING:
                    invokeSubscriber(subscription, event);
                    break;
                case MAIN:
                    if (isMainThread) {
                        invokeSubscriber(subscription, event);
                    } else {
                        mainThreadPoster.enqueue(subscription, event);
                    }
                    break;
                case MAIN_ORDERED:
                    if (mainThreadPoster != null) {
                        mainThreadPoster.enqueue(subscription, event);
                    } else {
                        // temporary: technically not correct as poster not decoupled from subscriber
                        invokeSubscriber(subscription, event);
                    }
                    break;
                case BACKGROUND:
                    if (isMainThread) {
                        backgroundPoster.enqueue(subscription, event);
                    } else {
                        invokeSubscriber(subscription, event);
                    }
                    break;
                case ASYNC:
                    asyncPoster.enqueue(subscription, event);
                    break;
                default:
                    throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
            }
        }
    
     void invokeSubscriber(Subscription subscription, Object event) {
            try {
                //最后通过反射执行事件方法
                subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
            } catch (InvocationTargetException e) {
                handleSubscriberException(subscription, event, e.getCause());
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("Unexpected exception", e);
            }
        }
    

    2、 流程图


    image

    取消注册-unregister

    1、 源码实现

        public synchronized void unregister(Object subscriber) {
        //根据subscriber从typesBySubscriber获取事件集合
            List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
            if (subscribedTypes != null) {
                //遍历订阅者的事件
                for (Class<?> eventType : subscribedTypes) {
                    unsubscribeByEventType(subscriber, eventType);
                }
                //typesBySubscriber移除subscriber
                typesBySubscriber.remove(subscriber);
            } else {
                logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
            }
        }
        
            private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
            //根据eventType从subscriptionsByEventType获取订阅集合
            List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
            if (subscriptions != null) {
                int size = subscriptions.size();
                for (int i = 0; i < size; i++) {
                    Subscription subscription = subscriptions.get(i);
                    if (subscription.subscriber == subscriber) {
                        //遍历集合移除当前的subscriber
                        subscription.active = false;
                        subscriptions.remove(i);
                        i--;
                        size--;
                    }
                }
            }
        }
    

    2、 流程图

    image

    EventBus如何识别类中定义的接收方法?

    EventBus之所以如此流行,一个很重要的地方就是使用非常简便。我们只需要在类中定义好方法接收对应的事件、配置好相关的标注就可以。那么怎么获取到订阅者的事件方法集合,就是EventBus设计的一个精髓的地方。通过上面的注册方法,我们知道主要是通过下面的方法来获取,下面我们主要分析一下具体的实现。

    List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
    

    接着看findSubscriberMethods的事件

     List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
            //定义了缓存Map,避免每次都执行反射获取,提高性能
            List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
            if (subscriberMethods != null) {
                return subscriberMethods;
            }
            if (ignoreGeneratedIndex) {
                //通过反射获取
                subscriberMethods = findUsingReflection(subscriberClass);
            } else {
                //通过索引获取
                subscriberMethods = findUsingInfo(subscriberClass);
            }
            if (subscriberMethods.isEmpty()) {
                throw new EventBusException("Subscriber " + subscriberClass
                        + " and its super classes have no public methods with the @Subscribe annotation");
            } else {
                METHOD_CACHE.put(subscriberClass, subscriberMethods);
                return subscriberMethods;
            }
        }
    

    最后会执行indUsingReflection去获取,具体实现如下:

     private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
             //FindState 用来做订阅方法的校验和保存
            FindState findState = prepareFindState();
            findState.initForSubscriber(subscriberClass);
            while (findState.clazz != null) {
                //通过反射来获得订阅方法信息
                findUsingReflectionInSingleClass(findState);
                //查找父类的订阅方法
                findState.moveToSuperclass();
            }
            return getMethodsAndRelease(findState);
        }
    

    关键的实现在findUsingReflectionInSingleClass方法中,实现如下:

    private void findUsingReflectionInSingleClass(FindState findState) {
            Method[] methods;
            try {
                // //通过反射得到方法数组
                methods = findState.clazz.getDeclaredMethods();
            } catch (Throwable th) {
                // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
                methods = findState.clazz.getMethods();
                findState.skipSuperClasses = true;
            }
            //遍历Method
            for (Method method : methods) {
                int modifiers = method.getModifiers();
                if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
                    Class<?>[] parameterTypes = method.getParameterTypes();
                    ////保证必须只有一个事件参数
                    if (parameterTypes.length == 1) {
                        Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                        //得到注解
                        if (subscribeAnnotation != null) {
                            Class<?> eventType = parameterTypes[0];
                            //校验是否添加该方法
                            if (findState.checkAdd(method, eventType)) {
                                ThreadMode threadMode = subscribeAnnotation.threadMode();
                                //实例化SubscriberMethod对象并添加
                                findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                        subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                            }
                        }
                    } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                        String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                        throw new EventBusException("@Subscribe method " + methodName +
                                "must have exactly 1 parameter but has " + parameterTypes.length);
                    }
                } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                    String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                    throw new EventBusException(methodName +
                            " is a illegal @Subscribe method: must be public, non-static, and non-abstract");
                }
            }
        }
    
    

    总结一下,EventBus是通过反射getDeclaredMethods()获取类的方法集合,然后遍历方法集合,将符合事件定义的方法(public、只有一个事件参数、有Subcribe的注解等)加入到集合中。从而达到识别订阅者中定义的事件方法。

    EventBus中的线程调度机制是怎么样的?

    我们知道,EventBus可以通过定义threadMode来指定事件回调的执行线程。主要的配置如下:

    • ThreadMode: POSTING:默认的,在同一个线程中执行。

    • ThreadMode: MAIN :主线程执行

    • ThreadMode: MAIN_ORDERED: 主线程执行,不过需要排队,如果前一个也是main_ordered 需要等前一个执行完成后才执行,在主线程中执行,可以处理更新ui的操作。

    • ThreadMode: BACKGROUND :后台进程,处理如保存到数据库等操作。

    • ThreadMode: ASYNC :异步执行,另起线程操作。

    通过上面的流程分析在post的过程中,最后都是通过postToSubscription执行,在这里面判断了threadMode的类型,具体的实现如下:

    private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
            //根据对应的threadMode,使用对应的post进行事件的处理
            switch (subscription.subscriberMethod.threadMode) {
                case POSTING:
                    invokeSubscriber(subscription, event);
                    break;
                case MAIN:
                    if (isMainThread) {
                        invokeSubscriber(subscription, event);
                    } else {
                        mainThreadPoster.enqueue(subscription, event);
                    }
                    break;
                case MAIN_ORDERED:
                    if (mainThreadPoster != null) {
                        mainThreadPoster.enqueue(subscription, event);
                    } else {
                        // temporary: technically not correct as poster not decoupled from subscriber
                        invokeSubscriber(subscription, event);
                    }
                    break;
                case BACKGROUND:
                    if (isMainThread) {
                        backgroundPoster.enqueue(subscription, event);
                    } else {
                        invokeSubscriber(subscription, event);
                    }
                    break;
                case ASYNC:
                    asyncPoster.enqueue(subscription, event);
                    break;
                default:
                    throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
            }
        }
    

    1、POSTING模式,直接执行invokeSubscriber。

    2、MAIN模式,如果判断当前线程是主线程则执行invokeSubscriber,否则会使用mainThreadPoster执行enqueue方法。mainThreadPoster为HandlerPoster的实例,继承了Handler,是使用了MainLooper进行创建,也就是其的handleMessage在主线程中执行。
    我们看具体的实现:

     public void enqueue(Subscription subscription, Object event) {
            //构建PendingPost对象
            PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
            synchronized (this) {
              //加入队列
                queue.enqueue(pendingPost);
                //如果没有激活hander
                if (!handlerActive) {
                    handlerActive = true;
                    //发送消息
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                }
            }
        }
        
          @Override
        public void handleMessage(Message msg) {
            boolean rescheduled = false;
            try {
                long started = SystemClock.uptimeMillis();
                while (true) {
                   //循环获取队列中的所有pendingPost
                    PendingPost pendingPost = queue.poll();
                    if (pendingPost == null) {
                        synchronized (this) {
                            pendingPost = queue.poll();
                            if (pendingPost == null) {
                              //如果已没有数据,更新 handlerActive
                                handlerActive = false;
                                return;
                            }
                        }
                    }
                   //在主线程中实现事件的方法
                    eventBus.invokeSubscriber(pendingPost);
                    long timeInMethod = SystemClock.uptimeMillis() - started;
                    if (timeInMethod >= maxMillisInsideHandleMessage) {
                        if (!sendMessage(obtainMessage())) {
                            throw new EventBusException("Could not send handler message");
                        }
                        rescheduled = true;
                        return;
                    }
                }
            } finally {
                handlerActive = rescheduled;
            }
        }
    

    HandlerPoster会通过主线程的Handler去执行队列中的所有事件方法。

    3、MAIN_ORDERED模式 优先主线程队列执行

     if (mainThreadPoster != null) {
                        mainThreadPoster.enqueue(subscription, event);
                    } else {
                        // temporary: technically not correct as poster not decoupled from subscriber
                        invokeSubscriber(subscription, event);
                    }
    

    4、BACKGROUND模式 如果再主线程则执行后台线程执行,否则使用当前线程

    if (isMainThread) {
                        backgroundPoster.enqueue(subscription, event);
                    } else {
                        invokeSubscriber(subscription, event);
                    }
                    
    

    这里我们主要看backgroundPoster的实现,BackgroundPoster继承了Runnable,实现线程池执行run方法,通过executorRunning控制不会每次都启动一个新任务。

    public void enqueue(Subscription subscription, Object event) {
            PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
            synchronized (this) {
                //加入队列
                queue.enqueue(pendingPost);
                //变量标识,不要每次都执行
                if (!executorRunning) {
                    executorRunning = true;
                    //线程池执行
                    eventBus.getExecutorService().execute(this);
                }
            }
        }
    
      @Override
        public void run() {
            try {
                try {
                    while (true) {
                        //循环间隔1s获取事件
                        PendingPost pendingPost = queue.poll(1000);
                        if (pendingPost == null) {
                            synchronized (this) {
                                // Check again, this time in synchronized
                                pendingPost = queue.poll();
                                if (pendingPost == null) {
                                    //如果已没有任务,更新变量
                                    executorRunning = false;
                                    return;
                                }
                            }
                        }
                        //在异步线程执行了事件方法
                        eventBus.invokeSubscriber(pendingPost);
                    }
                } catch (InterruptedException e) {
                    eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
                }
            } finally {
                executorRunning = false;
            }
        }
    

    backgroundPoster会开启一个线程去执行当前所有队列中的事件方法。

    5、ASYNC 模式 主要使用了AsyncPoster,也继承了run接口。实现如下:

     public void enqueue(Subscription subscription, Object event) {
            PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
            queue.enqueue(pendingPost);
            eventBus.getExecutorService().execute(this);
        }
    
        @Override
        public void run() {
            PendingPost pendingPost = queue.poll();
            if(pendingPost == null) {
                throw new IllegalStateException("No pending post available");
            }
            eventBus.invokeSubscriber(pendingPost);
        }
    

    AsyncPoster每一个事件都会开启一个异步任务,通过线程池去执行。

    总结一下
    EventBus通过配置threadMode,控制事件在不同的线程中去执行。总归有5种模式,分别为POSTING、MAIN、MAIN_ORDERED、BACKGROUND、ASYNC。主要是通过HandlerPoster、backgroundPoster、AsyncPoster来实现线程的切换。

    • HandlerPoster会通过主线程的Handler开启循环去执行队列中的所有事件方法。
    • backgroundPoster会开启一个线程循环执行当前所有队列中的事件方法。
    • AsyncPoster每一个事件都会开启一个异步任务,通过线程池去执行。

    EventBus的事件发送和接收的原理是什么?

    EventBus主要是通过观察者模式来实现事件的发送和接收。使用register后,会将订阅者及定义的事件接收方法加入到Map中,当在任意地方执行post时,会对事件类型进行匹配,找出所有的订阅者,根据配置的threadMode,使用不同的poster通过反射去执行事件方法。当使用unregister后,会将订阅者在Map中移除,进行取消注册。

    EventBus中代码运用了那些设计模式,有什么巧妙的设计?

    1、单例模式

    EventBus.getDefault()使用了懒汉的单例模式。

    2、外观模式

    EventBus对外提供了统一的调度,屏蔽了内部的实现。

    3、建造者模式

    Event对象的创建使用EventBusBuilder进行创建,将复杂对象的创建和表示分离,调用者不需要知道复杂的创建过程,使用Build的相关方法进行配置创建对象。

    4、策略模式

    根据threadMode的设置,使用不同Poster的实现策略来执行事件方法

    总结

    1、框架的设计不在复杂而在精巧

    2、使用反射和标注可以简化很多实现

    3、EventBus的使用要注意避免大量的滥用,将导致逻辑的分散,出现问题后很难定位

    相关文章

      网友评论

        本文标题:EventBus源码学习随笔

        本文链接:https://www.haomeiwen.com/subject/kvezbftx.html