美文网首页Android开发Android开发经验谈Android面试相关
Event 3.0+ 源码详解(史上最详细图文讲解)

Event 3.0+ 源码详解(史上最详细图文讲解)

作者: 随时学丫 | 来源:发表于2018-07-17 20:57 被阅读8次

本文的整体结构图

本文篇幅很长,建议收藏了找时间慢慢看

整体结构图.png

本文讲解的是 'org.greenrobot:eventbus:3.1.1' 最新版,和其他版本有细微差别,思想都是一样的。

EventBus 的简单示例

@Override
protected void onStart() {
    super.onStart();
    EventBus.getDefault().register(this);
}

@Override
protected void onStop() {
    super.onStop();
    EventBus.getDefault().unregister(this);
}


@Subscribe(threadMode = ThreadMode.MAIN, sticky = false, priority = 2)
public void onMessageEvent(MyBusEvent event) {
    Toast.makeText(this, event.message, Toast.LENGTH_SHORT).show();
}

EventBus 的创建是一个单例模式,我们就从 getDefault() 开始讲起吧。

一、EventBus 的创建(单例 + Builder 设计模式)

1.1、单例模式获取 EventBus 实例

单例模式UML.png

EventBus 的创建是一个双重校验锁的单例模式。

public static EventBus getDefault() {
    if (sDefaultBus == null) {
        synchronized (EventBus.class) {
            if (sDefaultBus == null) {
                sDefaultBus = new EventBus();
            }
        }
    }
    return sDefaultBus;
}

单例模式没什么好说的,我们都知道单例模式的构造函数是私有类型 private,但是 EventBus 的构造函数却是 public 类型。

这样设计的目:EventBus 在代码使用过程中不仅仅只有一条总线,还有其他的订阅总线,订阅者可以注册到不同的 EventBus 总线,然后通过不同的 EventBus 总线发送数据。

不同的 EventBus 发送的数据是相互隔离的,订阅者只能收到注册了该 EventBus 总线内事件,而不会收到别的 EventBus 事件总线的数据。这样的设计为后面的不同环境的线程切换创造了好的条件。

    /**
     * Creates a new EventBus instance; each instance is a separate scope in which events are delivered. To use a central bus, consider {@link #getDefault()}.
     创建一个新的 EventBus 实例,每个实例在 events 事件被发送的时候都是一个单独的领域,为了使用一个 事件总线,考虑用 getDefault() 构建。
     */
private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();
public EventBus() {
    this(DEFAULT_BUILDER);
}

1.2、Builder 模式构建 EventBus

Builder模式UML图.png

这里的代码我们重点看一下 this(DEFAULT_BUILDER) 里面的 DEFAULT_BUILDER。DEFAULT_BUILDER 是一个 EventBusBuilder,从这里可以看出 EventBus 是通过 建造者模式进行构建的,接下来我们看下是如何构建的吧。

EventBus(EventBusBuilder builder) {
    logger = builder.getLogger();
    
    //Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType
    subscriptionsByEventType = new HashMap<>();
    //Map<Object, List<Class<?>>> typesBySubscriber
    typesBySubscriber = new HashMap<>();
    //Map<Class<?>, Object> stickyEvents
    stickyEvents = new ConcurrentHashMap<>();
    
    /**
    用于线程间调度
    **/
    mainThreadSupport = builder.getMainThreadSupport();
    mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
    backgroundPoster = new BackgroundPoster(this);
    asyncPoster = new AsyncPoster(this);
    //用于记录event生成索引
    indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
    //对已经注解过的Method的查找器,会对所设定过 @Subscriber 注解的的方法查找相应的Event
    subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
            builder.strictMethodVerification, builder.ignoreGeneratedIndex);
    
    //当调用事件处理函数发生异常是否需要打印Log
    logSubscriberExceptions = builder.logSubscriberExceptions;
    //当没有订阅者订阅这个消息的时候是否打印Log
    logNoSubscriberMessages = builder.logNoSubscriberMessages;
    //当调用事件处理函数,如果异常,是否需要发送Subscriber这个事件
    sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
    //当没有事件处理函数时,对事件处理是否需要发送sendNoSubscriberEvent这个标志
    sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
    //是否需要抛出SubscriberException
    throwSubscriberException = builder.throwSubscriberException;
    
    //与Event有继承关系的类是否都需要发送
    eventInheritance = builder.eventInheritance;
    //线程池 Executors.newCachedThreadPool()
    executorService = builder.executorService;
}

二、EventBus 中几个重要的成员变量

2.1、EventBus 中重要的 3 个 HashMap。

2.1.1、subscriptionsByEventType

Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType 是以 Event 为 Key,Subscriber 为 Value,当发送这个 Event 时,可以在 Map 中找到相应的 Subscriber。

2.1.2、typesBySubscriber

