EventBus源码解析

作者: MingMing很忙 | 来源:发表于2020-05-22 16:19 被阅读0次

1-注册

1.1-核心

注册主要原理就是EventBus的三个map变量的注册:

  • subscriptionsByEventType | EventType为key,Subscription列表为Value
  • typesBySubscriber | 注册对象subscriber为key,EventType列表为Value
  • stickyEvents | EventType为key,Event对象为Value

字段说明:

字段 说明
EventType 事件类型,即@Subscribe注解的订阅方法的第一个参数对应的Class对象
Subscription 封装了订阅对象subscriber及订阅方法SubscriberMethod
SubscriberMethod 封装了订阅对象的订阅方法的相关信息,即@Subscribe注解参数及注解的方法信息
subscriber 注册对象,例如在MainActivty注册即为MainActivity的实例对象
Event 事件对象,即@Subscribe注解的订阅方法的第一个入参实参对象

1.2-注册具体源码

EventBus.getDefault().register(this);开始,this传入的是当前注册类的Activity对象

/**
 * EventBus.register方法
 */
public void register(Object subscriber) {
        //@1.获取对象类对应的Class对象
        Class<?> subscriberClass = subscriber.getClass();
        //@2.通过反射获取Class对象中注解的方法列表
        List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
        synchronized (this) {
            //@5.遍历获取的目标方法列表,并注册
            for (SubscriberMethod subscriberMethod : subscriberMethods) {
                subscribe(subscriber, subscriberMethod);
            }
        }
    }

@2.findSubscriberMethods方法获取Class对象中@Subscribe注解的方法列表。subscriberMethodFinder是EventBus的成员变量具体实现

//SubscriberMethodFinder.findSubscriberMethods方法
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
        //先读缓存
        List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
        if (subscriberMethods != null) {
            return subscriberMethods;
        }
        //ignoreGeneratedIndex默认为false,若自定义EventBusBuilder则为ture
        if (ignoreGeneratedIndex) {
            subscriberMethods = findUsingReflection(subscriberClass);
        } else {
            //@3.通过反射获取Class文件的带注解方法
            subscriberMethods = findUsingInfo(subscriberClass);
        }
        if (subscriberMethods.isEmpty()) {
            //注册类没有@Subscriber注解,抛出异常
            throw new EventBusException("Subscriber " + subscriberClass
                    + " and its super classes have no public methods with the @Subscribe annotation");
        } else {
            //更新缓存
            METHOD_CACHE.put(subscriberClass, subscriberMethods);
            return subscriberMethods;
        }
    }

@3.通过反射获取Class文件的带注解方法。

//SubscriberMethodFinder.findUsingInfo方法
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
        //通过FindState来辅助查找
        FindState findState = prepareFindState();
        findState.initForSubscriber(subscriberClass);
        while (findState.clazz != null) {
            findState.subscriberInfo = getSubscriberInfo(findState);
            //第一次注册为null
            if (findState.subscriberInfo != null) {
                SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
                for (SubscriberMethod subscriberMethod : array) {
                    if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
                        findState.subscriberMethods.add(subscriberMethod);
                    }
                }
            } else {
                //@4.筛选目标方法,并将目标方法相关信息封装到findState中的集合
                findUsingReflectionInSingleClass(findState);
            }
            //将findState.clazz修改为父类,到父类遍历
            findState.moveToSuperclass();
        }
        //返回findState中的SubscriberMethod列表,并释放findState
        return getMethodsAndRelease(findState);
    }

@4.findUsingReflectionInSingleClass方法

private void findUsingReflectionInSingleClass(FindState findState) {
        Method[] methods;
        try {
            // 通过反射获取Class对象的所有方法
            methods = findState.clazz.getDeclaredMethods();
        } catch (Throwable th) {
            。。。
        }
        //遍历所有方法,根据条件筛选出目标方法
        for (Method method : methods) {
            //获取方法修饰符
            int modifiers = method.getModifiers();
            //方法是public且非abstract、static等
            if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
                //获取方法的形参类型
                Class<?>[] parameterTypes = method.getParameterTypes();
                //只有一次参数
                if (parameterTypes.length == 1) {
                    //获取Subscribe注解类
                    Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                    if (subscribeAnnotation != null) {
                        //获取订阅事件的类型
                        Class<?> eventType = parameterTypes[0];
                        //判断findState是否添加过该事件类型为key的键值对,未添加过返回true
                        if (findState.checkAdd(method, eventType)) {
                            //获取注解中指定的线程
                            ThreadMode threadMode = subscribeAnnotation.threadMode();
                            //将订阅方法、事件类型、线程、优先级、是否支持粘性事件等封装成SubscriberMethod
                            //并添加到findState的subscriberMethods集合中
                            findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                    subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                        }
                    }
                } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                    。。。
                }
            } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                。。。
            }
        }
    }

