美文网首页
EventBus 3.1.1源码分析

EventBus 3.1.1源码分析

作者: 青墨一隅 | 来源:发表于2019-04-02 13:52 被阅读0次

一.通过一个demo来了解基本使用

项目中导入EventBus

implementation'org.greenrobot:eventbus:3.1.1'

定义传递事件参数

因为eventbus不支持基本数据类型的参数传递,这里我们定义一个MessageEvent消息实体类来进行数据传递。

data class MessageEvent(val age: Int, val name: String) {}

mainActivity

主要负责添加注册和接收SencondActivity发送来的事件

class MainActivity : AppCompatActivity() {
    
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        //注册
        EventBus.getDefault().register(this)
        initListener()
    }

    private fun initListener() {
        sencondActivity.setOnClickListener {
            startActivity(Intent(this@MainActivity,SencondActivity::class.java))
        }
    }

    /**
     * 接受事件
     * @param mess MessageEvent
     */
    @Subscribe(threadMode = ThreadMode.MAIN)
    fun getMessageEvent(mess: MessageEvent){
        tv_content.setText(mess.toString())
    }

    override fun onDestroy() {
        super.onDestroy()
        //取消注册,防止内存泄露
        EventBus.getDefault().unregister(this)
    }
}

SencondActivity

负责发送事件

class SencondActivity : AppCompatActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_sencond)
        initListener()
    }

    private fun initListener() {
        btn_event.setOnClickListener{
            //发送事件
            EventBus.getDefault().post(MessageEvent(12,"测试"))
        }
    }
}

运行效果

demo.gif

为什么使用Eventbus?

使用EventBus主要是用于在Activities, Fragments, Threads, Services等之间通信。我们常用的通信方式有哪些呢?

  • Handler,调用者必须要Handler这个句柄;必须写在handleMessage(){}这个回调方法中;不能跨进程,可以跨线程通信。
  • 接口,一个方法对应一个接口导致接口过多;不能跨线程。
  • 广播,回调方法有10s限制。
    除了跨进程通信,这些问题EventBus都解决

源码分析

观察者模式分析

上面的简单使用实现了SencondActivity和MainActivity两个页面进行通信,SencondActivity是发送事件的相当于被观察者;而MainActivity在onCreate()方法注册并且定义了一个接收事件的方法相当于观察者;看到这里大家有没有一种熟悉的感觉,这不是典型的观察者模式嘛,引用一张官网的图片:

image.png

不熟悉观察者模式的同学可以通过下面这个UML模型快速掌握观察者模式:


观察者模式结构图

事件分析

首先从注册入口开始分析EventBus.getDefault().register(this),先进入getDefault()方法看一下

public static EventBus getDefault() {
        if (defaultInstance == null) {
            synchronized (EventBus.class) {
                if (defaultInstance == null) {
                    defaultInstance = new EventBus();
                }
            }
        }
        return defaultInstance;
    }

这里看出来EventBus是一个通过双重加锁创建的单列

 public void register(Object subscriber) {
        Class<?> subscriberClass = subscriber.getClass();
        List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
        synchronized (this) {
            for (SubscriberMethod subscriberMethod : subscriberMethods) {
                subscribe(subscriber, subscriberMethod);
            }
        }
    }

这里是注册方法,从demo可以看出来我们传进来的是this也就是MainActivity的实例,第二代码可以看出来是根据传入的class对象来获取一个List,我们看一下findSubscriberMethods()方法

    List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
        //从缓存中取对应的class的接收事件方法列表,如果存在就不需要再注册,没有就创建
        List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
        if (subscriberMethods != null) {
            return subscriberMethods;
        }
        //默认为false
        if (ignoreGeneratedIndex) {
            subscriberMethods = findUsingReflection(subscriberClass);
        } else {
            subscriberMethods = findUsingInfo(subscriberClass);
        }
        if (subscriberMethods.isEmpty()) {
            throw new EventBusException("Subscriber " + subscriberClass
                    + " and its super classes have no public methods with the @Subscribe annotation");
        } else {
            METHOD_CACHE.put(subscriberClass, subscriberMethods);
            return subscriberMethods;
        }
    }

这里的核心来看一下METHOD_CACHE是什么,

private static final Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();

这里使用一个线程安全的ConcurrentHashMap来保存数据,其中key就是this的class对象,value是什么呢,它是一个用来保存接收事件相关参数的一个类,我来看一看这个SubscriberMethod