Map<Object, List<Class<?>>> typesBySubscriber 以 Subscriber 为 Key,以 Event 为 Value,当我们注册和反注册时,都会操作 typesBySubscriber 这个 Map。

2.1.3、stickyEvents

Map<Class<?>, Object> stickyEvents 是管理 EventBus 的粘性事件,粘性事件的 Event 发送出去之后,即使负责接收的 Subscriber 没有注册,当注册之后,依然能收到该事件,和广播接受者的粘性事件类似。

2.2、EventBus 中重要的有 3 个 Post。

  • mainThreadPoster:主线程的 Poster

    mainThreadSupport.createPoster(this) 返回的是HandlerPoster(eventBus, looper, 10),looper 是一个 MainThread 的 Looper。意味着这个 HandlerPoster 可能就是 Handler 的实现。

  • backgroundPoster:后台线程的 Poster

  • asyncPoster:异步线程的 Poster

2.2.1、mainThreadPoster # HandlerPoster

HandlerPoster 其实就是 Handler 的实现,内部维护了一个 PendingPostQueue 的消息队列,在 enqueue(Subscription subscription, Object event) 方法中不断从 pendingPostPool 的 ArrayList 缓存池中获取 PendingPost 添加到 PendingPostQueue 队列中,并将该 PendingPost 事件发送到 Handler 中处理。

在 handleMessage 中,通过一个 while 死循环,不断从 PendingPostQueue 中取出 PendingPost 出来执行,获取到 post 之后由 eventBus 通过该 post 查找相应的 Subscriber 处理事件。

while 退出的条件有两个

  1. 获取到的 PendingPost 为 null,即是 PendingPostQueue 已经没有消息可处理。
  2. 每个 PendingPost 在 Handler 中执行的时间超过了最大的执行时间。

HandlerPoster UML 类图

HandlerPoster UML 类图

HandlerPoster 执行过程

HandlerPoster执行过程.png
public class HandlerPoster extends Handler implements Poster {

    private final PendingPostQueue queue;//存放待执行的 Post Events 的事件队列
    private final int maxMillisInsideHandleMessage;//Post 事件在 handleMessage 中执行的最大的时间值,超过这个时间值则会抛异常
    private final EventBus eventBus;
    private boolean handlerActive;//标识 Handler 是否被运行起来

   protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
        super(looper);
        this.eventBus = eventBus;
        this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        //从 pendingPostPool 的 ArrayList 缓存池中获取 PendingPost 添加到 PendingPostQueue 队列中,并将该 PendingPost 事件发送到 Handler 中处理。
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);//添加到队列中
            if (!handlerActive) {//标记 Handler 为活跃状态
                handlerActive = true;
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }

    @Override
    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            long started = SystemClock.uptimeMillis();
            while (true) {//死循环,不断从 PendingPost 队列中取出 post 事件执行
                PendingPost pendingPost = queue.poll();
                if (pendingPost == null) {//如果为 null,表示队列中没有 post 事件,此时标记 Handelr 关闭,并退出 while 循环
                    synchronized (this) {
                        // Check again, this time in synchronized
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            handlerActive = false; //标记 Handler 为非活跃状态
                            return;
                        }
                    }
                }
                //获取到 post 之后由 eventBus 通过该 post 查找相应的 Subscriber 处理事件
                eventBus.invokeSubscriber(pendingPost);
                //计算每个事件在 handleMessage 中执行的时间
                long timeInMethod = SystemClock.uptimeMillis() - started;
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                    rescheduled = true;
                    return;
                }
            }
        } finally {
            handlerActive = rescheduled;
        }
    }
}

final class PendingPost {
    //通过ArrayList来实现PendingPost的添加和删除
    private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();

    Object event;
    Subscription subscription;
    PendingPost next;

    private PendingPost(Object event, Subscription subscription) {
        this.event = event;
        this.subscription = subscription;
    }

    //获取 PendingPost
    static PendingPost obtainPendingPost(Subscription subscription, Object event) {
        synchronized (pendingPostPool) {
            int size = pendingPostPool.size();
            if (size > 0) {
                PendingPost pendingPost = pendingPostPool.remove(size - 1);
                pendingPost.event = event;
                pendingPost.subscription = subscription;
                pendingPost.next = null;
                return pendingPost;
            }
        }
        return new PendingPost(event, subscription);
    }

    //释放 PendingPost
    static void releasePendingPost(PendingPost pendingPost) {
        pendingPost.event = null;
        pendingPost.subscription = null;
        pendingPost.next = null;
        synchronized (pendingPostPool) {
            // Don't let the pool grow indefinitely
            if (pendingPostPool.size() < 10000) {
                pendingPostPool.add(pendingPost);
            }
        }
    }

}

2.2.2、backgroundPoster # BackGroundPoster

BackGroundPoster执行过程.png

