美文网首页源码分析
EventBus3源码分析

EventBus3源码分析

作者: h2coder | 来源:发表于2020-09-21 15:09 被阅读0次

上篇,我们学习了EventBus3的使用方法,本篇一起来分析一下EventBus3的主要源码。

我们主要分析以下重点即可,其他的都是围绕这几个重点做补充:

  1. EventBus实例创建
  2. register注册流程
  3. post事件发送流程
  4. unregister解注册流程

EventBus实例创建

EventBus实例的创建方式有2种

  • EventBus.getDefault(),获取默认配置的实例。使用的是很常见的Double Check和volatile保证单例。
static volatile EventBus defaultInstance;

/**
 * Double Check 方式单例
 */
public static EventBus getDefault() {
    EventBus instance = defaultInstance;
    if (instance == null) {
        synchronized (EventBus.class) {
            instance = EventBus.defaultInstance;
            if (instance == null) {
                instance = EventBus.defaultInstance = new EventBus();
            }
        }
    }
    return instance;
}
  • 自定义配置,使用的是Builder建造者模式,可对EventBus的一些配置进行自定义,例如配置子线程回调订阅者方法的线程池执行器等。最后build()方法可按配置成EventBus实例,后续使用需要自己保存好实例,如果调用getDefault()获取的还是默认实例。如果希望让EventBus保存默认实例则可调用installDefaultEventBus()来保存实例为EventBus的默认实例。
/**
 * 自定义配置EventBus实例
 */
public static EventBusBuilder builder() {
    return new EventBusBuilder();
}

/**
 * EventBus实例自定义Builder配置
 */
public class EventBusBuilder {
    private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();
    boolean logSubscriberExceptions = true;
    boolean logNoSubscriberMessages = true;
    boolean sendSubscriberExceptionEvent = true;
    boolean sendNoSubscriberEvent = true;
    boolean throwSubscriberException;
    boolean eventInheritance = true;
    boolean ignoreGeneratedIndex;
    boolean strictMethodVerification;
    ExecutorService executorService = DEFAULT_EXECUTOR_SERVICE;
    List<Class<?>> skipMethodVerificationForClasses;
    List<SubscriberInfoIndex> subscriberInfoIndexes;
    Logger logger;
    MainThreadSupport mainThreadSupport;

    EventBusBuilder() {
    }
    
    //省略其他配置的set方法...
    
    /**
     * 配置子线程执行的线程池执行器
     */
    public EventBusBuilder executorService(ExecutorService executorService) {
        this.executorService = executorService;
        return this;
    }

    //省略其他配置的set方法...

    /**
     * 将生成的实例安装到默认实例,后续则可以使用getDefault()获取回这个实例,注意只能安装一次
     * 因此installDefaultEventBus()之前也不能调用getDefault()获取过实例,否则抛出异常
     */
    public EventBus installDefaultEventBus() {
        synchronized (EventBus.class) {
            if (EventBus.defaultInstance != null) {
                throw new EventBusException("Default instance already exists." +
                        " It may be only set once before it's used the first time to ensure consistent behavior.");
            }
            EventBus.defaultInstance = build();
            return EventBus.defaultInstance;
        }
    }
    
    /**
     * 按Builder配置生成EventBus实例
     */
    public EventBus build() {
        return new EventBus(this);
    }
}

订阅者注册

订阅者的注册,通过调用register()方法,传入订阅者实例即可,一般我们为Activity、Fragment中使用。接下来分析注册流程。

  • 获取订阅者的Class,通过subscriberMethodFinder.findSubscriberMethods()查找订阅者的所有订阅方法。
  • 遍历每个订阅方法,为每个订阅方法进行注册。
/**
 * 订阅事件
 */
public void register(Object subscriber) {
    Class<?> subscriberClass = subscriber.getClass();
    //查找订阅者所有的订阅方法
    List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
    synchronized (this) {
        //遍历订阅的方法列表,对每个订阅方法都进行注册
        for (SubscriberMethod subscriberMethod : subscriberMethods) {
            subscribe(subscriber, subscriberMethod);
        }
    }
}

来分析一下subscriberMethodFinder.findSubscriberMethods()。SubscriberMethodFinder类是一个专门查找订阅者方法的一个类。

  • 反射查找订阅方法是一个很耗时的操作,所以EventBus也做了缓存METHOD_CACHE,搜索前先从缓存中找,找得到则马上使用。
  • 接下来是2个分支,一个是apt编译器生成订阅信息索引的方法,一种是反射获取订阅者的订阅方法,ignoreGeneratedIndex标志位标识是否忽略apt生成的订阅信息索引,默认为false,所以会走else逻辑。
  • findUsingInfo()方法,则是反射获取订阅者的订阅信息。
  • 如果找不到订阅信息,证明订阅者没有声明订阅方法,抛出异常。找得到则将订阅信息缓存到METHOD_CACHE,然后再返回。
//订阅方法缓存
private static final Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();