public class SubscriberMethod {
    //接收事件的方法名
    final Method method;
   //是不是很熟悉,这个就是控制接受事件所在线程的一个枚举
    final ThreadMode threadMode;
   //接收事件方法的参数类型
    final Class<?> eventType;
   //接收事件的优先级
    final int priority;
   //是否是粘性订阅
    final boolean sticky;
    /** Used for efficient comparison */
    String methodString;

    public SubscriberMethod(Method method, Class<?> eventType, ThreadMode threadMode, int priority, boolean sticky) {
        this.method = method;
        this.threadMode = threadMode;
        this.eventType = eventType;
        this.priority = priority;
        this.sticky = sticky;
    }
//......剩余代码不做展示
}

由于ignoreGeneratedIndex默认为false接下里我们主要分析分析findUsingInfo这个方法:

    private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
        1.FindState findState = prepareFindState();
        //进行初始化FindState的操作
        2.findState.initForSubscriber(subscriberClass);
        3.while (findState.clazz != null) {
            4.findState.subscriberInfo = getSubscriberInfo(findState);
            5.if (findState.subscriberInfo != null) {
                6.SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
                7.for (SubscriberMethod subscriberMethod : array) {
                    8.if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
                        9.findState.subscriberMethods.add(subscriberMethod);
                   10. }
                11.}
           12. } else {
                13.findUsingReflectionInSingleClass(findState);
            }
           14. findState.moveToSuperclass();
       15. }
        16.return getMethodsAndRelease(findState);
    }

下面我们逐行分析
1.prepareFindState()

    private static final FindState[] FIND_STATE_POOL = new FindState[POOL_SIZE];

    private FindState prepareFindState() {
        synchronized (FIND_STATE_POOL) {
            for (int i = 0; i < POOL_SIZE; i++) {
                FindState state = FIND_STATE_POOL[i];
                if (state != null) {
                    FIND_STATE_POOL[i] = null;
                    return state;
                }
            }
        }
        return new FindState();
    }

如果缓冲池中存在FindState不为空就从缓存池里面取出一个FindState,否则创建一个FindState,取出来之后将缓存池中相应位置置空。
我们来一下这个FindState类:

    static class FindState {
        final List<SubscriberMethod> subscriberMethods = new ArrayList<>();
        final Map<Class, Object> anyMethodByEventType = new HashMap<>();
        final Map<String, Class> subscriberClassByMethodKey = new HashMap<>();
        final StringBuilder methodKeyBuilder = new StringBuilder(128);

        Class<?> subscriberClass;
        Class<?> clazz;
        boolean skipSuperClasses;
        SubscriberInfo subscriberInfo;

        void initForSubscriber(Class<?> subscriberClass) {
            this.subscriberClass = clazz = subscriberClass;
            skipSuperClasses = false;
            subscriberInfo = null;
        }
      //...省略代码
}

FindState是SubscriberMethodFinder的内部类,可以看出来这个类这里做一下初始化相关的工作,它相当于一个提供检查接收方法和参数得工具类。
3和14.循环获取寻找父类添加@Subscribe注解的方法加入到集合中,当然这里在
moveToSuperclass方法中进行优化掉系统的类,这样可以优化性能否则会一直找到Object类

void moveToSuperclass() {
            if (skipSuperClasses) {
                clazz = null;
            } else {
                clazz = clazz.getSuperclass();
                String clazzName = clazz.getName();
                // 跳过系统类 
                if (clazzName.startsWith("java.") || clazzName.startsWith("javax.") || clazzName.startsWith("android.")) {
                    clazz = null;
                }
            }
        }

4.如果我们订阅注册的时候传入的this不为null,我们看一下getSubscriberInfo方法

    private SubscriberInfo getSubscriberInfo(FindState findState) {
        if (findState.subscriberInfo != null && findState.subscriberInfo.getSuperSubscriberInfo() != null) {
            SubscriberInfo superclassInfo = findState.subscriberInfo.getSuperSubscriberInfo();
            if (findState.clazz == superclassInfo.getSubscriberClass()) {
                return superclassInfo;
            }
        }
        if (subscriberInfoIndexes != null) {
            for (SubscriberInfoIndex index : subscriberInfoIndexes) {
                SubscriberInfo info = index.getSubscriberInfo(findState.clazz);
                if (info != null) {
                    return info;
                }
            }
        }
        return null;
    }

