美文网首页
EventBus源码分析

EventBus源码分析

作者: Horps | 来源:发表于2021-09-08 22:04 被阅读0次
  • 关于EventBus

    EventBus作为Android端的事件总线,为移动端页面间的事件传递提供了很大的便利。在使用它之前我们先考虑一下如果没有EventBus我们怎么去处理页面间的事件交互。

    假设这样一个场景,有页面A和B,我在B页面执行了一个操作,操作成功后需要刷新A页面的数据,很显然我需要通知页面A去做这件事情,我们可以有以下几种方式去实现:

    • 创建一个静态方法,直接调用A.xxx去实现,但是如果这样的话,A.xxx里面调用的所有方法都得是静态的,这意味着这个A类在程序启动的时候就要去在内存中装载其静态方法,这很显然很不恰当。
    • 找到A的实例,通过它调用实例方法完成,那么问题就成了怎么找到A的实例:
      1. 创建全局Activity或Fragment的List,通过index取得对应实例,在页面很多的时候你很难记住每个类实例的下标,你会说可以用Map,那非单例模式启动的Activity就无法存储多个实例。
      2. 如果是startActivityForResult的方式启动的B,那你可以使用setResult回调来处理,但是这种场景又被限定死了,很多时候事件交互的页面都不是相邻的,Fragment事件响应也不适用。

    其实上面的思路也是EventBus的实现思路,简单来说,EventBus会把所有注册了处理事件方法的类的实例都存在一个地方,然后事件发送的时候找到这个实例,然后通过反射来实现方法的回调。

  • EventBus使用

    • 关于构造

      EventBus使用建造器模式创建实例,通常我们需要一个单例来统一管理App的事件调度,所以默认的实例化EventBus已经帮我们实现好了,我们直接调用:

      public static EventBus getDefault() {
          EventBus instance = defaultInstance;
          if (instance == null) {
              Class var1 = EventBus.class;
              synchronized(EventBus.class) {
                  instance = defaultInstance;
                  if (instance == null) {
                      instance = defaultInstance = new EventBus();
                  }
              }
          }
      
          return instance;
      }
      

      如果我们想要保留默认构造实例的大部分设置,修改部分属性的话可以这么做:

      EventBus.builder().throwSubscriberException(BuildConfig.DEBUG).installDefaultEventBus();
      

      需要注意的是,这个操作必须在使用EventBus默认实例之前(最好在Application里面):

      public EventBus installDefaultEventBus() {
          Class var1 = EventBus.class;
          synchronized(EventBus.class) {
              if (EventBus.defaultInstance != null) {
                  throw new EventBusException("Default instance already exists. It may be only set once before it's used the first time to ensure consistent behavior.");
              } else {
                  EventBus.defaultInstance = this.build();
                  return EventBus.defaultInstance;
              }
          }
      }
      

      当然你也可以使用EventBus.builder().xxx()....build()来完全自定义EventBus实例。

    • 关于使用

      1. 定义事件,EventBus发送的是一个Object类型,所以你可以封装自己的任意类型的对象作为事件类型。

      2. 定义订阅者,譬如:

        @Subscribe(threadMode = ThreadMode.MAIN)
        public void onMessageEvent(MessageEvent event) {
            Toast.makeText(getActivity(), event.message, Toast.LENGTH_SHORT).show();
        }
        
      3. 注册和取消订阅:

        @Override
        public void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            EventBus.getDefault().register(this);
        }
         
        @Override
        public void onDestroy() {
            EventBus.getDefault().unregister(this);
            super.onStop();
        }
        

        注册放在onCreate或onStart中都可以,但是取消注册只能放在onDestroy中,因为Android中的事件交互都是跨页面的,如果在onStop中取消注册则无法找到事件订阅者。这一点官网写错了。

      4. 发送事件:

        EventBus.getDefault().post(new MessageEvent("Hello everyone!"));
        
  • 源码

    • 发送事件

      EventBus的post有两种方式,一个是post、一个是postSticky:

      public void postSticky(Object event) {
          synchronized(this.stickyEvents) {
              this.stickyEvents.put(event.getClass(), event);
          }
      
          this.post(event);
      }
      
      public void post(Object event) {
              EventBus.PostingThreadState postingState = (EventBus.PostingThreadState)this.currentPostingThreadState.get();
              List<Object> eventQueue = postingState.eventQueue;
              eventQueue.add(event);
              if (!postingState.isPosting) {
                  //保存当前事件发送时所在的线程是不是主线程
                  postingState.isMainThread = this.isMainThread();
                  postingState.isPosting = true;
                  if (postingState.canceled) {
                      throw new EventBusException("Internal error. Abort state was not reset");
                  }
      
                  try {
                      while(!eventQueue.isEmpty()) {
                          this.postSingleEvent(eventQueue.remove(0), postingState);
                      }
                  } finally {
                      postingState.isPosting = false;
                      postingState.isMainThread = false;
                  }
              }
      
          }
      

      可见,postSticky方法最终还是调用了post方法来发送事件,在这之前会把事件的Class类型和事件对象作为一个key-value存到stickyEvents中,这就是为了处理黏性事件,Sticky(黏性)事件是用于订阅者还没有注册的时候事件已经发送的情况下的处理,Android中就是post给还未启动的Activity实例的时候,后面会看到区别。

      至于为什么要用队列保存当前事件,我想是因为因为不同线程可能会同时post,这是多线程需要。

      currentPostingThreadState是一个ThreadLocal,使用它内部的Map来保存所有的要发送的事件,而且使用它获取当前线程的API来判断是不是主线程,这很关键。

    • 保存判断是否是Android主线程

      private boolean isMainThread() {
          return this.mainThreadSupport == null || this.mainThreadSupport.isMainThread();
      }
      
      this.mainThreadSupport = builder.getMainThreadSupport();
      
      MainThreadSupport getMainThreadSupport() {
              if (this.mainThreadSupport != null) {
                  return this.mainThreadSupport;
              } else if (AndroidLogger.isAndroidLogAvailable()) {
                  Object looperOrNull = getAndroidMainLooperOrNull();
                  return looperOrNull == null ? null : new AndroidHandlerMainThreadSupport((Looper)looperOrNull);
              } else {
                  return null;
              }
          }
      

      这个方法最终返回了一个AndroidHandlerMainThreadSupport对象:

      public interface MainThreadSupport {
          boolean isMainThread();
      
          Poster createPoster(EventBus var1);
      
          public static class AndroidHandlerMainThreadSupport implements MainThreadSupport {
              private final Looper looper;
      
              public AndroidHandlerMainThreadSupport(Looper looper) {
                  this.looper = looper;
              }
      
              public boolean isMainThread() {
                  return this.looper == Looper.myLooper();
              }
      
              public Poster createPoster(EventBus eventBus) {
                  return new HandlerPoster(eventBus, this.looper, 10);
              }
          }
      }
      

      可见,AndroidHandlerMainThreadSupport对象里面保存了Android主线程的looper,postingState.isMainThread保存了发送该事件的线程是不是Android主线程。

    • 找到所有的该事件类型(可能还有父类型)

      private void postSingleEvent(Object event, EventBus.PostingThreadState postingState) throws Error {
          Class<?> eventClass = event.getClass();
          boolean subscriptionFound = false;
          //eventInheritance为true则会查找此事件类型的所有继承的父类型和实现的接口类型,也就是说这个属性设置为true的话(默认是true)则会发送给接受类型是这些类型的所有订阅者
          if (this.eventInheritance) {
              List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
              int countTypes = eventTypes.size();
      
              for(int h = 0; h < countTypes; ++h) {
                  Class<?> clazz = (Class)eventTypes.get(h);
                  subscriptionFound |= this.postSingleEventForEventType(event, postingState, clazz);
              }
          } else {
              subscriptionFound = this.postSingleEventForEventType(event, postingState, eventClass);
          }
              //找不到任何注册的订阅者的情况
          if (!subscriptionFound) {
              if (this.logNoSubscriberMessages) {
                  this.logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
              }
      
              if (this.sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class && eventClass != SubscriberExceptionEvent.class) {
                  this.post(new NoSubscriberEvent(this, event));
              }
          }
      
      }
      

      这个方法就是根据事件类型找到所有已注册的订阅者。

    • 根据事件类型找到订阅者

      private boolean postSingleEventForEventType(Object event, EventBus.PostingThreadState postingState, Class<?> eventClass) {
          CopyOnWriteArrayList subscriptions;
          synchronized(this) {
              subscriptions = (CopyOnWriteArrayList)this.subscriptionsByEventType.get(eventClass);
          }
      
          if (subscriptions != null && !subscriptions.isEmpty()) {
              Iterator var5 = subscriptions.iterator();
      
              while(var5.hasNext()) {
                  Subscription subscription = (Subscription)var5.next();
                  postingState.event = event;
                  postingState.subscription = subscription;
      
                  boolean aborted;
                  try {
                      this.postToSubscription(subscription, event, postingState.isMainThread);
                      aborted = postingState.canceled;
                  } finally {
                      postingState.event = null;
                      postingState.subscription = null;
                      postingState.canceled = false;
                  }
      
                  if (aborted) {
                      break;
                  }
              }
      
              return true;
          } else {
              return false;
          }
      }
      

      subscriptionsByEventType根据事件类型找到订阅者集合subscriptions,然后遍历它找到每个Subscription并把事件逐一发送给它们。

    • 执行订阅者的订阅方法

      private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
          switch(subscription.subscriberMethod.threadMode) {
          case POSTING:
              this.invokeSubscriber(subscription, event);
              break;
          case MAIN:
              if (isMainThread) {
                  this.invokeSubscriber(subscription, event);
              } else {
                  this.mainThreadPoster.enqueue(subscription, event);
              }
              break;
          case MAIN_ORDERED:
              if (this.mainThreadPoster != null) {
                  this.mainThreadPoster.enqueue(subscription, event);
              } else {
                  this.invokeSubscriber(subscription, event);
              }
              break;
          case BACKGROUND:
              if (isMainThread) {
                  this.backgroundPoster.enqueue(subscription, event);
              } else {
                  this.invokeSubscriber(subscription, event);
              }
              break;
          case ASYNC:
              this.asyncPoster.enqueue(subscription, event);
              break;
          default:
              throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
          }
      
      }
      

      这里会根据订阅方法注解的threadMode属性执行不同操作。

      • POSTING:

        表示直接执行方法,所以如果订阅方法不在Android主线程执行的话会出错。

      • MAIN:

        如果订阅方法表明了要在主线程操作,则要判断postingState.isMainThread,即如果post所在的线程不是主线程的话,方法调用会放到mainThreadPoster队列中:

        public void enqueue(Subscription subscription, Object event) {
            PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
            synchronized(this) {
                this.queue.enqueue(pendingPost);
                if (!this.handlerActive) {
                    this.handlerActive = true;
                    if (!this.sendMessage(this.obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                }
            }
        }
        

        mainThreadPoster是通过前面的AndroidHandlerMainThreadSupport的createPoster方法创建的,他是一个Handler,和主线程的looper绑定在一起:

        public Poster createPoster(EventBus eventBus) {
            return new HandlerPoster(eventBus, this.looper, 10);
        }
        

        所以此时事件都存放在了主线程的队列里面,等到系统切换回主线程后,会自动遍历队列处理:

        public void handleMessage(Message msg) {
            boolean rescheduled = false;
        
            try {
                long started = SystemClock.uptimeMillis();
        
                long timeInMethod;
                do {
                    PendingPost pendingPost = this.queue.poll();
                    if (pendingPost == null) {
                        synchronized(this) {
                            pendingPost = this.queue.poll();
                            if (pendingPost == null) {
                                this.handlerActive = false;
                                return;
                            }
                        }
                    }
                              //调用订阅者的注册方法
                    this.eventBus.invokeSubscriber(pendingPost);
                    timeInMethod = SystemClock.uptimeMillis() - started;
                } while(timeInMethod < (long)this.maxMillisInsideHandleMessage);
        
                if (!this.sendMessage(this.obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
        
                rescheduled = true;
            } finally {
                this.handlerActive = rescheduled;
            }
        
        }
        

        SystemClock.uptimeMillis()取的是系统自boot以来的时间,所以只要queue里面事件还未处理完且程序未被kill则会一直处理订阅者的注册方法。

      • MAIN_ORDERED:

        官方说是主线程顺序执行,但从代码来看,MAIN_ORDERED和MAIN的非主线程post是一样的都会放到队列中,而且这里还没有判断是否是主线程,所以mainThreadPoster为null的时候还会出错。

      • BACKGROUND:

        public void enqueue(Subscription subscription, Object event) {
            PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
            synchronized(this) {
                this.queue.enqueue(pendingPost);
                if (!this.executorRunning) {
                    this.executorRunning = true;
                      //开启新线程
                    this.eventBus.getExecutorService().execute(this);
                }
        
            }
        }
        
        public void run() {
            try {
                while(true) {
                    PendingPost pendingPost = this.queue.poll(1000);
                    if (pendingPost == null) {
                        synchronized(this) {
                            pendingPost = this.queue.poll();
                            if (pendingPost == null) {
                                this.executorRunning = false;
                                return;
                            }
                        }
                    }
        
                    this.eventBus.invokeSubscriber(pendingPost);
                }
            } catch (InterruptedException var9) {
                this.eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", var9);
            } finally {
                this.executorRunning = false;
            }
        
        }
        

        和主线程注册的相反,如果是主线程则放在队列中等待非主线程时候执行。

      • ASYNC:

        public void enqueue(Subscription subscription, Object event) {
            PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
            this.queue.enqueue(pendingPost);
            this.eventBus.getExecutorService().execute(this);
        }
        
        public void run() {
            PendingPost pendingPost = this.queue.poll();
            if (pendingPost == null) {
                throw new IllegalStateException("No pending post available");
            } else {
                this.eventBus.invokeSubscriber(pendingPost);
            }
        }
        

        队列中只存放一个事件,并且总是开启一个新的线程处理它。

    • 注册订阅

      前面我们通过eventType找到所有的订阅者然后逐一发送事件,那么这些订阅者是怎么保存在subscriptionsByEventType中的呢?

      前面说到在onCreate或者onStart中注册订阅者,来看一下过程。

      public void register(Object subscriber) {
          Class<?> subscriberClass = subscriber.getClass();
          List<SubscriberMethod> subscriberMethods = this.subscriberMethodFinder.findSubscriberMethods(subscriberClass);
          synchronized(this) {
              Iterator var5 = subscriberMethods.iterator();
      
              while(var5.hasNext()) {
                  SubscriberMethod subscriberMethod = (SubscriberMethod)var5.next();
                  this.subscribe(subscriber, subscriberMethod);
              }
      
          }
      }
      

      findSubscriberMethods:

      List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
          List<SubscriberMethod> subscriberMethods = (List)METHOD_CACHE.get(subscriberClass);
          if (subscriberMethods != null) {
              return subscriberMethods;
          } else {
              if (this.ignoreGeneratedIndex) {
                  subscriberMethods = this.findUsingReflection(subscriberClass);
              } else {
                  subscriberMethods = this.findUsingInfo(subscriberClass);
              }
      
              if (subscriberMethods.isEmpty()) {
                  throw new EventBusException("Subscriber " + subscriberClass + " and its super classes have no public methods with the @Subscribe annotation");
              } else {
                  METHOD_CACHE.put(subscriberClass, subscriberMethods);
                  return subscriberMethods;
              }
          }
      }
      
      private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
          SubscriberMethodFinder.FindState findState = this.prepareFindState();
          findState.initForSubscriber(subscriberClass);
      
          while(findState.clazz != null) {
              this.findUsingReflectionInSingleClass(findState);
              findState.moveToSuperclass();
          }
      
          return this.getMethodsAndRelease(findState);
      }
      
      private void findUsingReflectionInSingleClass(SubscriberMethodFinder.FindState findState) {
          Method[] methods;
          try {
              methods = findState.clazz.getDeclaredMethods();
          } catch (Throwable var13) {
              try {
                  methods = findState.clazz.getMethods();
              } catch (LinkageError var12) {
                  String msg = "Could not inspect methods of " + findState.clazz.getName();
                  if (this.ignoreGeneratedIndex) {
                      msg = msg + ". Please consider using EventBus annotation processor to avoid reflection.";
                  } else {
                      msg = msg + ". Please make this class visible to EventBus annotation processor to avoid reflection.";
                  }
      
                  throw new EventBusException(msg, var12);
              }
      
              findState.skipSuperClasses = true;
          }
      
          Method[] var3 = methods;
          int var4 = methods.length;
      
          for(int var14 = 0; var14 < var4; ++var14) {
              Method method = var3[var14];
              int modifiers = method.getModifiers();
              //判断是否是public且不是static方法
              if ((modifiers & 1) != 0 && (modifiers & 5192) == 0) {
                  Class<?>[] parameterTypes = method.getParameterTypes();
                  if (parameterTypes.length == 1) {
                      Subscribe subscribeAnnotation = (Subscribe)method.getAnnotation(Subscribe.class);
                      if (subscribeAnnotation != null) {
                          //这里只会取第一个参数,所以定义多余参数并无意义,当然只要strictMethodVerification设为false(默认)的话定义多个参数并不会出错
                          Class<?> eventType = parameterTypes[0];
                          if (findState.checkAdd(method, eventType)) {
                              ThreadMode threadMode = subscribeAnnotation.threadMode();
                              findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode, subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                          }
                      }
                  } else if (this.strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                      String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                      throw new EventBusException("@Subscribe method " + methodName + "must have exactly 1 parameter but has " + parameterTypes.length);
                  }
              //如果strictMethodVerification设为true的话还会强制要求方法参数个数为1
              } else if (this.strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                  String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                  throw new EventBusException(methodName + " is a illegal @Subscribe method: must be public, non-static, and non-abstract");
              }
          }
      
      }
      

      主要就是通过反射找到订阅者的订阅方法并保存起来。

      回到register方法,每个SubscriberMethod都调用subscribe方法:

      private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
          Class<?> eventType = subscriberMethod.eventType;
          Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
          CopyOnWriteArrayList<Subscription> subscriptions = (CopyOnWriteArrayList)this.subscriptionsByEventType.get(eventType);
          if (subscriptions == null) {
              subscriptions = new CopyOnWriteArrayList();
              this.subscriptionsByEventType.put(eventType, subscriptions);
          } else if (subscriptions.contains(newSubscription)) {
              throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType);
          }
      
          int size = subscriptions.size();
      
          for(int i = 0; i <= size; ++i) {
              //优先级关键
              if (i == size || subscriberMethod.priority > ((Subscription)subscriptions.get(i)).subscriberMethod.priority) {
                  subscriptions.add(i, newSubscription);
                  break;
              }
          }
      
          List<Class<?>> subscribedEvents = (List)this.typesBySubscriber.get(subscriber);
          if (subscribedEvents == null) {
              subscribedEvents = new ArrayList();
              this.typesBySubscriber.put(subscriber, subscribedEvents);
          }
      
          ((List)subscribedEvents).add(eventType);
        
          //处理sticky事件
          if (subscriberMethod.sticky) {
              if (this.eventInheritance) {
                  Set<Entry<Class<?>, Object>> entries = this.stickyEvents.entrySet();
                  Iterator var9 = entries.iterator();
      
                  while(var9.hasNext()) {
                      Entry<Class<?>, Object> entry = (Entry)var9.next();
                      Class<?> candidateEventType = (Class)entry.getKey();
                      if (eventType.isAssignableFrom(candidateEventType)) {
                          Object stickyEvent = entry.getValue();
                          this.checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                      }
                  }
              } else {
                  Object stickyEvent = this.stickyEvents.get(eventType);
                  this.checkPostStickyEventToSubscription(newSubscription, stickyEvent);
              }
          }
      
      }
      

      到这里和前面的subscriptionsByEventType对应起来了,注意这里添加Subscription的时候判断priority,值大的放在前面,调用的时候从头循环,这就实现了priority优先级的效果。

      还发现,注册的时候会循环stickyEvents把之前通过postSticky方法发送的事件给处理掉,这也就是前面未注册的sticky事件得到处理的原理。

    • 取消事件传递

      通过前面的优先级传递发现,priority值大的会先执行,那么我们可以在优先级高的回调中通过EventBus.getDefault().cancelEventDelivery(event)取消传递:

      public void cancelEventDelivery(Object event) {
          EventBus.PostingThreadState postingState = (EventBus.PostingThreadState)this.currentPostingThreadState.get();
          if (!postingState.isPosting) {
              throw new EventBusException("This method may only be called from inside event handling methods on the posting thread");
          } else if (event == null) {
              throw new EventBusException("Event may not be null");
          } else if (postingState.event != event) {
              throw new EventBusException("Only the currently handled event may be aborted");
          } else if (postingState.subscription.subscriberMethod.threadMode != ThreadMode.POSTING) {
              throw new EventBusException(" event handlers may only abort the incoming event");
          } else {
              postingState.canceled = true;
          }
      }
      

      可见,就是把postingState.canceled设置成false,所以到了postSingleEventForEventType中时:

      while(var5.hasNext()) {
          Subscription subscription = (Subscription)var5.next();
          postingState.event = event;
          postingState.subscription = subscription;
      
          boolean aborted;
          try {
              this.postToSubscription(subscription, event, postingState.isMainThread);
              aborted = postingState.canceled;
          } finally {
              postingState.event = null;
              postingState.subscription = null;
              postingState.canceled = false;
          }
      
          if (aborted) {
              break;
          }
      }
      

      此时就可以取消下面的事件回调,达到终止传递的效果。

  • 总结

    EventBus的原理就是,在Activity的onCreate或onStart方法中注册订阅者,所谓订阅,就是通过反射找到所有标有@Subscribe注解的且是non-static的public方法,如果strictMethodVerification设为true的话还会强制要求方法参数个数为1,这些方法和其定义所在的类都保存在一个Subscription对象中,然后以eventType作为key,把所有定义了接收这个类型事件方法的类都存放在subscriptionsByEventType这个Map中,存放的时候会按照@Subscribe注解中priority属性的值来决定存放的顺序(值大的靠前)。注册的时候还会检查如果有sticky为true的方法,会遍历stickyEvents中的待处理的事件,如果和方法参数类型一致或是其父类型则会执行post逻辑(即尝试立即调用这个方法)。

    postSticky也会调用post方法,只不过在这之前会把事件存放到stickyEvents中,post方法最终会遍历subscriptionsByEventType找到所有和本次post事件相关联的Subscription,因为是从头部开始循环,所以这里就达到了priority的优先级的意义。当然这里取到的都是已注册的,还没有启动的Activity或Fragment是不会接收到调用的,所以如果想要发送事件给尚未启动的页面就需要调用postSticky方法来发送,正如我们前面说到的那样在注册的时候会调用,即延迟到页面启动的时候自动调用。

相关文章

网友评论

      本文标题:EventBus源码分析

      本文链接:https://www.haomeiwen.com/subject/nqsbwltx.html