BackgroundPoster 实现了 Runnable 和 Poster,enqueue() 和 HandlerPoster 中实现一样,在上文中已经讲过,这里不再赘述。

我们来看下 run() 方法中的实现,不断从 PendingPostQueue 中取出 pendingPost 到 EventBus 中分发,这里注意外部是 while() 死循环,意味着 PendingPostQueue 中所有的 pendingPost 都将分发出去。而 AsyncPoster 只是取出一个。

final class BackgroundPoster implements Runnable, Poster {

    private final PendingPostQueue queue;
    private final EventBus eventBus;

    private volatile boolean executorRunning;

    BackgroundPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);//添加到队列中
            if (!executorRunning) {
                executorRunning = true;
                //在线程池中执行这个 pendingPost
                eventBus.getExecutorService().execute(this);//Executors.newCachedThreadPool()
            }
        }
    }

    @Override
    public void run() {
        try {
            try {
                //不断循环从 PendingPostQueue 取出 pendingPost 到 eventBus 执行
                while (true) {
                    //在 1000 毫秒内从 PendingPostQueue 中获取 pendingPost
                    PendingPost pendingPost = queue.poll(1000);
                    //双重校验锁判断 pendingPost 是否为 null
                    if (pendingPost == null) {
                        synchronized (this) {
                            // Check again, this time in synchronized
                            pendingPost = queue.poll();//再次尝试获取 pendingPost
                            if (pendingPost == null) {
                                executorRunning = false;
                                return;
                            }
                        }
                    }
                    //将 pendingPost 通过 EventBus 分发出去
                   //这里会将PendingPostQueue中【所有】的pendingPost都会分发,这里区别于AsyncPoster
                    eventBus.invokeSubscriber(pendingPost);
                }
            } catch (InterruptedException e) {
                eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
            }
        } finally {
            executorRunning = false;
        }
    }
}

2.2.3、asyncPoster # AsyncPoster

AsyncPoster执行过程.png
class AsyncPoster implements Runnable, Poster {

    private final PendingPostQueue queue;
    private final EventBus eventBus;

    AsyncPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        queue.enqueue(pendingPost);
        eventBus.getExecutorService().execute(this);//Executors.newCachedThreadPool()
    }

    @Override
    public void run() {
        PendingPost pendingPost = queue.poll();
        if(pendingPost == null) {
            throw new IllegalStateException("No pending post available");
        }
        eventBus.invokeSubscriber(pendingPost);
    }
}

三、@Subscribe 的注解

当我们指定订阅方法的时候,会在方法上加上注解,如下,下面我们看看这个注解的具体含义

@Subscribe(threadMode = ThreadMode.MAIN, sticky = false, priority = 2)
public void onMessageEvent(EventTest event) {
    
}

@Documented //命名为 java doc 文档
@Retention(RetentionPolicy.RUNTIME) //指定在运行时有效,即在运行时能保持这个 Subscribe
@Target({ElementType.METHOD}) //指定类型为 METHOD,表名用来描述方法
public @interface Subscribe {
    //指定线程模式,可以指定在 Subscribe 中接收的 Event 所处的线程
    ThreadMode threadMode() default ThreadMode.POSTING;
    boolean sticky() default false;
    int priority() default 0;
}

3.1、ThreadMode

public enum ThreadMode {
    POSTING,//EventBus 默认的线程模式
    MAIN,//主线程
    MAIN_ORDERED,//主线程
    BACKGROUND,//后台线程
    ASYNC//异步线程
}

ThreadMode 是 enum(枚举)类型,threadMode 默认值是 POSTING。3.1.1 版本的 EventBus 新增了一种类型,共 5 种,以前的版本是 4 种。

  • POSTING:事件发布在什么线程,就在什么线程订阅。故它不需要切换线程来分发事件,因此开销最小。它要求 task 完成的要快,不能请求 MainThread,适用简单的 task。
  • MAIN:无论事件在什么线程发布,都在主线程订阅。事件将排队等待交付(非阻塞)。使用此模式的订阅者必须快速返回,以避免阻塞主线程。
  • MAIN_ORDERED:无论事件在什么线程发布,都在主线程订阅。区别于 MAIN,MAIN_ORDERED 事件将始终排队等待交付。这将确保 post 调用是非阻塞的。
  • BACKGROUND:如果发布的线程不是主线程,则在该线程订阅,如果是主线程,则使用一个单独的后台线程订阅。
  • ASYNC:在非主线程和发布线程中订阅。当处理事件的 Method 是耗时的,需要使用此模式。尽量避免同时触发大量的耗时较长的异步操作,EventBus 使用线程池高效的复用已经完成异步操作的线程。

3.2、sticky 粘性事件

粘性事件是事件消费者在事件发布之后才注册,依然能接收到该事件的特殊类型。