/**
 * 查找订阅者的订阅方法,将订阅信息封装在SubscriberMethod类中,多个方法,所以返回值为一个List集合
 *
 * @param subscriberClass 订阅者Class
 */
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
    //先从缓存中查找,有则直接使用
    List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
    if (subscriberMethods != null) {
        return subscriberMethods;
    }
    //是否忽略apt生成的索引,ignoreGeneratedIndex默认为false
    if (ignoreGeneratedIndex) {
        subscriberMethods = findUsingReflection(subscriberClass);
    } else {
        //反射获取所有订阅的方法
        subscriberMethods = findUsingInfo(subscriberClass);
    }
    //没有任何一个订阅方法,抛出异常
    if (subscriberMethods.isEmpty()) {
        throw new EventBusException("Subscriber " + subscriberClass
                + " and its super classes have no public methods with the @Subscribe annotation");
    } else {
        //获取到所有订阅方法后,保存到缓存中,然后返回
        METHOD_CACHE.put(subscriberClass, subscriberMethods);
        return subscriberMethods;
    }
}

findSubscriberMethods()的主要逻辑都在findUsingInfo()上,继续追踪

  • 第一步,调用prepareFindState()方法,获取当前线程的查找状态并初始化
  • 第二步,判断findState.subscriberInfo字段是否为空,为空调用findUsingReflectionInSingleClass(),这个方法才是反射获取订阅者的订阅信息
  • 第三步,配置父类Class信息,会自动忽略系统类来提高性能
  • 最后,getMethodsAndRelease(),转移findState上保存的List<SubscriberMethod>,并对FindState中间对象回收
/**
 * 反射查找订阅者所有的订阅方法
 *
 * @param subscriberClass 订阅者Class
 */
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
    //获取当前线程的查找状态
    FindState findState = prepareFindState();
    //初始化一些值
    findState.initForSubscriber(subscriberClass);
    while (findState.clazz != null) {
        //获取订阅信息
        findState.subscriberInfo = getSubscriberInfo(findState);
        //初始化时,findState.subscriberInfo为null
        if (findState.subscriberInfo != null) {
            SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
            for (SubscriberMethod subscriberMethod : array) {
                if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
                    findState.subscriberMethods.add(subscriberMethod);
                }
            }
        } else {
            //反射获取订阅者的所有订阅方法
            findUsingReflectionInSingleClass(findState);
        }
        //配置父类Class信息,会自动忽略系统类来提高性能
        findState.moveToSuperclass();
    }
    //转移findState上保存的List<SubscriberMethod>,并对FindState中间对象回收
    return getMethodsAndRelease(findState);
}

分析prepareFindState()方法

  • 很明显,FindState是复用对象的,作用就是避免频繁创建FindState类。
  • 先从缓存池中查找FindState可用的对象,可用则返回。
  • 不可用,则新建一个FindState类实例。
//对象池大小
private static final int POOL_SIZE = 4;
//FindState对象池
private static final FindState[] FIND_STATE_POOL = new FindState[POOL_SIZE];

/**
 * 获取一个查找状态
 */
private FindState prepareFindState() {
    synchronized (FIND_STATE_POOL) {
        //享元模式,从对象池中找,避免频繁创建FindState类
        for (int i = 0; i < POOL_SIZE; i++) {
            FindState state = FIND_STATE_POOL[i];
            if (state != null) {
                FIND_STATE_POOL[i] = null;
                return state;
            }
        }
    }
    //没有缓存对象可用,则创建一个
    return new FindState();
}

findState.initForSubscriber(subscriberClass),则是初始化findState的初始化一些值

/**
 * 查找状态实体类
 */
static class FindState {
    Class<?> subscriberClass;
    boolean skipSuperClasses;
    SubscriberInfo subscriberInfo;

    //初始化findState的初始化一些值
    void initForSubscriber(Class<?> subscriberClass) {
        this.subscriberClass = clazz = subscriberClass;
        skipSuperClasses = false;
        subscriberInfo = null;
    }
}

findState经过initForSubscriber()后,subscriberInfo字段被置为null,所以只能走else逻辑,继续分析findUsingReflectionInSingleClass()

  • findUsingReflectionInSingleClass()方法,主要是使用反射来获取订阅者的Subscribe注解的方法,然后将方法的Method对象(有这个对象就能调用invoked调用订阅者方法),事件Class,配置的threadMode,priorty优先级,是否粘性sticky,封装到SubscriberMethod对象中,最后所有的订阅方法都在SubscriberMethod对象都收集到了findState.subscriberMethods集合中。这样订阅者的订阅信息都收集到了。
/**
 * 反射获取订阅者的所有订阅方法
 *
 * @param findState 查找状态
 */
