美文网首页
Android EventBus 源码浅析

Android EventBus 源码浅析

作者: 过期的薯条 | 来源:发表于2021-05-16 11:43 被阅读0次

1.前言

eventbus库小巧灵活好用,是面试的考点。学习完eventbus之后发现其和handler机制真的是太过于相似了。果然是一通百通,触类旁通啊。

2.正题

学习目标:

  • 如何进行线程的切换

  • EventBus流程

Eventbus.java

构造方法:双重锁单例

/** Convenience singleton for apps using a process-wide EventBus instance. */
    public static EventBus getDefault() {
        EventBus instance = defaultInstance;
        if (instance == null) {
            synchronized (EventBus.class) {
                instance = EventBus.defaultInstance;
                if (instance == null) {
                    instance = EventBus.defaultInstance = new EventBus();
                }
            }
        }
        return instance;
    }

注册过程

 /**
     * Registers the given subscriber to receive events. Subscribers must call {@link #unregister(Object)} once they
     * are no longer interested in receiving events.
     * <p/>
     * Subscribers have event handling methods that must be annotated by {@link Subscribe}.
     * The {@link Subscribe} annotation also allows configuration like {@link
     * ThreadMode} and priority.
     */
    public void register(Object subscriber) {
        Class<?> subscriberClass = subscriber.getClass();
        List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
        synchronized (this) {
            Log.d("wangxuyang","subscriberMethods size : "+subscriberMethods.size());
            for (SubscriberMethod subscriberMethod : subscriberMethods) {
                subscribe(subscriber, subscriberMethod);

            }
        }
    }

SubscriberMethod是对我们在Class中添加@Subscribe(threadMode = ThreadMode.MAIN)的描述

public class SubscriberMethod {
    final Method method;
    final ThreadMode threadMode;
    final Class<?> eventType;
    final int priority;
    final boolean sticky;
    /** Used for efficient comparison */
    String methodString;
}

SubscriberMethindFinder.java 寻找Class中的注解方法。有俩种方式:

  • 一种是通过反射查找

  • 另外一种是通过注解生成的类

List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
        List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
        if (subscriberMethods != null) {
            return subscriberMethods;
        }
                //ignoreGeneratedIndex 默认为false
        if (ignoreGeneratedIndex) {
            subscriberMethods = findUsingReflection(subscriberClass);//通过反射查找
        } else {
            //通过接口 SubscriberInfoIndex 查找。SubscriberInfoIndex 由EventBus编译生成的代码
            subscriberMethods = findUsingInfo(subscriberClass);//编译时生成代码
        }
        if (subscriberMethods.isEmpty()) {
            throw new EventBusException("Subscriber " + subscriberClass
                    + " and its super classes have no public methods with the @Subscribe annotation");
        } else {
            METHOD_CACHE.put(subscriberClass, subscriberMethods);
            return subscriberMethods;
        }
    }

subscribe(subscriber, subscriberMethod) 方法的作用:

  private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
        Class<?> eventType = subscriberMethod.eventType;
        Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
        CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);

        Log.d("wangxuyang","class: "+subscriber.getClass()+"   "+eventType);
        if (subscriptions == null) {
            subscriptions = new CopyOnWriteArrayList<>();
            subscriptionsByEventType.put(eventType, subscriptions);
        } else {
            if (subscriptions.contains(newSubscription)) {
                throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                        + eventType);
            }
        }

        int size = subscriptions.size();
        Log.d("wangxuyang","size :"+size);
        for (int i = 0; i <= size; i++) {
            if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
                subscriptions.add(i, newSubscription);
                break;
            }
        }

        List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
        if (subscribedEvents == null) {
            subscribedEvents = new ArrayList<>();
            typesBySubscriber.put(subscriber, subscribedEvents);
        }
        subscribedEvents.add(eventType);

        if (subscriberMethod.sticky) {
            if (eventInheritance) {
                // Existing sticky events of all subclasses of eventType have to be considered.
                // Note: Iterating over all events may be inefficient with lots of sticky events,
                // thus data structure should be changed to allow a more efficient lookup
                // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
                Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
                for (Map.Entry<Class<?>, Object> entry : entries) {
                    Class<?> candidateEventType = entry.getKey();
                    if (eventType.isAssignableFrom(candidateEventType)) {
                        Object stickyEvent = entry.getValue();
                        checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                    }
                }
            } else {
                Object stickyEvent = stickyEvents.get(eventType);
                checkPostStickyEventToSubscription(newSubscription, stickyEvent);
            }
        }
    }