StickyEvent 与普通 Event的 普通就在于,EventBus 会自动维护被作为 StickyEvent 被 post 出来(即在发布事件时使用 EventBus.getDefault().postSticky(new MyEvent()) 方法)的事件的最后一个副本在缓存中。 任何时候在任何一个订阅了该事件的订阅者中的任何地方,都可以通 EventBus.getDefault().getStickyEvent(MyEvent.class)来取得该类型事件的最后一次缓存。

sticky(粘性)默认值是 false,如果是 true,那么可以通过 EventBus 的 postSticky 方法分发最近的粘性事件给该订阅者(前提是该事件可获得)。

3.3、Priority

priority 是 Method 的优先级,优先级高的可以先获得分发事件的权利。这个不会影响不同的 ThreadMode 的分发事件顺序。

四、register 注册

register注册.png
  1. 通过反射获取到订阅者的 Class 对象。
  2. 通过 Class 对象找到对应的订阅者方法集合。
  3. 遍历订阅者方法集合,将订阅者和订阅者方法订阅起来。

register() 接收的参数为 Object 类型的订阅者,通常也就是代码中 Activity 和 Fragment 的实例 this。EventBus 通过 getDefault() 来获取实例,当我们每新建一个 EventBus 总线,它的发布和订阅事件都是相互隔离的,EventBus 如何做到发布和订阅相互隔离呢?我们看下 register 的实现。

public void register(Object subscriber) {
    //1. 通过反射获取到订阅者的Class对象
    Class<?> subscriberClass = subscriber.getClass();
    //2. 通过Class对象找到对应的订阅者方法集合
    List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
    //3. 遍历订阅者方法集合,将订阅者和订阅者方法订阅起来。
    synchronized (this) {
        for (SubscriberMethod subscriberMethod : subscriberMethods) {
            subscribe(subscriber, subscriberMethod);
        }
    }
}

在 1 中没什么可细讲的,接下来看下第 2 步中的 SubscriberMethodFinder 这个类的 findSubscriberMethods() 方法。

subscriberMethodFinder 是 EventBus 的一个成员,可以看作是一个订阅方法查找器,是在EventBus 构造方法通过 EventBusBuilder 的一些参数构造出来的。

调用 findSubscriberMethods 方法,传入订阅者的 Class 对象,字面意思是找出订阅者中所有的订阅方法,用一个 List 集合来接收。

4.1、SubscriberMethodFinder # findSubscriberMethods() 详解

findSubscriberMethods()详解.png
//订阅者的 Class 对象为 key,订阅者中的订阅方法 List 为 value
private static final Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
    //1. 首先在 METHOD_CACHE 中查找该 Event 对应的订阅者集合是否已经存在,如果有直接返回
    List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
    if (subscriberMethods != null) {
        return subscriberMethods;
    }

    //2. 根据订阅者类 subscriberClass 查找相应的订阅方法
    if (ignoreGeneratedIndex) {//是否忽略生成 index
        //通过反射获取
        subscriberMethods = findUsingReflection(subscriberClass);
    } else {
        //通过 SubscriberIndex 方式获取
        subscriberMethods = findUsingInfo(subscriberClass);
    }
    
    //若订阅者中没有订阅方法,则抛异常
    if (subscriberMethods.isEmpty()) {
        throw new EventBusException("Subscriber " + subscriberClass
                + " and its super classes have no public methods with the @Subscribe annotation");
    } else {
        // 缓存订阅者的订阅方法 List
        METHOD_CACHE.put(subscriberClass, subscriberMethods);
        return subscriberMethods;
    }
}
//----------------------------SubscriberMethod----------------------------
//封装了EventBus中的参数,就是一个EventBus订阅方法包装类
public class SubscriberMethod {
    final Method method;
    final ThreadMode threadMode;
    final Class<?> eventType;
    final int priority;
    final boolean sticky;
    String methodString;
    //.....涉及到的方法后文再讲
}

METHOD_CACHE 是一个 ConcurrentHashMap,以订阅者的 Class 对象为 key,订阅者中的订阅方法 List 为 value,缓存了注册过的订阅方法。

如果有缓存则返回返回缓存,如果没有则继续往下执行。这里看到 ignoreGeneratedIndex 这个属性,意思为是否忽略生成 index,是在构造 SubscriberMethodFinder 通过 EventBusBuilder 的同名属性赋值的,默认为 false,当为 true 时,表示以反射的方式获取订阅者中的订阅方法,当为 false 时,则以 Subscriber Index 的方式获取。接下来分别分析这两种方式。

4.1.1、findUsingReflection() 方法解析

当 ignoreGeneratedIndex 为 true 时 --> findUsingReflection()