从上面的分析我们可以知道findState如果在缓存中存在就取出否则创建一个新的FindState对象,在FindState.中initForSubscribe()方法中我们知道findState.subscriberInfo初始化为null,所以这里如果并且如果不存在FindState的缓存这里将这里判断findState.subscriberInfo不为空并且它的父类订阅信息也不为空的时候,如果我们传入this的class对象和FindState中的

public interface SubscriberInfo {
    Class<?> getSubscriberClass();

    SubscriberMethod[] getSubscriberMethods();

    SubscriberInfo getSuperSubscriberInfo();

    boolean shouldCheckSuperclass();
}

5-13行分析,findState中的订阅信息如果不为空获取所以订阅方法进行遍历,通过checkAdd()方法检查这个接收事件方法,然后通过anyMethodByEventType这个Map集合将接收事件方法和接收事件参数类型进行保存,key接收事件参数类型,value是接收事件方法。否则走findUsingReflectionInSingleClass()方法:

    private void findUsingReflectionInSingleClass(FindState findState) {
        Method[] methods;
        try {
            // This is faster than getMethods, especially when subscribers are fat classes like Activities
            methods = findState.clazz.getDeclaredMethods();
        } catch (Throwable th) {
            // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
            methods = findState.clazz.getMethods();
            findState.skipSuperClasses = true;
        }
        for (Method method : methods) {
            int modifiers = method.getModifiers();
           //这里可以看出订阅类中的接收事件方法只能定义为public
            if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
                Class<?>[] parameterTypes = method.getParameterTypes();
                //接收事件参数进行限制,只能是一个
                if (parameterTypes.length == 1) {
                    Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                    if (subscribeAnnotation != null) {
                        Class<?> eventType = parameterTypes[0];
                        if (findState.checkAdd(method, eventType)) {
                            ThreadMode threadMode = subscribeAnnotation.threadMode();
                            findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                    subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                        }
                    }
                } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                    String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                    throw new EventBusException("@Subscribe method " + methodName +
                            "must have exactly 1 parameter but has " + parameterTypes.length);
                }
            } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                throw new EventBusException(methodName +
                        " is a illegal @Subscribe method: must be public, non-static, and non-abstract");
            }
        }
    }

可以看出来这个方法主要是通过反射将订阅类中的所有Subscribe注解的方法保存在findState类中接收事件方法集合中,其中的参数为method(订阅方法)、eventType(方法参数类型)、threadMode(订阅方法回调的线程)、subscribeAnnotation.priority()(订阅方法接收事件的优先级)、subscribeAnnotation.sticky()(订阅事件是否为粘性的)。
可以看出我们订阅方法的命名没有任何规则,只要加上@Subscribe注解的方法即可!同时事件的命名也没有任何要求!

什么是sticky event?

sticky event,中文名为粘性事件。普通事件是先注册,然后发送事件才能收到;而粘性事件,在发送事件之后再订阅该事件也能收到。此外,粘性事件会保存在内存中,每次进入都会去内存中查找获取最新的粘性事件,除非你手动解除,发送事件需要使用postSticky()方法。

获取到subscriberMethods后我们再回到注册方法中

    public void register(Object subscriber) {
        Class<?> subscriberClass = subscriber.getClass();
        List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
        synchronized (this) {
            for (SubscriberMethod subscriberMethod : subscriberMethods) {
                subscribe(subscriber, subscriberMethod);
            }
        }
    }

    // Must be called in synchronized block
    private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
        Class<?> eventType = subscriberMethod.eventType;
        //用于保存接收方法和注册传递进来的this对象
        Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
        CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        if (subscriptions == null) {
            subscriptions = new CopyOnWriteArrayList<>();
            //subscriptionsByEventType这集合很重要再后续的post源码分析中还会出现
            subscriptionsByEventType.put(eventType, subscriptions);
        } else {
            if (subscriptions.contains(newSubscription)) {
                throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                        + eventType);
            }
        }
        //存在多个具有相同参数类型的接收方法时根据优先级判断存放在列表的位置
        int size = subscriptions.size();
        for (int i = 0; i <= size; i++) {
            if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
                subscriptions.add(i, newSubscription);
                break;
            }
        }

        List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
        if (subscribedEvents == null) {
            subscribedEvents = new ArrayList<>();
            typesBySubscriber.put(subscriber, subscribedEvents);
        }
        subscribedEvents.add(eventType);

        if (subscriberMethod.sticky) {
            if (eventInheritance) {
                // Existing sticky events of all subclasses of eventType have to be considered.
                // Note: Iterating over all events may be inefficient with lots of sticky events,
                // thus data structure should be changed to allow a more efficient lookup
                // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
                Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
                for (Map.Entry<Class<?>, Object> entry : entries) {
                    Class<?> candidateEventType = entry.getKey();
                    if (eventType.isAssignableFrom(candidateEventType)) {
                        Object stickyEvent = entry.getValue();
                        checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                    }
                }
            } else {
                Object stickyEvent = stickyEvents.get(eventType);
                checkPostStickyEventToSubscription(newSubscription, stickyEvent);
            }
        }
    }