Subscription 对象作用:将subscriber 和SubscriberMethod 绑定到一起的封装。

private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;

将接受相同的Event的,Subscription 保存到一起,用CopyOnWriteArrayList 保存。

private final Map<Object, List<Class<?>>> typesBySubscriber;

一个订阅者,可能接受多个不同个Event。用 typesBySubscriber 保存

private final Map<Class<?>, Object> stickyEvents;

用来保存粘性事件的map


发送过程(post过程)

/** Posts the given event to the event bus. */
    public void post(Object event) {
        PostingThreadState postingState = currentPostingThreadState.get();
        List<Object> eventQueue = postingState.eventQueue;
        eventQueue.add(event);

        if (!postingState.isPosting) {
            postingState.isMainThread = isMainThread();
            postingState.isPosting = true;
            if (postingState.canceled) {
                throw new EventBusException("Internal error. Abort state was not reset");
            }
            try {
                while (!eventQueue.isEmpty()) {
                    postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally {
                postingState.isPosting = false;
                postingState.isMainThread = false;
            }
        }
    }

CurrentPostingThreadState 当前线程的副本:

private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
    @Override
    protected PostingThreadState initialValue() {
        return new PostingThreadState();
    }
};

PostingThreadState: 当前线程下的副本变量。保存着发送的Event对象。

final static class PostingThreadState {
    final List<Object> eventQueue = new ArrayList<>();
    boolean isPosting;
    boolean isMainThread;
    Subscription subscription;
    Object event;
    boolean canceled;
}

postSingleEvent方法发送事件:

private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
    Class<?> eventClass = event.getClass();
    boolean subscriptionFound = false;
    if (eventInheritance) {
        List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
        int countTypes = eventTypes.size();
        for (int h = 0; h < countTypes; h++) {
            Class<?> clazz = eventTypes.get(h);
            subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
        }
    } else {
        subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
    }
    if (!subscriptionFound) {
        if (logNoSubscriberMessages) {
            logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
        }
        if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                eventClass != SubscriberExceptionEvent.class) {
            post(new NoSubscriberEvent(this, event));
        }
    }
}

List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
eventTypes中包含了当前的Class,以及父Class,以及父Class的父Class。

postSingleEventForEventType()方法:

private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
    CopyOnWriteArrayList<Subscription> subscriptions;
    synchronized (this) {
        subscriptions = subscriptionsByEventType.get(eventClass);
    }
    if (subscriptions != null && !subscriptions.isEmpty()) {
        for (Subscription subscription : subscriptions) {
            postingState.event = event;
            postingState.subscription = subscription;
            boolean aborted;
            try {
                postToSubscription(subscription, event, postingState.isMainThread);
                aborted = postingState.canceled;
            } finally {
                postingState.event = null;
                postingState.subscription = null;
                postingState.canceled = false;
            }
            if (aborted) {
                break;
            }
        }
        return true;
    }
    return false;
}

根据eventClass 获取到所有定于了此event的Subscriptions 。通过for循环,执行 postToSubscription()。

postToSubScription() 才是真正核心的方法:

private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
    switch (subscription.subscriberMethod.threadMode) {
        case POSTING:
            invokeSubscriber(subscription, event);
            break;
        case MAIN:
            if (isMainThread) {
                invokeSubscriber(subscription, event);
            } else {
                mainThreadPoster.enqueue(subscription, event);//假如是主线程执行进队列
            }
            break;
        case MAIN_ORDERED:
            if (mainThreadPoster != null) {
                mainThreadPoster.enqueue(subscription, event);
            } else {
                // temporary: technically not correct as poster not decoupled from subscriber
                invokeSubscriber(subscription, event);
            }
            break;
        case BACKGROUND:
            if (isMainThread) {
                backgroundPoster.enqueue(subscription, event);
            } else {
                invokeSubscriber(subscription, event);
            }
            break;
        case ASYNC:
            asyncPoster.enqueue(subscription, event);
            break;
        default:
            throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
    }
}