findUsingReflection()解析.png
private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
    // 创建并初始化 FindState 对象
    FindState findState = prepareFindState();
    // findState 与 subscriberClass 关联
    findState.initForSubscriber(subscriberClass);
    while (findState.clazz != null) {
        // 使用反射的方式获取单个类的订阅方法
        findUsingReflectionInSingleClass(findState);
        // 使 findState.clazz 指向父类的 Class,继续获取
        findState.moveToSuperclass();
    }
    // 返回订阅者及其父类的订阅方法 List,并释放资源
    return getMethodsAndRelease(findState);
}

4.1.2、findUsingInfo() 方法解析

当 ignoreGeneratedIndex 为 false 时 --> findUsingInfo()

findUsingInfo()解析.png

跟反射方式的 findUsingReflection 的首尾有点类似,不同的是它是通过 SubscriberInfo 这个类来获取订阅方法的,那么 SubscriberInfo 对象是怎么获取的呢,那么同样只看关键代码:getSubscriberInfo()

private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
    //1.通过 prepareFindState 获取到 FindState(保存找到的注解过的方法的状态)
    FindState findState = prepareFindState();
    //2.findState 与 subscriberClass 关联
    findState.initForSubscriber(subscriberClass);
    while (findState.clazz != null) {
        //获取订阅者信息
        //通过 SubscriberIndex 获取 findState.clazz 对应的 SubscriberInfo
        findState.subscriberInfo = getSubscriberInfo(findState);
        if (findState.subscriberInfo != null) {
            SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
            for (SubscriberMethod subscriberMethod : array) {
                if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
                    // 逐个添加进 findState.subscriberMethods
                    findState.subscriberMethods.add(subscriberMethod);
                }
            }
        } else {
            // 使用反射的方式获取单个类的订阅方法
            findUsingReflectionInSingleClass(findState);
        }
        findState.moveToSuperclass();
    }
    return getMethodsAndRelease(findState);
}

FindState 封装了所有的订阅者和订阅方法的集合。

static class FindState {
    //保存所有订阅方法
    final List<SubscriberMethod> subscriberMethods = new ArrayList<>();
    //事件类型为Key,订阅方法为Value
    final Map<Class, Object> anyMethodByEventType = new HashMap<>();
    //订阅方法为Key,订阅者的Class对象为Value
    final Map<String, Class> subscriberClassByMethodKey = new HashMap<>();
    final StringBuilder methodKeyBuilder = new StringBuilder(128);

    Class<?> subscriberClass;
    Class<?> clazz;
    boolean skipSuperClasses;
    SubscriberInfo subscriberInfo;
    //......
}

通过 prepareFindState 获取到 FindState 对象,根据 FindState 对象可以进行下一步判断,

private static final FindState[] FIND_STATE_POOL = new FindState[POOL_SIZE];
private FindState prepareFindState() {
    //找到 FIND_STATE_POOL 对象池
    synchronized (FIND_STATE_POOL) {
        for (int i = 0; i < POOL_SIZE; i++) {
            //当找到了对应的FindState
            FindState state = FIND_STATE_POOL[i];
            if (state != null) {//FindState 非空表示已经找到
                FIND_STATE_POOL[i] = null; //清空找到的这个FindState,为了下一次能接着复用这个FIND_STATE_POOL池
                return state;//返回该 FindState
            }
        }
    }
    //如果依然没找到,则创建一个新的 FindState
    return new FindState();
}

4.1.3、findUsingReflectionInSingleClass() 方法解析

通过反射找到对应的方法,并进行过滤

findUsingReflectionInSingleClass()解析.png
private void findUsingReflectionInSingleClass(FindState findState) {
    Method[] methods;
    try {
        // This is faster than getMethods, especially when subscribers are fat classes like Activities
        methods = findState.clazz.getDeclaredMethods();
    } catch (Throwable th) {
        // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
        methods = findState.clazz.getMethods();
        findState.skipSuperClasses = true;
    }
    for (Method method : methods) {
        int modifiers = method.getModifiers();
        //忽略非 public 和 static 的方法
        if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
            // 获取订阅方法的所有参数
            Class<?>[] parameterTypes = method.getParameterTypes();
            // 订阅方法只能有一个参数,否则忽略
            if (parameterTypes.length == 1) {
                // 获取有 Subscribe 的注解
                Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                if (subscribeAnnotation != null) {
                    // 获取第一个参数
                    Class<?> eventType = parameterTypes[0];
                    // 检查 eventType 决定是否订阅,通常订阅者不能有多个 eventType 相同的订阅方法
                    if (findState.checkAdd(method, eventType)) {
                        // 获取线程模式
                        ThreadMode threadMode = subscribeAnnotation.threadMode();
                        // 添加订阅方法进 List
                        findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                    }
                }
            } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                throw new EventBusException("@Subscribe method " + methodName +
                        "must have exactly 1 parameter but has " + parameterTypes.length);
            }
        } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
            String methodName = method.getDeclaringClass().getName() + "." + method.getName();
            throw new EventBusException(methodName +
                    " is a illegal @Subscribe method: must be public, non-static, and non-abstract");
        }
    }
}

