美文网首页
EventBus分析

EventBus分析

作者: 顾氏名清明 | 来源:发表于2018-03-22 15:41 被阅读0次

    EventBus基本使用

    EventBus基于观察者模式的Android事件分发总线。

    从这个图可以看出,EventBus使得 事件发送 和 事件接收 很好的解耦。另外使得Android的组件例如Activity和Fragment之间的通信变得简单。最重要的一点是可以使得代码变得整洁。

    1. 首先添加依赖:
    compile 'org.greenrobot:eventbus:3.0.0'
    
    1. 使用分为三步:
      1) 定义消息事件MessageEvent,也就是创建事件类型

          public class MessageEvent {
              public final String message;
              public MessageEvent(String message) {
                  this.message = message;
              }
          }
      

      2) 选择你要订阅的订阅者(subscriber),Activity即在onCreate()加入 EventBus.getDefault().register(this),在不需要接收事件发生时可以 EventBus.getDefault().unregister(this)
      在订阅者里需要用注解关键字 @Subscribe来告诉EventBus使用什么方法处理event

          @Subscribe
          public void onMessageEvent(MessageEvent event) {
              Toast.makeText(this, event.message, Toast.LENGTH_SHORT).show();
          }
      

      注意方法只能被public修饰,在EventBus3.0之后该方法名字就可以自由的取了,之前要求只能是onEvent().

      3)发送事件

      EventBus.getDefault().post(new MessageEvent("HelloEveryone"));
      

      这样选择的Activity就会接收到该事件,并且触发onMessageEvent方法。

    EventBus源码解析

    1. 成员变量

      • private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;

      扫描注册的对象中的所有方法,并将对象与匹配的方法存储在Subscription中,将监听的类与其CopyOnWriteArrayList<Subscription>存储在以下Map中。

      • Subscription:当前注册对象与其中的观察者函数,数据结构:

           final Object subscriber;
           final SubscriberMethod subscriberMethod
        
      • SubscriberMethod
        将扫描到的含有注释@Subscribe的函数封装为SubscriberMethod,数据结构如下:

        final Method method;
        final ThreadMode threadMode;
        final Class<?> eventType; (监听的事件类型)
        final int priority;
        final boolean sticky;
        
      • private final Map < Object, List < Class<?>>> typesBySubscriber;
        存储每个注册过的对象中监听了的所有事件类型

    2. register(Object subscriber)
      整个register流程如下:


      register流程
      • EventBus.getDefault().register(this); EventBus.getDefault()是一个单例,实现如下:

           public static EventBus getDefault() {  
               if (defaultInstance == null) {  
                   synchronized (EventBus.class) {  
                       if (defaultInstance == null) {  
                           defaultInstance = new EventBus();  
                       }  
                   }  
               }  
               return defaultInstance;  
           } 
        
      • 调用顺序
        register(Object subscriber) -> subscriberMethodFinder.findSubscriberMethods(subscriberClass) -> subscribe(subscriber, subscriberMethod);

      • findSubscriberMethods
        扫描函数当前对象中符合条件的函数,将扫描到的函数传入FindState中,经过校验后存储于FindState的subscriberMethods中。
        扫描规则:

        • 函数非静态,抽象函数;
        • 函数为public;
        • 函数仅单个参数;
        • 函数拥有@Subscribe的注解;
      • FindState

        为扫描到的函数做校验,在校验后,释放自己持有的资源。第一层校验在checkAdd函数中,如果当前尚未有函数监听过当前事件,就直接跳过第二层检查。

        第二层检查为完整的函数签名的检查,将函数名与监听事件类名拼接作为函数签名,如果当前subscriberClassByMethodKey中不存在相同methodKey时,返回true,检查结束;若存在相同methodKey时,说明子类重写了父类的监听函数,此时应当保留子类的监听函数而忽略父类。由于扫描是由子类向父类的顺序,故此时应当保留methodClassOld而忽略methodClass。

        boolean checkAdd(Method method, Class<?> eventType) {
            // 2 level check: 1st level with event type only (fast), 2nd level with complete signature when required.
            // Usually a subscriber doesn't have methods listening to the same event type.
            Object existing = anyMethodByEventType.put(eventType, method);
            if (existing == null) {
                return true;
            } else {
                if (existing instanceof Method) {
                    if (!checkAddWithMethodSignature((Method) existing, eventType)) {
                        // Paranoia check
                        throw new IllegalStateException();
                    }
                    // Put any non-Method object to "consume" the existing Method
                    anyMethodByEventType.put(eventType, this);
                }
                return checkAddWithMethodSignature(method, eventType);
            }
        }
        
        private boolean checkAddWithMethodSignature(Method method, Class<?> eventType) {
            methodKeyBuilder.setLength(0);
            methodKeyBuilder.append(method.getName());
            methodKeyBuilder.append('>').append(eventType.getName());
        
            String methodKey = methodKeyBuilder.toString();
            Class<?> methodClass = method.getDeclaringClass();
            Class<?> methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass);
            if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) {
                // Only add if not already found in a sub class
                return true;
            } else {
                // Revert the put, old class is further down the class hierarchy
                subscriberClassByMethodKey.put(methodKey, methodClassOld);
                return false;
            }
        }
        
      • subscribe(subscriber, subscriberMethod)

        private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
            Class<?> eventType = subscriberMethod.eventType;
            Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
            CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
            if (subscriptions == null) {
                subscriptions = new CopyOnWriteArrayList<>();
                subscriptionsByEventType.put(eventType, subscriptions);
            } else {
                if (subscriptions.contains(newSubscription)) {
                    throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                            + eventType);
                }
            }
        
            int size = subscriptions.size();
            for (int i = 0; i <= size; i++) {
                if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
                    subscriptions.add(i, newSubscription);
                    break;
                }
            }
        
            List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
            if (subscribedEvents == null) {
                subscribedEvents = new ArrayList<>();
                typesBySubscriber.put(subscriber, subscribedEvents);
            }
            subscribedEvents.add(eventType);
        
        }
        

        在进行非空校验以及确定newSubscription尚未被添加至subscriptionsByEventType后,根据优先级将newSubscription插入subscriptionsByEventType的对应eventType的list中。
        typesBySubscriber维护一个键为注册的对象,值为该对象中所监听的事件类型的List,根据subscriber拿到list,并将新扫描得到的eventType加入。

    3. post(Object event)
      post流程:


      post流程
    public void post(Object event) {
        PostingThreadState postingState = currentPostingThreadState.get();
        List<Object> eventQueue = postingState.eventQueue;
        eventQueue.add(event);
        if (!postingState.isPosting) {
            postingState.isMainThread = isMainThread();
            postingState.isPosting = true;
            if (postingState.canceled) {
                throw new EventBusException("Internal error. Abort state was not reset");
            }
            try {
                while (!eventQueue.isEmpty()) {
                    postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally {
                postingState.isPosting = false;
                postingState.isMainThread = false;
            }  
         }
    }
    
    • currentPostingThreadState是一个ThreadLocal类型的,里面存储了PostingThreadState;PostingThreadState包含了一个eventQueue和一些标志位。
      private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {  
             @Override  
             protected PostingThreadState initialValue() {  
                 return new PostingThreadState();  
             }  
         }  
      
      • 把传入的event,保存到了当前线程中的一个变量PostingThreadState的eventQueue中。判断当前是否是UI线程,遍历队列中的所有的event,调用postSingleEvent(ev entQueue.remove(0), postingState)方法。通过对于isPosting的判断,防止每次post都会去调用整个队列时,造成方法多次调用。

      • postSingleEvent

        private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
            Class<?> eventClass = event.getClass();
            boolean subscriptionFound = false;
            if (eventInheritance) {
                List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
                int countTypes = eventTypes.size();
                for (int h = 0; h < countTypes; h++) {
                    Class<?> clazz = eventTypes.get(h);
                    subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
                }
            } else {
                subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
            }
            if (!subscriptionFound) {
                if (logNoSubscriberMessages) {
                    logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
                }
                if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                        eventClass != SubscriberExceptionEvent.class) {
                    post(new NoSubscriberEvent(this, event));
                }
            }
        }
        

        通过lookupAllEventTypes(eventClass)得到当前eventClass的Class,以及父类和接口的Class类型,而后逐个调用postSingleEventForEventType方法。

      • postSingleEventForEventType

            private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
                CopyOnWriteArrayList<Subscription> subscriptions;
                synchronized (this) {
                    subscriptions = subscriptionsByEventType.get(eventClass);
                }
                if (subscriptions != null && !subscriptions.isEmpty()) {
                    for (Subscription subscription : subscriptions) {
                        postingState.event = event;
                        postingState.subscription = subscription;
                        boolean aborted = false;
                        try {
                            postToSubscription(subscription, event, postingState.isMainThread);
                            aborted = postingState.canceled;
                        } finally {
                            postingState.event = null;
                            postingState.subscription = null;
                            postingState.canceled = false;
                        }
                        if (aborted) {
                            break;
                        }
                    }
                    return true;
                }
                return false;
            }
        

        从subscriptionsByEventType中拿到当前eventClass的List<Subscription> ,遍历,通过postToSubscription判断执行线程,逐个调用。

      • postToSubscription

        private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
            switch (subscription.subscriberMethod.threadMode) {
                case POSTING:
                    invokeSubscriber(subscription, event);
                    break;
                case MAIN:
                    if (isMainThread) {
                        invokeSubscriber(subscription, event);
                    } else {
                        mainThreadPoster.enqueue(subscription, event);
                    }
                    break;
                case MAIN_ORDERED:
                    if (mainThreadPoster != null) {
                        mainThreadPoster.enqueue(subscription, event);
                    } else {
                        invokeSubscriber(subscription, event);
                    }
                    break;
                case BACKGROUND:
                    if (isMainThread) {
                        backgroundPoster.enqueue(subscription, event);
                    } else {
                        invokeSubscriber(subscription, event);
                    }
                    break;
                case ASYNC:
                    asyncPoster.enqueue(subscription, event);
                    break;
                default:
                    throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
            }
        }
        

        根据threadMode去判断应该在哪个线程去执行该方法,而invokeSubscriber方法内通过反射调用函数。

        • MainThread:
          首先去判断当前如果是UI线程,则直接调用;否则, mainThreadPoster.enqueue(subscription, event);把当前的方法加入到队列,然后通过handler去发送一个消息,在handler的handleMessage中,去执行方法。
        • BackgroundThread:
          如果当前非UI线程,则直接调用;如果是UI线程,则将任务加入到后台的一个队列,最终由Eventbus中的一个线程池去逐个调用
          executorService = Executors.newCachedThreadPool()。
        • Async:
          将任务加入到后台的一个队列,最终由Eventbus中的一个线程池去调用;线程池与BackgroundThread用的是同一个,会动态控制并发。
    1. EventBus的线程池

      BackgroundThread和Async的主要区别:
      1. BackgroundThread会判断是否是主线程,是主线程会调用线程池来解决,不是主线程则直接在BackgroundThread当前线程中调用;而Async都会在线程池中调用。
      2. BackgroundThread的任务会在线程池中顺序执行,而Async在没有限制。

      • AsyncPoster

         public void enqueue(Subscription subscription, Object event) {
            PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
            queue.enqueue(pendingPost);
            eventBus.getExecutorService().execute(this);
        }
        
        @Override
        public void run() {
            PendingPost pendingPost = queue.poll();
            if(pendingPost == null) {
                throw new IllegalStateException("No pending post available");
            }
            eventBus.invokeSubscriber(pendingPost);
        }
        

        eventBus.getExecutorService()获得如下线程池:

        private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();
        

        在enqueque()中入队并执行当前run(),从队列获取到pendingPost并发执行。
        PendingPostQueue为链表结构,入队出队均为同步方法,为保证并发执行时,单个PendingPost不会被执行多次。

        synchronized PendingPost poll() {
            PendingPost pendingPost = head;
            if (head != null) {
                head = head.next;
                if (head == null) {
                    tail = null;
                }
            }
            return pendingPost;
        }
        
        • BackgroundPoster
           PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
           synchronized (this) {
               queue.enqueue(pendingPost);
               if (!executorRunning) {
                   executorRunning = true;
                   eventBus.getExecutorService().execute(this);
               }
           }
        }
        
        
        @Override
        public void run() {
           try {
               try {
                   while (true) {
                       PendingPost pendingPost = queue.poll(1000);
                       if (pendingPost == null) {
                           synchronized (this) {
                               // Check again, this time in synchronized
                               pendingPost = queue.poll();
                               if (pendingPost == null) {
                                   executorRunning = false;
                                   return;
                               }
                           }
                       }
                       eventBus.invokeSubscriber(pendingPost);
                   }
               } catch (InterruptedException e) {
                   eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
               }
               } finally {
                   executorRunning = false;
               }
           }
        

    通过executorRunning和同步锁保证任何时刻仅有一个线程在 执行,仅当抛出异常或队列取空后,置executorRunning为false,才能新开线程,实现了 BackgroundThread的任务在线程池中顺序执行。

    1. 粘性事件
      • 设计初衷:事件的发出早于观察者的注册,EventBus将粘性事件存储起来,在观察者注册后,将其发出。

      • 数据结构:private final Map<Class<?>, Object> stickyEvents;
        保存每个Event类型的最近一次post出的event

      • postSticky

        public void postSticky(Object event) {
            synchronized (stickyEvents) {
                stickyEvents.put(event.getClass(), event);
            }
            // Should be posted after it is putted, in case the subscriber wants to remove immediately
            post(event);
        }
        

        将粘性事件保存在stickyEvents,而后post出,此时如果存在已经注册的观察者,则情况同普通事件情况相同;如尚无注册的观察者,在postSingleEvent函数中将时间转化为一个NoSubscriberEvent事件发出,可由EventBus消耗并处理。待观察者注册时,从stickyEvents中将事件取出,重新分发给注册的观察者。

      • register

        private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
            Class<?> eventType = subscriberMethod.eventType;
            Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
            CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
            if (subscriptions == null) {
                subscriptions = new CopyOnWriteArrayList<>();
                subscriptionsByEventType.put(eventType, subscriptions);
            } else {
                if (subscriptions.contains(newSubscription)) {
                    throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                            + eventType);
                }
            }
        
            int size = subscriptions.size();
            for (int i = 0; i <= size; i++) {
                if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
                    subscriptions.add(i, newSubscription);
                    break;
                }
            }
        
            List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
            if (subscribedEvents == null) {
                subscribedEvents = new ArrayList<>();
                typesBySubscriber.put(subscriber, subscribedEvents);
            }
            subscribedEvents.add(eventType);
        
            if (subscriberMethod.sticky) {
                if (eventInheritance) {
                    // Existing sticky events of all subclasses of eventType have to be considered.
                    // Note: Iterating over all events may be inefficient with lots of sticky events,
                    // thus data structure should be changed to allow a more efficient lookup
                    // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
                    Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
                    for (Map.Entry<Class<?>, Object> entry : entries) {
                        Class<?> candidateEventType = entry.getKey();
                        if (eventType.isAssignableFrom(candidateEventType)) {
                            Object stickyEvent = entry.getValue();
                            checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                        }
                    }
                } else {
                    Object stickyEvent = stickyEvents.get(eventType);
                    checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                }
            }
        }
        
        

        在 if (subscriberMethod.sticky)这段代码中,首先判断是否监听Event的子类,而后调用checkPostStickyEventToSubscription将黏性事件发出,在checkPostStickyEventToSubscription中,判空后按一半事件的post流程将事件传递给观察者。

        private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
            if (stickyEvent != null) {
                // If the subscriber is trying to abort the event, it will fail (event is not tracked in posting state)
                // --> Strange corner case, which we don't take care of here.
                postToSubscription(newSubscription, stickyEvent, isMainThread());
            }
        }
        

    EventBus优缺点

    1. 组件间通信与解耦

      它是一个基于观察者模式的事件发布/订阅框架,开发者可以通过极少的代码去实现多个模块之间的通信,而不需要以层层传递接口的形式去单独构建通信桥梁。从而降低因多重回调导致的模块间强耦合,同时避免产生大量内部类。

    2. 很好的应用于Activity之间,Fragment之间,后台线程之间的通信,避免使用intent或者handler所带来的复杂度

    3. 速度快,尤其是在做了优化之后

    4. 轻量

    5. 有诸多高级特性,例如多种类型的线程模式,subscriber的优先级,等等

    6. 缺点:可能会造成接口的膨胀。特别是当程序要求大量形式各异的通知,而没有做出良好的抽象时,代码中会包含大量的接口,接口数量的增长又会带来命名、注释等等一大堆问题。本质上说观察者要求从零开始实现事件的产生、分发与处理过程,这就要求参与者必须对整个通知过程有着良好的理解。当程序代码适量时,这是一个合理的要求,然而当程序太大时,这将成为一种负担。

    相关文章

      网友评论

          本文标题:EventBus分析

          本文链接:https://www.haomeiwen.com/subject/wkpaqftx.html