可以看出此时开始订阅了,进入subscribe()方法中进行分析
分析:

  • 这里首先创建一个Subscription对象的newSubscription将this的class对象和接收事件的方法进行保存,subscriptionsByEventType根据接收事件方法的参数类型获取一个CopyOnWriteArrayList<Subscription> subscriptions,如果没有则创建一个新的CopyOnWriteArrayList;然后将key为eventType(接收方法的参数类型)和value为subscriptions(保存所有相同参数类型的方法)保存到subscriptionsByEventType这个map中。
  • 接下来会遍历subscriptions,同时这里有一个优先级的判断,说明它是按照优先级添加的。优先级越高,会插到在当前List靠前面的位置,这里可以看出来subscriptions这个集合是为了保证接收事件的优先级顺序创建的。
  • typesBySubscriber这个Map的key为subscriber,value为subscribedEvents这个List集合,即所有的eventType列表,我查看一下这个typesBySubscriber在unregister方法中被应用
public synchronized void unregister(Object subscriber) {
        List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
        if (subscribedTypes != null) {
            for (Class<?> eventType : subscribedTypes) {
                unsubscribeByEventType(subscriber, eventType);
            }
            typesBySubscriber.remove(subscriber);
        } else {
            logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
        }
    }

从这里我们可以看出来typesBySubscriber主要是用于取消注册,首先判断接收事件方法的参数类型是否为null,不是null的话就进行取消订阅,但是我们注意到这里有一个unsubscribeByEventType方法

    private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
        List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        if (subscriptions != null) {
            int size = subscriptions.size();
            for (int i = 0; i < size; i++) {
                Subscription subscription = subscriptions.get(i);
                if (subscription.subscriber == subscriber) {
                    subscription.active = false;
                    subscriptions.remove(i);
                    i--;
                    size--;
                }
            }
        }
    }

可以看出这个方法的主要作用是用于删除订阅事件,防止取消注册后还能收到发送的事件。

  • 判断是否是sticky。如果是sticky事件的话,到最后会调用checkPostStickyEventToSubscription()方法。