经过修饰符、参数个数、是否有注解、和订阅者是否有 eventType 相同的方法几层条件的筛选,最终将订阅方法添加进 findState 的 subscriberMethods 这个 List 中。

EventBus 不仅仅获取当前类的订阅方法,还会获取它所有父类的订阅方法。

在 EventBus 中,一个订阅者包括这个订阅者的所有父类和子类,不会有多个方法相同的去接收同一个事件,但是有可能出现这样一种情况,子类去订阅了该事件,父类也去订阅了该事件。当出现这种情况,EventBus 如何判断?通过调用 checkAddWithMethodSignature() 方法,根据方法签名来检查。

4.1.4、checkAdd() & checkAddWithMethodSignature() 方法解析

checkAdd()&checkAddWithMethodSignature()解析.png
boolean checkAdd(Method method, Class<?> eventType) {
    //事件类型为Key,订阅方法为Value
    final Map<Class, Object> anyMethodByEventType = new HashMap<>();
    //put()方法执行之后,返回的是之前put的值
    Object existing = anyMethodByEventType.put(eventType, method);
    if (existing == null) {
        return true;
    } else {
        if (existing instanceof Method) {
            if (!checkAddWithMethodSignature((Method) existing, eventType)) {
                // Paranoia check
                throw new IllegalStateException();
            }
            // Put any non-Method object to "consume" the existing Method
            anyMethodByEventType.put(eventType, this);
        }
        //根据方法签名来检查
        return checkAddWithMethodSignature(method, eventType);
    }
}

//订阅方法为Key,订阅者的Class对象为Value
final Map<String, Class> subscriberClassByMethodKey = new HashMap<>();
private boolean checkAddWithMethodSignature(Method method, Class<?> eventType) {
    methodKeyBuilder.setLength(0);
    methodKeyBuilder.append(method.getName());
    methodKeyBuilder.append('>').append(eventType.getName());

    String methodKey = methodKeyBuilder.toString();
    Class<?> methodClass = method.getDeclaringClass();
    //put方法返回的是put之前的对象
    Class<?> methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass);
    //如果methodClassOld不存在或者是methodClassOld的父类的话,则表明是它的父类,直接返回true。否则,就表明在它的子类中也找到了相应的订阅,执行的 put 操作是一个 revert 操作,put 进去的是 methodClassOld,而不是 methodClass
    if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) {
        // Only add if not already found in a sub class
        return true;
    } else {
        //这里是一个revert操作,所以如果找到了它的子类也订阅了该方法,则不允许父类和子类都同时订阅该事件,put 的是之前的那个 methodClassOld,就是将以前的那个 methodClassOld 存入 HashMap 去覆盖相同的订阅者。
        //不允许出现一个订阅者有多个相同方法订阅同一个事件
        // Revert the put, old class is further down the class hierarchy
        subscriberClassByMethodKey.put(methodKey, methodClassOld);
        return false;
    }
}

FindState 的流程

  1. 调用 prepareFindState(),从 FIND_STATE_POOL 中获取一个 FindState,如果没有找到,则创建一个新的 FindState。
  2. 将 subscriberClass 通过 FindState 进行初始化。
  3. 返回所有的订阅者的方法和集合。

到此,两种方式讲解完毕。无论通过哪种方式获取,获取到订阅方法 List 之后,接下来是真正订阅的过程,回到register() 中看代码。

4.2、subscribe 订阅

public void register(Object subscriber) {
    Class<?> subscriberClass = subscriber.getClass();
    List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
    synchronized (this) {
        //迭代每个 Subscribe 方法,调用 subscribe() 传入 subscriber(订阅者) 和 subscriberMethod(订阅方法) 完成订阅,
        for (SubscriberMethod subscriberMethod : subscriberMethods) {
            subscribe(subscriber, subscriberMethod);
        }
    }
}
subscribe订阅.png
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;