private void findUsingReflectionInSingleClass(FindState findState) {
    Method[] methods;
    try {
        //使用getDeclaredMethods,反射获取所有方法,不会获取到父类中的方法,避免查找耗时,尤其是Activity,一般我们都是在子类上使用
        methods = findState.clazz.getDeclaredMethods();
    } catch (Throwable th) {
        // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
        methods = findState.clazz.getMethods();
        findState.skipSuperClasses = true;
    }
    //遍历订阅者的方法
    for (Method method : methods) {
        //获取修饰符
        int modifiers = method.getModifiers();
        //判断方法的修饰符是否为Public公开,并且不是static静态方法,
        if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
            //获取方法参数
            Class<?>[] parameterTypes = method.getParameterTypes();
            //限定订阅方法的方法参数为1个,就是event事件类
            if (parameterTypes.length == 1) {
                //判断方法是否加了@Subscribe注解,必须加了才处理
                Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                if (subscribeAnnotation != null) {
                    //获取第一个参数,就是事件event
                    Class<?> eventType = parameterTypes[0];
                    //检查是否已经添加过了,没有添加过才继续
                    if (findState.checkAdd(method, eventType)) {
                        //获取@Subscribe注解上的threadMode参数,就是事件回调的线程策略
                        ThreadMode threadMode = subscribeAnnotation.threadMode();
                        //创建SubscriberMethod对象,代表每个订阅方法的信息
                        //将@Subscribe注解上定义的事件类型、线程回调策略、回调优先级、是否粘性等字段保存到SubscriberMethod类中
                        //再将SubscriberMethod对象保存到findState.subscriberMethods订阅的方法列表中
                        findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                    }
                }
            } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                throw new EventBusException("@Subscribe method " + methodName +
                        "must have exactly 1 parameter but has " + parameterTypes.length);
            }
        } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
            String methodName = method.getDeclaringClass().getName() + "." + method.getName();
            throw new EventBusException(methodName +
                    " is a illegal @Subscribe method: must be public, non-static, and non-abstract");
        }
    }
}

回到SubscriberMethodFinder的findUsingInfo()方法,调用完findUsingReflectionInSingleClass()收集订阅者订阅信息后,再调用了moveToSuperclass()方法,其实就是配置父类Class信息,会自动忽略系统类来提高性能。

/**
 * 配置父类Class信息,会自动忽略系统类来提高性能
 */
void moveToSuperclass() {
    if (skipSuperClasses) {
        clazz = null;
    } else {
        clazz = clazz.getSuperclass();
        String clazzName = clazz.getName();
        //跳过系统的类(肯定不会有EventBus的东西),来提高性能
        if (clazzName.startsWith("java.") || clazzName.startsWith("javax.") || clazzName.startsWith("android.")) {
            clazz = null;
        }
    }
}

调用getMethodsAndRelease(),将findState中的subscriberMethods订阅信息集合转到一个ArrayList中,调用findState的recycle()重置对象字段,再将实例放到FIND_STATE_POOL对象池中复用。

/**
 * 将FindState上保存的订阅方法保存到一个List集合,并将FindState回收,将FindState类放到对象池中复用
 *
 * @param findState 保存了订阅信息的中间类
 */
private List<SubscriberMethod> getMethodsAndRelease(FindState findState) {
    //将订阅的方法信息保存到一个List集合
    List<SubscriberMethod> subscriberMethods = new ArrayList<>(findState.subscriberMethods);
    //回收,就是重置字段
    findState.recycle();
    synchronized (FIND_STATE_POOL) {
        for (int i = 0; i < POOL_SIZE; i++) {
            //保存到对象池中复用
            if (FIND_STATE_POOL[i] == null) {
                FIND_STATE_POOL[i] = findState;
                break;
            }
        }
    }
    //返回这个List集合
    return subscriberMethods;
}

/**
 * 查找状态实体类
 */
static class FindState {
    //...忽略其他字段和方法

    /**
     * 回收操作,重置字段
     */
    void recycle() {
        subscriberMethods.clear();
        anyMethodByEventType.clear();
        subscriberClassByMethodKey.clear();
        methodKeyBuilder.setLength(0);
        subscriberClass = null;
        clazz = null;
        skipSuperClasses = false;
        subscriberInfo = null;
    }
}

调用findSubscriberMethods()方法获取到订阅者的订阅信息列表后,我们回到register()方法,将订阅信息列表遍历,调用subscribe()方法,传入订阅对象和订阅信息。

  • subscribe()方法的作用是:将订阅信息和订阅关系分拆到3个Map中,然后订阅者优先级排序,最后如果有粘性方法订阅,查找粘性事件的Map,回调订阅者方法。

先来了解下这3个Map是什么(很重要,后续事件回调都需要用到它们):

  • subscriptionsByEventType,事件类型和订阅它的订阅者的订阅信息(包含订阅者对象和订阅方法),一对多关系,一个事件类型,对应多个订阅者信息。

  • typesBySubscriber,订阅者和它所订阅的事件类型映射,一对多关系,一个订阅者对应多个事件

  • stickyEvents,粘性事件映射表,一对一关系,一个事件对应一个最近发送的事件对象

/**
 * 事件类型和订阅它的订阅者的订阅信息(包含订阅者对象和订阅方法),一对多关系,一个事件类型,对应多个订阅者信息
 */
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
/**
 * 订阅者和它所订阅的事件类型映射,一对多关系,一个订阅者对应多个事件
 */
private final Map<Object, List<Class<?>>> typesBySubscriber;
/**
 * 粘性事件映射表,一对一关系,一个事件对应一个最近发送的事件对象
 */
private final Map<Class<?>, Object> stickyEvents;