这里有必要解释下:

  • POSTING

    POSTING 的订阅者会在发布的同一个线程调用,发布者在主线程那么订阅者也就在主线程,反之亦,避免了线程切换

  • MAIN

    表示事件处理函数的线程在主线程(UI)线程,因此在这里不能进行耗时操作

  • BACKGROUND

    表示事件处理函数的线程在后台线程,因此不能进行UI操作。如果发布事件的线程是主线程(UI线程),那么事件处理函数将会开启一个后台线程,如果发布事件的线程是在后台线程,那么事件处理函数就使用该线程

  • ASYNC

    表示无论事件发布的线程是哪一个,事件处理函数始终会新建一个子线程运行,同样不能进行UI操作。

MainThreadPoster,BackgroundPoster,AsyncPoster

  • MainThreadPoster

本质是一个Handler,将主线程的Looper传入进来了。实现了Poster接口。实现代码如下:

public class HandlerPoster extends Handler implements Poster {

    private final PendingPostQueue queue;
    private final int maxMillisInsideHandleMessage;
    private final EventBus eventBus;
    private boolean handlerActive;

    protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
        super(looper);
        this.eventBus = eventBus;
        this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
        queue = new PendingPostQueue();
    }

maxMillisInsideHandleMessage:

它用于限制一次事件发布所能消耗的最多的主线程时间。如果事件限制到了的时候订阅者没有通知完,则会发送一个消息,在下一轮中继续处理。

接着我们观看HandlerPoster的enqueue操作:

public void enqueue(Subscription subscription, Object event) {
    PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
    synchronized (this) {
        queue.enqueue(pendingPost);//进完队列之后,发送一个Message,进入到handleMessage方法中,
        if (!handlerActive) {
            handlerActive = true;
            if (!sendMessage(obtainMessage())) {
                throw new EventBusException("Could not send handler message");
            }
        }
    }
}

@Override
public void handleMessage(Message msg) {
    boolean rescheduled = false;
    try {
        long started = SystemClock.uptimeMillis();
        while (true) {
            PendingPost pendingPost = queue.poll();
            if (pendingPost == null) {
                synchronized (this) {
                    // Check again, this time in synchronized
                    pendingPost = queue.poll();
                    if (pendingPost == null) {
                        handlerActive = false;
                        return;
                    }
                }
            }
            eventBus.invokeSubscriber(pendingPost);
            long timeInMethod = SystemClock.uptimeMillis() - started;
            if (timeInMethod >= maxMillisInsideHandleMessage) {
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
                rescheduled = true;
                return;
            }
        }
    } finally {
        handlerActive = rescheduled;
    }
}

主线程发送,主线程处理的流程总结:

主线程post一个event之后。MainThreadPoster会对此event执行进队操作,之后发送一个空Message,进入handleMessage中,执行while(true)循环, 队列弹出一个PendingPost。执行通过反射调用对应的方法。然后根据maxMillisInsideHandleMessage 结束掉while(true)循环 避免占用太多的主线程资源。


  • AsyncPoster

AsyncPoster实现了Poster,Runnable接口。它的enqueue 方法就是将PendingPost入队,之后再run中出队,并反射调用。保证

eventBus.invokeSubscriber 在子线程中执行。

class AsyncPoster implements Runnable, Poster {

    private final PendingPostQueue queue;
    private final EventBus eventBus;

    AsyncPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        queue.enqueue(pendingPost);
        eventBus.getExecutorService().execute(this);
    }

    @Override
    public void run() {
        PendingPost pendingPost = queue.poll();
        if(pendingPost == null) {
            throw new IllegalStateException("No pending post available");
        }
        eventBus.invokeSubscriber(pendingPost);
    }

}

线程切换总结:

eventbus的线程切换,并不是像我们生活中切换的概念。假设有A,B赛道,A切换到B。那么A赛道依旧存在,我们还可以通过B切换到A。这是我们通俗的理解。

而EventBus的线程切换是。在主线程中执行invokeSubscriber方法,以及通过线程池启动一个线程执行invokeSubscriber方法。这就导致从Main切换到Thread1之后。很有可能下次再切换的时候,发现是Main切换到Thread2

post流程图:


未命名文件.jpg

相关文章

网友评论

      本文标题:Android EventBus 源码浅析

      本文链接:https://www.haomeiwen.com/subject/alnljltx.html