@5.遍历获取的目标方法列表,并注册。主要是注册三个map。

//调用EventBus.subscribe方法注册
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
        //获取订阅方法的事件类型
        Class<?> eventType = subscriberMethod.eventType;
        //将方法参数SubscriberMethod及注册类Object封装为Subscription
        Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
        //查找注册类是否有对应的Subscription列表
        CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        if (subscriptions == null) {
            //没有则创建集合 subscriptions
            subscriptions = new CopyOnWriteArrayList<>();
            subscriptionsByEventType.put(eventType, subscriptions);
        } else {
            if (subscriptions.contains(newSubscription)) {
                throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                        + eventType);
            }
        }

        int size = subscriptions.size();
        //根据newSubscription的优先级插入到subscriptions集合中
        for (int i = 0; i <= size; i++) {
            if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
                subscriptions.add(i, newSubscription);
                break;
            }
        }
        //判断typesBySubscriber map中是否存在当前注册类的事件类型列表
        List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
        if (subscribedEvents == null) {
            subscribedEvents = new ArrayList<>();
            typesBySubscriber.put(subscriber, subscribedEvents);
        }
        //添加事件类型到列表
        subscribedEvents.add(eventType);
        //如果支持粘性事件
        //1.4-粘性事件
        if (subscriberMethod.sticky) {
            。。。
        }
    }

总结注册流程:

  • 1-根据反射获取注册类Class对象的所有方法
  • 2-遍历方法列表根据条件筛选目标方法(public修饰一个形参,Subscribe注解)
  • 3-将目标方法信息及指定线程、优先级、粘性等封装到SubscriberMethod。最终返回筛选出来的SubscriberMethod列表
  • 4-遍历SubscriberMethod列表,并分别注册到EvnetBus的三个map集合中


    图1-注册流程图

2-取消注册

EventBus.getDefault().unregister(this);

//EventBus.unregister方法
public synchronized void unregister(Object subscriber) {
        List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
        if (subscribedTypes != null) {
            //遍历注册对象-eventType列表的map
            for (Class<?> eventType : subscribedTypes) {
                //@1
                unsubscribeByEventType(subscriber, eventType);
            }
            //将该对象从map中移除
            typesBySubscriber.remove(subscriber);
        } else {
            logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
        }
    }

@1.调用unsbscribeByEventType将eventType-事件列表的map中移除目标对象

private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
        //通过EventType获取对应的事件列表
        List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        if (subscriptions != null) {
            int size = subscriptions.size();
            //依次删除事件列表中与目标对象一致的subscription
            for (int i = 0; i < size; i++) {
                Subscription subscription = subscriptions.get(i);
                if (subscription.subscriber == subscriber) {
                    subscription.active = false;
                    subscriptions.remove(i);
                    i--;
                    size--;
                }
            }
        }
    }

总结解注册流程:

  • 从object-eventType列表的map中遍历eventType
  • 获取eventType对应的事件列表subscriptions,依次移除订阅对象关联的事件
  • 将订阅对象从object-eventType的map中移除

3-事件发送

EventBus.getDefault().post(new Object());
调用EventBus.post方法。在EventBus中维护一个PostingThreadState。各自线程维护一份,保存了当前线程的事件队列,线程状态,发送状态,是否主线程等信息。

  • ThreadLocal<PostingThreadState> currentPostingThreadState
final static class PostingThreadState {
        //当前线程的事件队列
        final List<Object> eventQueue = new ArrayList<>();
        boolean isPosting;//是否发送状态
        boolean isMainThread;//是否主线程
        Subscription subscription;//辅助记录待通知的订阅者
        Object event;//辅助记录待发送的事件
        boolean canceled;
    }