/**
 * 将订阅者和订阅方法执行绑定
 *
 * @param subscriber       订阅者
 * @param subscriberMethod 订阅方法信息对象
 */
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
    //获取事件类型
    Class<?> eventType = subscriberMethod.eventType;
    //新建Subscription订阅信息类
    Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
    //从subscriptionsByEventType中查找订阅信息列表
    CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
    //没有,则创建一个,并放到subscriptionsByEventType中
    if (subscriptions == null) {
        subscriptions = new CopyOnWriteArrayList<>();
        subscriptionsByEventType.put(eventType, subscriptions);
    } else {
        //已经调用了register方法订阅过了,不能重复订阅
        if (subscriptions.contains(newSubscription)) {
            throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                    + eventType);
        }
    }

    //优先级排序,优先级越高,越在List列表中靠前
    int size = subscriptions.size();
    for (int i = 0; i <= size; i++) {
        if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
            subscriptions.add(i, newSubscription);
            break;
        }
    }
    //从typesBySubscriber中用订阅者获取它订阅的事件类型列表
    List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
    //还没有注册过,所以时间类型列表subscribedEvents为空,为空则创建一个
    if (subscribedEvents == null) {
        subscribedEvents = new ArrayList<>();
        //创建完毕,再保存到Map
        typesBySubscriber.put(subscriber, subscribedEvents);
    }
    //将事件类型添加到事件类型列表中
    subscribedEvents.add(eventType);

    //判断订阅方法是否是粘性的
    if (subscriberMethod.sticky) {
        //判断是否需要发送子类事件时也发送父类事件,默认为true,如果事件POJO不会继承,建议设置为false来提高性能
        if (eventInheritance) {
            // Existing sticky events of all subclasses of eventType have to be considered.
            // Note: Iterating over all events may be inefficient with lots of sticky events,
            // thus data structure should be changed to allow a more efficient lookup
            // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
            Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
            for (Map.Entry<Class<?>, Object> entry : entries) {
                Class<?> candidateEventType = entry.getKey();
                if (eventType.isAssignableFrom(candidateEventType)) {
                    Object stickyEvent = entry.getValue();
                    checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                }
            }
        } else {
            //粘性事件意思是订阅时,如果之前有发送过粘性事件则马上回调订阅方法
            //从粘性事件列表以事件类型中获取粘性事件POJO实例(所有粘性事件都会保存最近一份到内存)
            Object stickyEvent = stickyEvents.get(eventType);
            checkPostStickyEventToSubscription(newSubscription, stickyEvent);
        }
    }
}

判断到存在粘性订阅方法时,调用checkPostStickyEventToSubscription()方法,检查粘性事件是否存在,存在则调用postToSubscription发送事件,回调订阅者的订阅方法。postToSubscription()方法我们在下面post()事件时分析。

/**
 * 检查粘性事件对象,以及将粘性事件发送到订阅者
 *
 * @param newSubscription 订阅信息
 * @param stickyEvent     粘性事件
 */
private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
    //没有发送过这个类型的粘性事件,那么就不做任何操作
    if (stickyEvent != null) {
        // If the subscriber is trying to abort the event, it will fail (event is not tracked in posting state)
        // --> Strange corner case, which we don't take care of here.
        //不为空,那么将这个粘性事件发送给订阅者
        postToSubscription(newSubscription, stickyEvent, isMainThread());
    }
}

register流程我们分析完了,工作原理主要是使用反射获取订阅者使用Subscribe注册标记的订阅方法,里面的耗时操作都加上了享元模式-对象池缓存或者是Map缓存,避免重复查找。

获取完订阅者的订阅信息列表后,将订阅信息和订阅关系分拆到很关键的3个Map中,最后判断是否有粘性事件订阅方法,再判断粘性事件是否存在,存在则发送事件,回调订阅者的订阅方法。

发送事件

发送事件使用post()方法,发送粘性事件使用,下面来一起分析一下吧。

/**
 * 发送事件
 */
public void post(Object event) {
    //获取当前线程的发送状态
    PostingThreadState postingState = currentPostingThreadState.get();
    //获取发送状态的队列
    List<Object> eventQueue = postingState.eventQueue;
    //事件入队
    eventQueue.add(event);
    //如果没有在发送,则马上发送
    if (!postingState.isPosting) {
        //对postingState做一些配置
        postingState.isMainThread = isMainThread();
        postingState.isPosting = true;
        if (postingState.canceled) {
            throw new EventBusException("Internal error. Abort state was not reset");
        }
        try {
            //队列不为空,则发送事件
            while (!eventQueue.isEmpty()) {
                postSingleEvent(eventQueue.remove(0), postingState);
            }
        } finally {
            postingState.isPosting = false;
            postingState.isMainThread = false;
        }
    }
}

第一步,通过currentPostingThreadState.get(),获取当前线程的发送状态。currentPostingThreadState是一个ThreadLocal对象,所以每个线程都有一个PostingThreadState对象。

  • 将事件放入postingState的eventQueue字段,eventQueue是事件队列。
  • 判断postingState.isPosting字段,如果没有在发送,则马上发送。
  • 通过一个while循环,调用postSingleEvent(),发送队列中的事件。
  • 发送接触后,finally块再将必要字段重置。