private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
    Class<?> eventType = subscriberMethod.eventType;
    // 创建 Subscription 封装订阅者和订阅方法信息
    Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
    //可并发读写的ArrayList,key为EventType,value为Subscriptions
    //根据事件类型从 subscriptionsByEventType 这个 Map 中获取 Subscription 集合
    CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
    //如果为 null,表示还没有订阅过,创建并 put 进 Map
    if (subscriptions == null) {
        subscriptions = new CopyOnWriteArrayList<>();
        subscriptionsByEventType.put(eventType, subscriptions);
    } else {
        //若subscriptions中已经包含newSubscription,表示该newSubscription已经被订阅过,抛出异常
        if (subscriptions.contains(newSubscription)) {
            throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType);
        }
    }

    // 按照优先级插入subscriptions
    int size = subscriptions.size();
    for (int i = 0; i <= size; i++) {
        if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
            subscriptions.add(i, newSubscription);
            break;
        }
    }

    //key为订阅者,value为eventType,用来存放订阅者中的事件类型
    //private final Map<Object, List<Class<?>>> typesBySubscriber;
    List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
    if (subscribedEvents == null) {
        subscribedEvents = new ArrayList<>();
        typesBySubscriber.put(subscriber, subscribedEvents);
    }
    //将EventType放入subscribedEvents的集合中
    subscribedEvents.add(eventType);

    //判断是否为Sticky事件
    if (subscriberMethod.sticky) {
        //判断是否设置了事件继承
        if (eventInheritance) {
            //获取到所有Sticky事件的Set集合
            Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
            //遍历所有Sticky事件
            for (Map.Entry<Class<?>, Object> entry : entries) {
                Class<?> candidateEventType = entry.getKey();
                //判断当前事件类型是否为黏性事件或者其子类
                if (eventType.isAssignableFrom(candidateEventType)) {
                    Object stickyEvent = entry.getValue();
                    // 执行设置了 sticky 模式的订阅方法
                    checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                }
            }
        } else {
            Object stickyEvent = stickyEvents.get(eventType);
            checkPostStickyEventToSubscription(newSubscription, stickyEvent);
        }
    }
}

4.3、register 流程图

register流程图.png

事件的订阅讲解到这里,接下来看看事件的分发 Post。

五、Post 分发

5.1、Post 方法解析

一般的事件发布方式

EventBus.getDefault().post(new EventTest());

接下来看看 Post 方法的具体代码。

post分发.png
//currentPostingThreadState 线程独有的
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
        @Override
        protected PostingThreadState initialValue() {
            return new PostingThreadState();
        }
    };

public void post(Object event) {
    //获取当前线程的 posting 状态
    PostingThreadState postingState = currentPostingThreadState.get();
    //获取当前事件队列
    List<Object> eventQueue = postingState.eventQueue;
    //将事件添加进当前线程的事件队列
    eventQueue.add(event);
    //判断是否正在posting
    if (!postingState.isPosting) {
        postingState.isMainThread = isMainThread();
        postingState.isPosting = true;
        //如果已经取消,则抛出异常
        if (postingState.canceled) {
            throw new EventBusException("Internal error. Abort state was not reset");
        }
        try {
            while (!eventQueue.isEmpty()) {//发送事件
                postSingleEvent(eventQueue.remove(0), postingState);
            }
        } finally {
            //状态复原
            postingState.isPosting = false;
            postingState.isMainThread = false;
        }
    }
}

//发送事件的线程封装类
final static class PostingThreadState {
    final List<Object> eventQueue = new ArrayList<>();//事件队列
    boolean isPosting;//是否正在 posting
    boolean isMainThread;//是否为主线程
    Subscription subscription;
    Object event;
    boolean canceled;//是否已经取消
}

EventBus 用 ThreadLocal 存储每个线程的 PostingThreadState,一个存储了事件发布状态的类,当 post 一个事件时,添加到事件队列末尾,等待前面的事件发布完毕后再拿出来发布,这里看事件发布的关键代码postSingleEvent()。

private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
   Class<?> eventClass = event.getClass();
   boolean subscriptionFound = false;
   if (eventInheritance) {
       //查找到所有继承关系的事件类型,将该类的父类全部放入集合中
       List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
       int countTypes = eventTypes.size();
       
       for (int h = 0; h < countTypes; h++) {
           Class<?> clazz = eventTypes.get(h);
           //判断是否找到订阅者订阅了该事件
           subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
       }
   } else {
       subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
   }
   if (!subscriptionFound) {
       if (logNoSubscriberMessages) {
           logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
       }
       if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
               eventClass != SubscriberExceptionEvent.class) {
           //发送没有订阅者订阅该事件
           post(new NoSubscriberEvent(this, event));
       }
   }
}

代码也非常简单,首先看 eventInheritance 这个属性,是否开启事件继承,若是,找出发布事件的所有父类,也就是 lookupAllEventTypes(),然后遍历每个事件类型进行发布。若不是,则直接发布该事件。

如果需要发布的事件没有找到任何匹配的订阅信息,则发布一个 NoSubscriberEvent 事件。这里只看发布事件的关键代码 postSingleEventForEventType()。

private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
   CopyOnWriteArrayList<Subscription> subscriptions;
   //获取到 Subscription 的集合
   synchronized (this) {
       subscriptions = subscriptionsByEventType.get(eventClass);
   }
   if (subscriptions != null && !subscriptions.isEmpty()) {
       for (Subscription subscription : subscriptions) {
           postingState.event = event;
           postingState.subscription = subscription;
           boolean aborted = false;
           try {
               //调用 postToSubscription 发送
               postToSubscription(subscription, event, postingState.isMainThread);
               aborted = postingState.canceled;
           } finally {
               postingState.event = null;
               postingState.subscription = null;
               postingState.canceled = false;
           }
           if (aborted) {
               break;
           }
       }
       return true;
   }
   return false;
}