public void post(Object event) {
        //获取当前线程的PostingThreadState
        PostingThreadState postingState = currentPostingThreadState.get();
        //获取待发送事件队列
        List<Object> eventQueue = postingState.eventQueue;
        //将当前发送事件入列
        eventQueue.add(event);
        if (!postingState.isPosting) {
            postingState.isMainThread = isMainThread();
            postingState.isPosting = true;
            if (postingState.canceled) {
                throw new EventBusException("Internal error. Abort state was not reset");
            }
            try {
                //遍历待发送事件列表
                while (!eventQueue.isEmpty()) {
                    //@1发送单个事件
                    postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally {
                //重置postingState状态
                postingState.isPosting = false;
                postingState.isMainThread = false;
            }
        }
    }

@1.逐一发送事件

private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
        Class<?> eventClass = event.getClass();
        boolean subscriptionFound = false;
        if (eventInheritance) {
            //拿到该事件所有的父类事件类型
            List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
            int countTypes = eventTypes.size();
            //遍历事件类型
            for (int h = 0; h < countTypes; h++) {
                Class<?> clazz = eventTypes.get(h);
                //@2查找订阅者
                subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
            }
        } else {
            subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
        }
        //没有订阅
        if (!subscriptionFound) {
            if (logNoSubscriberMessages) {
                logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
            }
            //发送NosubscriberEvent事件
            if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                    eventClass != SubscriberExceptionEvent.class) {
                post(new NoSubscriberEvent(this, event));
            }
        }
    }

@2.通过eventType-订阅者列表找到对应的订阅者列表,依次发送事件到对应的订阅者

private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
        CopyOnWriteArrayList<Subscription> subscriptions;
        synchronized (this) {
            //获取eventType-订阅者的map对应的订阅列表
            subscriptions = subscriptionsByEventType.get(eventClass);
        }
        if (subscriptions != null && !subscriptions.isEmpty()) {   
            //遍历订阅者Subscription
            for (Subscription subscription : subscriptions) {
                //记录订阅者及事件
                postingState.event = event;
                postingState.subscription = subscription;
                boolean aborted;
                try {
                    //@3发送事件到订阅者
                    postToSubscription(subscription, event, postingState.isMainThread);
                    aborted = postingState.canceled;
                } finally {
                    //清除PostingState状态
                    postingState.event = null;
                    postingState.subscription = null;
                    postingState.canceled = false;
                }
                if (aborted) {
                    break;
                }
            }
            return true;
        }
        return false;
    }

@3.发送事件到对应的订阅者
。该模式下涉及到的三个列表:

  • HandlerPoster mainThreadPoster;//切换到主线程执行
  • BackgroundPoster backgroundPoster;//切换到线程池执行,有同步锁
  • AsyncPoster asyncPoster;//切换到线程池执行,无同步锁
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        //根据订阅方法设置的线程模式执行
        switch (subscription.subscriberMethod.threadMode) {
            //默认线程模式,在发送事件的线程接收事件
            case POSTING:
                //@4.通过反射执行订阅者的订阅方法
                invokeSubscriber(subscription, event);
                break;
            //指定主线程
            case MAIN:
                if (isMainThread) {//如果当前是主线程则直接执行
                    invokeSubscriber(subscription, event);
                } else { //@5.否则子线程加入队列,通过Handler切换到主线程执行
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            //无论哪个线程都加入队列,通过handler在主线程执行
            case MAIN_ORDERED:
                if (mainThreadPoster != null) {
                    mainThreadPoster.enqueue(subscription, event);
                } else {
                    // temporary: technically not correct as poster not decoupled from subscriber
                    invokeSubscriber(subscription, event);
                }
                break;
            //后台执行
            case BACKGROUND:
                if (isMainThread) {//@7.主线程,加入队列,通过线程池执行
                    backgroundPoster.enqueue(subscription, event);
                } else {//子线程,直接执行
                    invokeSubscriber(subscription, event);
                }
                break;
            //无论哪个线程都加入队列,线程池执行
            case ASYNC:
                //@8.AsyncPoster与BackgroundPoster不同的是
                //没有加同步锁
                //每次执行都会开一个子线程执行
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }

@4.通过反射执行订阅者类的订阅方法

void invokeSubscriber(Subscription subscription, Object event) {
        try {
            subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
        } catch (InvocationTargetException e) {
            handleSubscriberException(subscription, event, e.getCause());
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("Unexpected exception", e);
        }
    }

@5.子线程加入队列,通过Handler切换到主线程执行

public class HandlerPoster extends Handler implements Poster {

    private final PendingPostQueue queue;
    private final int maxMillisInsideHandleMessage;
    private final EventBus eventBus;
    private boolean handlerActive;

    protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
        super(looper);
        this.eventBus = eventBus;
        this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        //将subscription和event封装成PendingPost对象
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            //加入到队列
            queue.enqueue(pendingPost);
            if (!handlerActive) {
                handlerActive = true;
                //主线程Handler发送消息
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }
    //主线程Handler接收并处理消息
    @Override
    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            long started = SystemClock.uptimeMillis();
            while (true) {
                //取出队列
                PendingPost pendingPost = queue.poll();
                if (pendingPost == null) {
                    synchronized (this) {
                        // Check again, this time in synchronized
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            handlerActive = false;
                            return;
                        }
                    }
                }
                //@6.执行事件
                eventBus.invokeSubscriber(pendingPost);
                。。。
            }
        } finally {
            handlerActive = rescheduled;
        }
    }
}