/**
 * ThreadLocal类型,保存当前线程的发送状态类,每个线程都有一份PostingThreadState
 */
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
    @Override
    protected PostingThreadState initialValue() {
        return new PostingThreadState();
    }
};

/**
 * 发送线程的状态
 */
final static class PostingThreadState {
    /**
     * 事件队列
     */
    final List<Object> eventQueue = new ArrayList<>();
    /**
     * 是否正在发送中
     */
    boolean isPosting;
    /**
     * 是否主线程
     */
    boolean isMainThread;
    Subscription subscription;
    Object event;
    boolean canceled;
}

postSingleEvent()方法,发送单个事件。

  • 判断eventInheritance标志位,代表是否发送子类事件时也发送父类事件,然后调用postSingleEventForEventType()方法发送事件。
  • postSingleEventForEventType()方法返回一个Boolean值,代表这个事件是否有订阅者订阅,如果没有,则发送一个NoSubscriberEvent事件。
/**
 * 发送一个事件
 *
 * @param event        事件对象
 * @param postingState 发送状态
 */
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
    Class<?> eventClass = event.getClass();
    boolean subscriptionFound = false;
    //判断是否发送子类事件时也发送父类事件
    if (eventInheritance) {
        List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
        int countTypes = eventTypes.size();
        for (int h = 0; h < countTypes; h++) {
            Class<?> clazz = eventTypes.get(h);
            //发送事件
            subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
        }
    } else {
        //发送事件
        subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
    }
    //处理没有订阅者的情况
    if (!subscriptionFound) {
        if (logNoSubscriberMessages) {
            logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
        }
        //没有订阅者,发送一个NoSubscriberEvent事件
        if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                eventClass != SubscriberExceptionEvent.class) {
            post(new NoSubscriberEvent(this, event));
        }
    }
}

分析postSingleEventForEventType()

  • subscriptionsByEventType:事件类型和订阅它的订阅者的订阅信息(包含订阅者对象和订阅方法),一对多关系,一个事件类型,对应多个订阅者信息

  • 从subscriptionsByEventType映射Map中,使用事件类型获取所有的订阅者信息,并赋值到subscriptions集合

  • 判断subscriptions集合是否为空,为空则代表没有订阅者,否则则有订阅者

  • 有订阅者,遍历subscriptions集合,调用postToSubscription()方法,为每个订阅者发送事件

/**
 * 按事件类型,发送单个事件
 *
 * @param event        事件对象
 * @param postingState 发送状态
 * @param eventClass   事件类型Class
 * @return 是否有订阅者订阅者这个事件
 */
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
    CopyOnWriteArrayList<Subscription> subscriptions;
    synchronized (this) {
        //回查subscriptionsByEventType,获取要发送的这个事件的所有订阅者信息
        subscriptions = subscriptionsByEventType.get(eventClass);
    }
    //有订阅者订阅,则处理
    if (subscriptions != null && !subscriptions.isEmpty()) {
        //遍历订阅者,将事件发送给他们
        for (Subscription subscription : subscriptions) {
            postingState.event = event;
            postingState.subscription = subscription;
            //是否发送失败
            boolean aborted = false;
            try {
                //发送事件
                postToSubscription(subscription, event, postingState.isMainThread);
                aborted = postingState.canceled;
            } finally {
                //重置状态
                postingState.event = null;
                postingState.subscription = null;
                postingState.canceled = false;
            }
            if (aborted) {
                break;
            }
        }
        return true;
    }
    //没有订阅者订阅
    return false;
}

看来postToSubscription是最终的发送方法,继续分析

  • 从subscription订阅信息中,查找出订阅方法回调模式,进行分类,将事件代理给不同的Poster。

  • invokeSubscriber()则是调用subscription订阅信息中的method对象的invoke()方法调用。

模式简介:

  • POSTING模式,直接在当前线程反射调用订阅者的订阅方法
  • MAIN模式,保证在主线程回调订阅者的订阅方法
  • MAIN_ORDERED模式,保证主线程中回调订阅者的订阅方法,但是每次都是用Handler去post一个消息
  • BACKGROUND模式,保证在子线程回调,如果当前已经在子线程,直接调用订阅者的订阅方法
  • ASYNC模式,无论在哪个线程都让asyncPoster执行任务,所以就算已经在线程中了,也新开一个线程执行