到这里我们所有的注册,取消注册,和订阅方法相关的源码都分析完了,接下里我们开始分析事件的发送EventBus.getDefault().post()

    public void post(Object event) {
        PostingThreadState postingState = currentPostingThreadState.get();
        List<Object> eventQueue = postingState.eventQueue;
        eventQueue.add(event);
        //postingState.isPosting默认为false
        if (!postingState.isPosting) {
           //是否在主线程
            postingState.isMainThread = isMainThread();
            postingState.isPosting = true;
            if (postingState.canceled) {
                throw new EventBusException("Internal error. Abort state was not reset");
            }
            try {
                while (!eventQueue.isEmpty()) {
                    postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally {
               //执行完毕后重置
                postingState.isPosting = false;
                postingState.isMainThread = false;
            }
        }
    }
    
    final static class PostingThreadState {
        final List<Object> eventQueue = new ArrayList<>();
        boolean isPosting;
        boolean isMainThread;
        Subscription subscription;
        Object event;
        boolean canceled;
    }

这里的currentPostingThreadState是一个ThreadLocal,保存到了当前线程中的一个变量PostingThreadState;PostingThreadState是一个用于保存数据的静态内部类。将事件传递的参数保存到eventQueue集合中,核心发送事件的方法是postSingleEvent

    private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
        Class<?> eventClass = event.getClass();
        boolean subscriptionFound = false;
        if (eventInheritance) {
            List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
            int countTypes = eventTypes.size();
            for (int h = 0; h < countTypes; h++) {
                Class<?> clazz = eventTypes.get(h);
                subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
            }
        } else {
            subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
        }
        if (!subscriptionFound) {
            if (logNoSubscriberMessages) {
                logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
            }
            if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                    eventClass != SubscriberExceptionEvent.class) {
                post(new NoSubscriberEvent(this, event));
            }
        }
    }

   private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
        CopyOnWriteArrayList<Subscription> subscriptions;
        synchronized (this) {
           //这个subscriptionsByEventType跟注册方法中的subscribe()方法进行对应,
           //在注册的时候已经将接收方法的参数类型和Method进行保存了,这里只需要将符合pos方法参数的方法集合取出来全部进行反射调用就行。
            subscriptions = subscriptionsByEventType.get(eventClass);
        }
        if (subscriptions != null && !subscriptions.isEmpty()) {
            for (Subscription subscription : subscriptions) {
                postingState.event = event;
                postingState.subscription = subscription;
                boolean aborted = false;
                try {
                    postToSubscription(subscription, event, postingState.isMainThread);
                    aborted = postingState.canceled;
                } finally {
                    postingState.event = null;
                    postingState.subscription = null;
                    postingState.canceled = false;
                }
                if (aborted) {
                    break;
                }
            }
            return true;
        }
        return false;
    }

分析

  • 这里会首先取出Event的class类型,然后有一个标志位eventInheritance判断,默认为true,作用在相关代码注释有说,如果设为true的话,它会拿到Event父类的class类型,设为false,可以在一定程度上提高性能;
  • 接下来是lookupAllEventTypes()方法,就是取出Event及其父类和接口的class列表,当然重复取的话会影响性能,所以它也有做一个eventTypesCache的缓存,这样不用都重复调用getClass()方法。
  • 然后是postSingleEventForEventType()方法,这里就很清晰了,就是直接根据Event类型从subscriptionsByEventType中取出对应的subscriptions,与之前的代码对应,最后调用postToSubscription()方法。
    private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        switch (subscription.subscriberMethod.threadMode) {
            case POSTING:
                invokeSubscriber(subscription, event);
                break;
            case MAIN:
                if (isMainThread) {
                    invokeSubscriber(subscription, event);
                } else {
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case MAIN_ORDERED:
                if (mainThreadPoster != null) {
                    mainThreadPoster.enqueue(subscription, event);
                } else {
                    // temporary: technically not correct as poster not decoupled from subscriber
                    invokeSubscriber(subscription, event);
                }
                break;
            case BACKGROUND:
                if (isMainThread) {
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case ASYNC:
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }
    //直接通过反射调用对应已经注册类中的接收事件方法
    void invokeSubscriber(Subscription subscription, Object event) {
        try {
            subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
        } catch (InvocationTargetException e) {
            handleSubscriberException(subscription, event, e.getCause());
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("Unexpected exception", e);
        }
    }

这里会根据threadMode来判断应该在哪个线程中去执行方法:

  • POSTING:默认值,发送事件在什么线程中接收事件也发生在什么线程中,事件的优先级中设置的值越大优先级越高,取消事件传递,必须是threadMode = ThreadMode.POSTING在这个情况下才能有效;
  • MAIN:接收事件发生在主线程中,在这里发送事件如果在子线程中通过Handler进行数据线程切换;如果发布事件在主线程中将阻塞发布线程,使用此模式的订阅者必须快速返回以免阻塞主线程。
  • MAIN_ORDERED:与上面逻辑类似,顺序执行我们的方法,事件将始终排队等待交付。这确保post调用是非阻塞的。
  • BACKGROUND:通过backgroundPoster.enqueue()将方法加入到后台的一个队列,最后通过线程池去执行,任务是一个接着一个的去调用;
  • ASYNC:与BACKGROUND的逻辑类似,将任务加入到后台的一个队列,最终由Eventbus中的一个线程池去调用,这里的线程池与BACKGROUND逻辑中的线程池用的是同一个,会即时异步运行。

如果文章中存在问题,请大家多多指教互相交流

相关文章

网友评论

      本文标题:EventBus 3.1.1源码分析

      本文链接:https://www.haomeiwen.com/subject/scmjbqtx.html