来到这里,开始根据事件类型匹配出订阅信息,如果该事件有订阅信息,则执行 postToSubscription(),发布事件到每个订阅者,返回 true,若没有,则返回 false。继续追踪发布事件到具体订阅者的代码 postToSubscription()。

5.2、postToSubscription() 方法解析

postToSubscription()解析.png
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
    switch (subscription.subscriberMethod.threadMode) {
        //订阅线程跟随发布线程,EventBus 默认的订阅方式
        case POSTING:
            // 订阅线程和发布线程相同,直接订阅
            invokeSubscriber(subscription, event);
            break;
        // 订阅线程为主线程
        case MAIN:
            //如果在 UI 线程,直接调用 invokeSubscriber
            if (isMainThread) {
                invokeSubscriber(subscription, event);
            } else {
     //如果不在 UI 线程,用 mainThreadPoster 进行调度,即上文讲述的 HandlerPoster 的 Handler 异步处理,将订阅线程切换到主线程订阅
                mainThreadPoster.enqueue(subscription, event);
            }
            break;
        // 订阅线程为主线程
        case MAIN_ORDERED:
            if (mainThreadPoster != null) {
                mainThreadPoster.enqueue(subscription, event);
            } else {
                // temporary: technically not correct as poster not decoupled from subscriber
                invokeSubscriber(subscription, event);
            }
            break;
        // 订阅线程为后台线程
        case BACKGROUND:
            //如果在 UI 线程,则将 subscription 添加到后台线程的线程池
            if (isMainThread) {
                backgroundPoster.enqueue(subscription, event);
            } else {
            //不在UI线程,直接分发
                invokeSubscriber(subscription, event);
            }
            break;
        // 订阅线程为异步线程
        case ASYNC:
            // 使用线程池线程订阅
            asyncPoster.enqueue(subscription, event);
            break;
        default:
            throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
    }
}

订阅者五种线程模式的特点对应的就是以上代码,简单来讲就是订阅者指定了在哪个线程订阅事件,无论发布者在哪个线程,它都会将事件发布到订阅者指定的线程。这里我们继续看实现订阅者的方法。

void invokeSubscriber(Subscription subscription, Object event) {
    try {
        subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
    } catch (InvocationTargetException e) {
        handleSubscriberException(subscription, event, e.getCause());
    } catch (IllegalAccessException e) {
        throw new IllegalStateException("Unexpected exception", e);
    }
}

订阅者接收到了事件,调用订阅方法,传入发布的事件作为参数,至此,事件发布过程就结束了。

5.3、post 流程图

post流程图.png

六、unregister 反注册

先看反注册的代码

EventBus.getDefault().unregister(this);

跟踪 unregister()

public synchronized void unregister(Object subscriber) {
    List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
    if (subscribedTypes != null) {
        for (Class<?> eventType : subscribedTypes) {
            unsubscribeByEventType(subscriber, eventType);
        }
        typesBySubscriber.remove(subscriber);
    } else {
        logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
    }
}

注册过程我们就知道 typesBySubscriber 是保存订阅者的所有订阅事件类型的一个 Map,这里根据订阅者拿到订阅事件类型 List,然后逐个取消订阅,最后 typesBySubscriber 移除该订阅者,。这里只需要关注它是如果取消订阅的,跟踪 unsubscribeByEventType()。

/** Only updates subscriptionsByEventType, not typesBySubscriber! Caller must update typesBySubscriber. */
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
    List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
    if (subscriptions != null) {
        int size = subscriptions.size();
        for (int i = 0; i < size; i++) {
            Subscription subscription = subscriptions.get(i);
            if (subscription.subscriber == subscriber) {
                subscription.active = false;
                subscriptions.remove(i);
                i--;
                size--;
            }
        }
    }
}

subscriptionsByEventType 是存储事件类型对应订阅信息的 Map,代码逻辑非常清晰,找出某事件类型的订阅信息 List,遍历订阅信息,将要取消订阅的订阅者和订阅信息封装的订阅者比对,如果是同一个,则说明该订阅信息是将要失效的,于是将该订阅信息移除。

七、总结

回顾一下 EventBus 的三个步骤

  1. 注册订阅者
  2. 事件发布
  3. 反注册订阅者

好了,EventBus 的源码解析到这就结束了,想进一步了解 EventBus 的朋友可以亲自去阅读源码。

相关文章

网友评论

    本文标题:Event 3.0+ 源码详解(史上最详细图文讲解)

    本文链接:https://www.haomeiwen.com/subject/rqogpftx.html