/**
 * 发送事件到订阅者
 *
 * @param subscription 订阅信息
 * @param event        事件类型
 * @param isMainThread 当时是否在主线程
 */
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
    //分类事件回调线程模式
    switch (subscription.subscriberMethod.threadMode) {
        //POSTING模式,直接在当前线程反射调用订阅者的订阅方法
        case POSTING:
            invokeSubscriber(subscription, event);
            break;
        case MAIN:
            //MAIN模式,保证在主线程回调订阅者的订阅方法
            //判断当前是否为主线程,直接调用即可
            if (isMainThread) {
                invokeSubscriber(subscription, event);
            } else {
                //当前不在主线程,将任务交给mainThreadPoster主线程发送器使用Handler发送
                mainThreadPoster.enqueue(subscription, event);
            }
            break;
        case MAIN_ORDERED:
            //MAIN_ORDERED模式,保证主线程中回调订阅者的订阅方法,但是每次都是用Handler去post一个消息
            //所以即使当前已经是主线程了,也依然post一个消息给Handler排队执行
            if (mainThreadPoster != null) {
                mainThreadPoster.enqueue(subscription, event);
            } else {
                //不在安卓上使用EventBus,直接调用订阅者订阅方法
                // temporary: technically not correct as poster not decoupled from subscriber
                invokeSubscriber(subscription, event);
            }
            break;
        case BACKGROUND:
            //BACKGROUND模式,保证在子线程回调,如果当前已经在子线程,直接调用订阅者的订阅方法
            if (isMainThread) {
                //不在子线程,将任务交给backgroundPoster
                backgroundPoster.enqueue(subscription, event);
            } else {
                //已经在子线程了,直接调用
                invokeSubscriber(subscription, event);
            }
            break;
        case ASYNC:
            //ASYNC模式,无论在哪个线程都让asyncPoster执行任务,所以就算已经在线程中了,也新开一个线程执行
            asyncPoster.enqueue(subscription, event);
            break;
        default:
            throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
    }
}

/**
 * 发射调用订阅者的订阅方法
 *
 * @param subscription 订阅信息
 * @param event        事件对象
 */
void invokeSubscriber(Subscription subscription, Object event) {
    try {
        //拿到订阅信息的method对象,invoke()反射调用
        subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
    } catch (InvocationTargetException e) {
        handleSubscriberException(subscription, event, e.getCause());
    } catch (IllegalAccessException e) {
        throw new IllegalStateException("Unexpected exception", e);
    }
}

接下来,我们分析每种Poster:

  • Poster接口,多种Poster,其实是采用了策略模式,抽取了接口Poster,不同策略的实现自己的enqueue()方法即可。
/**
 * 事件发送器
 */
interface Poster {
    /**
     * 入队一个事件发送任务
     *
     * @param subscription 订阅信息
     * @param event        事件对象
     */
    void enqueue(Subscription subscription, Object event);
}
  • PendingPost消息,相当于Handler的Messsage对象,PendingPost类则是将每次要发送的事件和下一个事件用next字段保存,内部有一个static的pendingPostPool,其实是PendingPost的对象池,obtainPendingPost()从对象池中获取一个消息,releasePendingPost()则是将消息放回对象池中。
/**
 * 封装等待发送事件Api的类,相当于一个消息,类似Handler的Message对象,内部有对象池和事件
 */
final class PendingPost {
    /**
     * 队列
     */
    private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();
    /**
     * 事件
     */
    Object event;
    /**
     * 订阅者信息
     */
    Subscription subscription;
    /**
     * 下一个要发送的信息
     */
    PendingPost next;

    private PendingPost(Object event, Subscription subscription) {
        this.event = event;
        this.subscription = subscription;
    }

    /**
     * 从池子中获取一个消息
     *
     * @param subscription 订阅信息
     * @param event        事件
     */
    static PendingPost obtainPendingPost(Subscription subscription, Object event) {
        synchronized (pendingPostPool) {
            int size = pendingPostPool.size();
            if (size > 0) {
                PendingPost pendingPost = pendingPostPool.remove(size - 1);
                //对字段赋值
                pendingPost.event = event;
                pendingPost.subscription = subscription;
                pendingPost.next = null;
                return pendingPost;
            }
        }
        return new PendingPost(event, subscription);
    }

    /**
     * 回收
     *
     * @param pendingPost 包裹了订阅者信息的消息
     */
    static void releasePendingPost(PendingPost pendingPost) {
        //重置字段
        pendingPost.event = null;
        pendingPost.subscription = null;
        pendingPost.next = null;
        //将对象放回对象池
        synchronized (pendingPostPool) {
            //这里对池子大小做限制,不然会不断让池容量增长
            if (pendingPostPool.size() < 10000) {
                pendingPostPool.add(pendingPost);
            }
        }
    }
}
  • PendingPostQueue,Poster发送事件使用的队列,维护了2个消息,队头消息和队尾消息,提供enqueue()方法消息入队,poll()方法获取下一个消息,这2个方法中,维护消息之间的链表关系。
/**
 * 发送事件队列,维护一个消息对象链表
 */
final class PendingPostQueue {
    /**
     * 队头消息
     */
    private PendingPost head;
    /**
     * 队尾
     */
    private PendingPost tail;

    /**
     * 入队
     *
     * @param pendingPost 下一个消息
     */
    synchronized void enqueue(PendingPost pendingPost) {
        if (pendingPost == null) {
            throw new NullPointerException("null cannot be enqueued");
        }
        //将事件绑定在,当前最后一个事件对象的next
        if (tail != null) {
            tail.next = pendingPost;
            tail = pendingPost;
        } else if (head == null) {
            //第一次,队头为空,赋值
            head = tail = pendingPost;
        } else {
            throw new IllegalStateException("Head present, but no tail");
        }
        notifyAll();
    }

    /**
     * 获取下一个消息
     */
    synchronized PendingPost poll() {
        PendingPost pendingPost = head;
        if (head != null) {
            head = head.next;
            if (head == null) {
                tail = null;
            }
        }
        return pendingPost;
    }