@6.取出PendingPost并通过反射执行订阅方法

void invokeSubscriber(PendingPost pendingPost) {
        Object event = pendingPost.event;
        Subscription subscription = pendingPost.subscription;
        //是否PendingPost的引用资源
        PendingPost.releasePendingPost(pendingPost);
        if (subscription.active) {
            //通过反射调用对应的方法,此时在主线程执行
            invokeSubscriber(subscription, event);
        }
    }

@7.BACKGROUND线程模式下,主线程加入队列,通过线程池切换到子线程执行。BackgroundPoster原理和HandlerPoster类似。

final class BackgroundPoster implements Runnable, Poster {

    private final PendingPostQueue queue;
    private final EventBus eventBus;

    private volatile boolean executorRunning;

    BackgroundPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        //用PendingPost封装subscription及event
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            //加入队列
            queue.enqueue(pendingPost);
            if (!executorRunning) {
                executorRunning = true;
                //调用newCachedThreadPool线程池,执行任务
                eventBus.getExecutorService().execute(this);
            }
        }
    }

    @Override
    public void run() {
        try {
            try {
                while (true) {
                    PendingPost pendingPost = queue.poll(1000);
                    if (pendingPost == null) {
                        synchronized (this) {
                            // Check again, this time in synchronized
                            pendingPost = queue.poll();
                            if (pendingPost == null) {
                                executorRunning = false;
                                return;
                            }
                        }
                    }
                    //执行订阅事件
                    eventBus.invokeSubscriber(pendingPost);
                }
            } catch (InterruptedException e) {
                eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
            }
        } finally {
            executorRunning = false;
        }
    }
}

@8.ASYNC线程模式下,所有线程进入队列,通过线程池切换到子线程执行。具体由AsyncPoster实现,BackgroundPoster是通过CachedThreadPool来管理线程,而AsyncPoster则是有EventBusBuilder传入的线程池来管理线程。且BackgroundPoster执行时加了synchronized同步锁,而AsyncPoster未加锁。

class AsyncPoster implements Runnable, Poster {

    private final PendingPostQueue queue;
    private final EventBus eventBus;

    AsyncPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        queue.enqueue(pendingPost);
        //通过线程池切换到子线程执行
        eventBus.getExecutorService().execute(this);
    }

    @Override
    public void run() {
        PendingPost pendingPost = queue.poll();
        if(pendingPost == null) {
            throw new IllegalStateException("No pending post available");
        }
        eventBus.invokeSubscriber(pendingPost);
    }

}

事件发送总结:

  • 1.将发送的事件保存在postingState中
  • 2.遍历postingState的事件列表eventQueue
  • 3.遍历eventQueue根据线程模式直接或间接通过反射执行订阅方法


    事件发送流程图

4-粘性事件

粘性事件使用:

//@1.订阅处理粘性事件
@Subscribe(sticky = true)
public void testEventBust(Object obj) {
    。。。
}
//@2.发送粘性事件
EventBus.getDefault().postSticky(new Object());

@1.订阅处理粘性事件,当sticky=true时,在EventBus.subscribe注册方法中

private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
        //见1-@5
        。。。
        if (subscriberMethod.sticky) {
            if (eventInheritance) {
                //遍历stickyEvents集合
                Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
                for (Map.Entry<Class<?>, Object> entry : entries) {
                    Class<?> candidateEventType = entry.getKey();
                    if (eventType.isAssignableFrom(candidateEventType)) {
                        Object stickyEvent = entry.getValue();
                       //如果方法制成粘性事件,则根据线程模式处理事件 checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                    }
                }
            } else {
                Object stickyEvent = stickyEvents.get(eventType);
                checkPostStickyEventToSubscription(newSubscription, stickyEvent);
            }
        }
    }

@2.发布粘性事件。

public void postSticky(Object event) {
    // 将要发布的粘性事件存stickyEvents中
    synchronized (stickyEvents) {
        stickyEvents.put(event.getClass(), event);
    }
    // 普通的发布事件
    post(event);
}

粘性事件总结:

  • 通过postSticky将粘性事件存储到stickyEvents中
  • 调用post发送粘性事件,若订阅者活跃则会消耗该事件并从stickyEvents中移除
  • 订阅者注册时,遍历stickyEvents集合,通过postToSubscription发送Event给当前订阅者并执行

相关文章

网友评论

    本文标题:EventBus源码解析

    本文链接:https://www.haomeiwen.com/subject/pbaqahtx.html