    synchronized PendingPost poll(int maxMillisToWait) throws InterruptedException {
        if (head == null) {
            wait(maxMillisToWait);
        }
        return poll();
    }
}
  1. Android主线程Poster:HandlerPoster
  2. 子线程串行Poster:BackgroundPoster
  3. 子线程并发Poster:AsyncPoster
  • Android主线程Poster:HandlerPoster,吱声继承Handler,实现Poster接口,enqueue()方法首先调用obtainPendingPost()获取一个消息,然后入队队列,判断如果没有执行则开始,然后就是用Handler的sendMessage()发消息,Handler发送消息,会主线程回调handleMessage()方法,用while循环,不断从队列中拿取消息,然后使用消息内的订阅信息,invokeSubscriber()调用订阅者的订阅方法,这样就实现了主线程回调。
/**
 * Android主线程事件发送器,继承Handler,实现Poster接口
 */
public class HandlerPoster extends Handler implements Poster {
    /**
     * 发送队列
     */
    private final PendingPostQueue queue;
    private final int maxMillisInsideHandleMessage;
    private final EventBus eventBus;
    /**
     * 是否正在运行,没有限制
     */
    private boolean handlerActive;

    protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
        super(looper);
        this.eventBus = eventBus;
        this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
        queue = new PendingPostQueue();
    }

    /**
     * 入队一个事件
     *
     * @param subscription 订阅信息
     * @param event        事件对象
     */
    @Override
    public void enqueue(Subscription subscription, Object event) {
        //获取一个等待发送的消息
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            //消息入队
            queue.enqueue(pendingPost);
            //没有正在执行,那么执行
            if (!handlerActive) {
                handlerActive = true;
                //使用Handler发消息
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }

    @Override
    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            long started = SystemClock.uptimeMillis();
            //死循环,从队列中获取下一个消息
            while (true) {
                PendingPost pendingPost = queue.poll();
                if (pendingPost == null) {
                    synchronized (this) {
                        //为空再检查一遍,因为在并发情况,使用synchronized同步
                        pendingPost = queue.poll();
                        //真的是获取不到了,那么就是没有
                        if (pendingPost == null) {
                            handlerActive = false;
                            return;
                        }
                    }
                }
                //反射调用订阅者的订阅方法,这里在Handler中回调,所以在主线程
                eventBus.invokeSubscriber(pendingPost);
                long timeInMethod = SystemClock.uptimeMillis() - started;
                //继续循环发送消息
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                    rescheduled = true;
                    return;
                }
            }
        } finally {
            handlerActive = rescheduled;
        }
    }
}
  • 子线程串行Poster:BackgroundPoster,实现了Runnable和Poster接口,enqueue()方法中,也是先obtainPendingPost()获取一个消息,然后消息入队,判断没有执行,则马上执行。通过eventBus.getExecutorService()获取配置的线程池执行器,调用执行器的execute(),将自己传入,能传入是因为BackgroundPoster实现了Runnable接口,那么自然run()肯定被复写了。run()方法,也是启动一个while循环,从队列中获取下一个消息,由于run()方法回调已经在子线程中执行了,所以invokeSubscriber()调用回订阅者的订阅方法,自然在子线程执行。BackgroundPoster是单线程执行的,它是怎么做到的呢?其实是因为executorRunning这个标志位,任务开始前置为true,任务结束才置为false,在enqueue()入队方法时就返回executorRunning是否为false,为false则只添加到队列中,这样自然成为了单线程执行了。其实还使用了synchronized来保证不同线程去enqueue()入队时,在子线程中线程不安全的问题。
/**
 * 子线程串行回调事件订阅的发送器,实现了Runnable和Poster接口
 */
final class BackgroundPoster implements Runnable, Poster {
    /**
     * 发送队列
     */
    private final PendingPostQueue queue;
    private final EventBus eventBus;

    /**
     * 这个变量为了让线程池只能单线程执行
     */
    private volatile boolean executorRunning;

    BackgroundPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }

    @Override
    public void enqueue(Subscription subscription, Object event) {
        //获取一个消息,并将任务重新初始化
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        //加synchronized保证单线程执行
        synchronized (this) {
            //任务入队
            queue.enqueue(pendingPost);
            //如果没有执行,马上执行
            if (!executorRunning) {
                executorRunning = true;
                //获取配置的线程池执行器进行执行,将任务包裹到自身去执行
                eventBus.getExecutorService().execute(this);
            }
        }
    }

    @Override
    public void run() {
        try {
            try {
                //一直死循环执行
                while (true) {
                    //获取下一个消息,并设定1秒阻塞
                    PendingPost pendingPost = queue.poll(1000);
                    if (pendingPost == null) {
                        //加synchronized保证单线程执行,这里也加是因为要保证executorRunning的值不出错
                        synchronized (this) {
                            //同样要双重检查
                            pendingPost = queue.poll();
                            if (pendingPost == null) {
                                //没有事件了,跳出死循环
                                executorRunning = false;
                                return;
                            }
                        }
                    }
                    //调用订阅者
                    eventBus.invokeSubscriber(pendingPost);
                }
            } catch (InterruptedException e) {
                eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
            }
        } finally {
            //所有发送任务执行完毕,标志位置为false,下次再入队再继续执行
            executorRunning = false;
        }
    }
}
  • 子线程并发Poster:AsyncPoster,实现了Runnable,、Poster接口,和BackgroundPoster类似,但是不是串行而是并行,enqueue()方法获取一个消息,将消息入队,然后eventBus.getExecutorService()获取到线程池执行器后,马上execute()传入自身,那么也一样,run()方法肯定被重写,run()方法中,获取下一个消息,马上调用invokeSubscriber()回调订阅者的订阅方法,因为run()回调是在子线程,所以回调订阅者时为子线程调用。
/**
 * 子线程并发执行器,实现了Runnable,、Poster接口
 */
class AsyncPoster implements Runnable, Poster {
    /**
     * 消息队列
     */
    private final PendingPostQueue queue;
    private final EventBus eventBus;

    AsyncPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }

    @Override
    public void enqueue(Subscription subscription, Object event) {
        //入队,获取一个缓存的PendingPost消息对象,重新初始化
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        //将消息入队
        queue.enqueue(pendingPost);
        //获取执行器执行
        eventBus.getExecutorService().execute(this);
    }

    @Override
    public void run() {
        //获取一个PendingPost消息
        PendingPost pendingPost = queue.poll();
        if (pendingPost == null) {
            throw new IllegalStateException("No pending post available");
        }
        //反射调用订阅者的订阅方法
        eventBus.invokeSubscriber(pendingPost);
    }
}

订阅者解注册

  • typesBySubscriber:订阅者和它所订阅的事件类型映射,一对多关系,一个订阅者对应多个事件

解注册,使用typesBySubscriber获取订阅者订阅的所有事件类型,判断订阅列表是否为空,不为空,则for循环订阅的事件列表,调用unsubscribeByEventType(),再将订阅者从typesBySubscriber中移除。

/**
 * 注销事件注册
 *
 * @param subscriber 订阅者
 */
public synchronized void unregister(Object subscriber) {
    //获取订阅者订阅的所有事件类型
    List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
    //没有订阅过,忽略
    if (subscribedTypes != null) {
        //遍历订阅的事件类型,取消注册
        for (Class<?> eventType : subscribedTypes) {
            unsubscribeByEventType(subscriber, eventType);
        }
        //从订阅者列表中移除
        typesBySubscriber.remove(subscriber);
    } else {
        logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
    }
}
  • 获取事件类型的所有订阅者列表
  • 有订阅者,则遍历订阅者列表,找出需要解注册的订阅者,将订阅信息active标志设置为false,表示取消,再将订阅者从订阅列表中移除。
/**
 * 使用事件类型,注销订阅
 *
 * @param subscriber 订阅者
 * @param eventType  事件类型
 */
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
    //获取事件类型的所有订阅者的订阅定系
    List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
    //没有订阅者忽略
    if (subscriptions != null) {
        //遍历订阅者的订阅信息列表
        int size = subscriptions.size();
        for (int i = 0; i < size; i++) {
            //找到本次要取消订阅的订阅者信息
            Subscription subscription = subscriptions.get(i);
            if (subscription.subscriber == subscriber) {
                //设置为取消订阅
                subscription.active = false;
                //从列表中移除
                subscriptions.remove(i);
                i--;
                size--;
            }
        }
    }
}

发送粘性事件

发送粘性事件,使用的是postSticky()方法,先将事件放到stickyEvents这个Map中,保存事件类型和最近发送的事件对象。然后调用post()发送事件,流程和上面的Post发送事件流程一样。

  • stickyEvents:粘性事件映射表,一对一关系,一个事件对应一个最近发送的事件对象
/**
 * 发送粘性事件
 *
 * @param event 事件
 */
public void postSticky(Object event) {
    //保存事件到粘性事件映射表
    synchronized (stickyEvents) {
        stickyEvents.put(event.getClass(), event);
    }
    //马上发送事件,和普通事件一样,粘性事件的特点是register()订阅时,马上检查是否有存在的粘性事件,有则马上回调
    post(event);
}

总结

EventBus中可见到运用了好几种设计模式:

  • 单例模式:Double Check保存EventBus实例
  • 建造者模式:EventBusBuilder自定义EventBus配置
  • 享元模式:对象池复用FindState对象,减少频繁创建
  • 策略模式:不同的线程有不同的Poster对象发送事件

熟悉设计模式,更容易理解开源框架的逻辑,可以说设计模式是框架中常用的套路了~

EventBus中的subscriptionsByEventType、typesBySubscriber,这2个Map,设计得很巧妙,在注册、反注册、发送事件等3个重要流程中都起到了很重要的作用,subscriptionsByEventType记录了订阅者和订阅方法信息保存,在Post发送时,使用subscriptionsByEventType反查出订阅对应的订阅方法,利用不同的Poster在不同线程中调用。typesBySubscriber则是保存订阅者和订阅的事件类型的关系,在注册和反注册中起到注册表的作用。

相关文章

网友评论

    本文标题:EventBus3源码分析

    本文链接:https://www.haomeiwen.com/subject/vkmvyktx.html