EventBus

作者: gczxbb | 来源:发表于2018-03-09 14:47 被阅读0次

    EventBus是典型的监听者设计,由EventBus负责管理所有的订阅者,当post事件对象时,判断绑定事件的的订阅者类型并通知。它包括两个核心内容,一是注册订阅者,二是根据post事件匹配订阅者。

    注册即往以下两个Map存储订阅者和事件的关联数据。

    private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
    

    关联事件与订阅者的Map,Key事件Class,Value是Subscription类型的List,每个事件Class对应一个Subscription列表,post在该Map中查找订阅者。

    private final Map<Object, List<Class<?>>> typesBySubscriber;
    

    关联订阅者与事件类型的Map,Key订阅者对象,Value是该订阅者类中涉及的事件类列表。

    注册

    public void register(Object subscriber) {
        Class<?> subscriberClass = subscriber.getClass();
        List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.
                        findSubscriberMethods(subscriberClass);
        synchronized (this) {
            for (SubscriberMethod subscriberMethod : subscriberMethods) {
                subscribe(subscriber, subscriberMethod);
            }
        }
    }
    

    1:根据订阅者的Class,查找SubscriberMethod列表,SubscriberMethod代表一个订阅者方法封装

    private static final Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE =
                         new ConcurrentHashMap<>();
    

    METHOD_CACHE缓存Map保存了订阅者类与SubscriberMethod列表的关联。如果已经注册过订阅者对象,则subscriberClass对应的List是存在的。
    如果不存在,则findSubscriberMethods会调用findUsingInfo方法获取List<SubscriberMethod>,返回在METHOD_CACHE建立subscriberClass与List<SubscriberMethod>的关联。

    findUsingInfo方法
    private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
        FindState findState = prepareFindState();//获取FindState
        findState.initForSubscriber(subscriberClass);//初始化findState
        while (findState.clazz != null) {
                findState.subscriberInfo = getSubscriberInfo(findState);
                if (findState.subscriberInfo != null) {
                    ...
                } else {
                    findUsingReflectionInSingleClass(findState);
                }
                findState.moveToSuperclass();
        }
        return getMethodsAndRelease(findState);
    }
    

    先准备一个FindState,用于存储订阅者Class信息,通过findUsingReflectionInSingleClass方法遍历订阅者Class的所有Method,获取注解@Subscribe的方法,和线程模式threadMode等信息,为该Method创建一个SubscriberMethod对象保存在findState.subscriberMethods,最后getMethodsAndRelease将List<SubscriberMethod>返回。
    2:遍历List<SubscriberMethod>,调用subscribe方法。

    subscribe(Object subscriber, SubscriberMethod subscriberMethod) 方法:
    
    Class<?> eventType = subscriberMethod.eventType;
    Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
    CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType
                    .get(eventType);
    if (subscriptions == null) {
        subscriptions = new CopyOnWriteArrayList<>();
        subscriptionsByEventType.put(eventType, subscriptions);
    } else {
        if (subscriptions.contains(newSubscription)) {
                    throw new EventBusException("Subscriber " + subscriber.getClass() +
                                 " already registered to event " + eventType);
        }
    }
    int size = subscriptions.size();
    for (int i = 0; i <= size; i++) {
        if (i == size || subscriberMethod.priority > subscriptions.get(i).
                  subscriberMethod.priority) {
            subscriptions.add(i, newSubscription);
            break;
        }
    }
    List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
    if (subscribedEvents == null) {
        subscribedEvents = new ArrayList<>();
        typesBySubscriber.put(subscriber, subscribedEvents);
    }
    subscribedEvents.add(eventType);
    

    SubscriberMethod中存储了事件类Class,根据Class<?> eventType事件类,在subscriptionsByEventType查找订阅者Subscription列表,不存在则创建。
    创建Subscription,Subscription封装了订阅者对象Object与SubscriberMethod,按照优先级顺序将Subscription加入列表,priority可在@Subscribe设置。
    根据订阅者在typesBySubscriber中查找事件类列表,将事件Class加入。
    如果是黏性事件,则注册后从stickyEvents列表中取出事件,postToSubscription发送。
    注册完毕。

    post

    public void post(Object event) {
        PostingThreadState postingState = currentPostingThreadState.get();
        List<Object> eventQueue = postingState.eventQueue;
        eventQueue.add(event);
    
        if (!postingState.isPosting) {
            postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
            postingState.isPosting = true;
                throw new EventBusException("Internal error. Abort state was not reset");
            }
            try {
                while (!eventQueue.isEmpty()) {
                    postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally {
                postingState.isPosting = false;
                postingState.isMainThread = false;
            }
        }
    }
    

    首先获取当前线程下的PostingThreadState对象,isPosting标志位代表是否正在执行post方法,默认是false,设置为true,如果此线程正在isPosting,直接退出了。
    在while循环中,将eventQueue的每个Object传给postSingleEvent方法,直至为空。

    private void postSingleEvent(Object event, PostingThreadState postingState) 
                    throws Error {
        Class<?> eventClass = event.getClass();
        boolean subscriptionFound = false;
        if (eventInheritance) {
            List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
            int countTypes = eventTypes.size();
            for (int h = 0; h < countTypes; h++) {
                Class<?> clazz = eventTypes.get(h);
                subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
             }
        } else {
            subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
        }
        if (!subscriptionFound) {
                if (logNoSubscriberMessages) {
                    Log.d(TAG, "No subscribers registered for event " + eventClass);
                }
                if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                        eventClass != SubscriberExceptionEvent.class) {
                    post(new NoSubscriberEvent(this, event));
                }
        }
    }
    

    postSingleEventForEventType根据事件Class匹配订阅者

    private boolean postSingleEventForEventType(Object event, PostingThreadState 
                    postingState,Class<?> eventClass) {
        CopyOnWriteArrayList<Subscription> subscriptions;
        synchronized (this) {
            subscriptions = subscriptionsByEventType.get(eventClass);
        }
        if (subscriptions != null && !subscriptions.isEmpty()) {
            for (Subscription subscription : subscriptions) {
                postingState.event = event;
                postingState.subscription = subscription;
                boolean aborted = false;
                try {
                    postToSubscription(subscription, event, postingState.isMainThread);
                    aborted = postingState.canceled;
                 } finally {
                    postingState.event = null;
                    postingState.subscription = null;
                    postingState.canceled = false;
                }
                if (aborted) {
                    break;
                }
            }
            return true;
        }
        return false;
    }
    

    根据eventClass类型在Map:subscriptionsByEventType中查找Subscription列表,这些Subscription就是绑定了eventClass类的订阅者及对应方法的封装。接下来就是遍历所有的Subscription去通知。

    private void postToSubscription(Subscription subscription, Object event, 
                    boolean isMainThread) {
        switch (subscription.subscriberMethod.threadMode) {
            case POSTING:
                invokeSubscriber(subscription, event);
                break;
            case MAIN:
                if (isMainThread) {
                    invokeSubscriber(subscription, event);
                } else {//主线程Handler发送消息,到主线程执行
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case BACKGROUND:
                if (isMainThread) {
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case ASYNC:
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + ...);
        }
    }
    

    有四种线程模式,在注解Subscribe中设置
    POSTING在当前线程中通知订阅者,直接触发

    void invokeSubscriber(Subscription subscription, Object event) {
        try { //触发订阅者方法
            subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
        } catch (InvocationTargetException e) {   
        } catch (IllegalAccessException e) {       
        }
    }
    

    MAIN在主线程中通知订阅者
    如果当前线程不是主线程,通过主线程HandlerPoster发送消息,HandlerPoster继承Handler。将Subscription和Event事件封装成一个待发送的PendingPost对象,从pendingPostPool池获取PendingPost,插入HandlerPoster内部PendingPostQueue中。enqueue方法多线程调用,同步。

    void enqueue(Subscription subscription, Object event) {
            PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
            synchronized (this) {//多线程同步
                queue.enqueue(pendingPost);//插入队列
                if (!handlerActive) {
                    handlerActive = true;
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                }
            }
    }
    

    handleMessage在主线程执行,handlerActive说明主线程正在遍历处理PendingPostQueue队列,且PendingPostQueue不是空,此时如果有线程enquque方法,不需要sendMessage,只需要将pendingPost插入即可。在主线程发现队列已经为空,handleMessage退出并设置handleActive为false,这时线程的enqueue才会触发sendMessage唤起主线程handleMessage循环。

    public void handleMessage(Message msg) {
            boolean rescheduled = false;
            try {
                long started = SystemClock.uptimeMillis();
                while (true) {
                    PendingPost pendingPost = queue.poll();//队列获取对象
                    if (pendingPost == null) {
                        synchronized (this) {
                            // Check again, this time in synchronized
                            pendingPost = queue.poll();
                            if (pendingPost == null) {
                                handlerActive = false;
                                return;
                            }
                        }
                    }
                    eventBus.invokeSubscriber(pendingPost);//invoke触发
                    long timeInMethod = SystemClock.uptimeMillis() - started;
                    if (timeInMethod >= maxMillisInsideHandleMessage) {//重新发送新通知
                        if (!sendMessage(obtainMessage())) {
                            throw new EventBusException("Could not send handler message");
                        }
                        rescheduled = true;
                        return;
                    }
                }
            } finally {
                handlerActive = rescheduled;
            }
    }
    

    设置maxMillisInsideHandleMessage统计当前已在执行handleMessage的时间,当大于一定值时防止主线程一直停留在执行订阅者通知上而忽略了UI的更新导致ANR,重新sendMessage并退出,让主线程有空隙处理UI事务。
    BACKGROUND后台线程通知订阅者
    如果当前是主线程,则通过BackgroundPoster任务交给后台线程处理,BackgroundPoster是Runnable类型

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            if (!executorRunning) {
                executorRunning = true;
                eventBus.getExecutorService().execute(this);
            }
        }
    }
    

    EventBus的线程池ExecutorService处理BackgroundPoster任务,当主线程调用enquque,线程池已经开启执行任务,再次enquque则不会再利用线程池开启任务,因executorRunning 标志已设置为true。此时只需要将pendPost放入队列中即可。后台线程run会一直从PendingPostQueue队列中取PendingPost 对象直到队列为空,设置executorRunning 为false,退出,后续就能在主线程中触发线程池执行任务啦。每隔1000才从队列中取元素。

    @Override
    public void run() {
        ...
        while (true) {
            PendingPost pendingPost = queue.poll(1000);
            if (pendingPost == null) {
                synchronized (this) {
                    // Check again, this time in synchronized
                    pendingPost = queue.poll();
                    if (pendingPost == null) {
                        executorRunning = false;
                        return;
                    }
                }
            }
            eventBus.invokeSubscriber(pendingPost);//后台线程invoke触发
        }
        ...
    }
    

    ASYNC通知订阅者,异步
    不论事件发送者是什么线程,均交给后台线程池处理任务通知订阅者,线程池为每个插入事件均执行execute一次任务
    BACKGROUND的区别
    BACKGROUND只有在主线程发送者的条件下才启用线程池处理任务,任务中循环poll队列的元素,如果元素存在则一直执行,即使又有主线程post,则仅加入队列,且重用该线程,不需要线程池重新执行execute。
    ASYNC只要调用一次enquque,就由线程池执行execute一次AsyncPoster任务,且run方法仅poll一次取出执行。

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        queue.enqueue(pendingPost);
        eventBus.getExecutorService().execute(this);
    }
    
    @Override
    public void run() {
        PendingPost pendingPost = queue.poll();
        if(pendingPost == null) {
            throw new IllegalStateException("No pending post available");
        }
        eventBus.invokeSubscriber(pendingPost);
    }
    

    以上线程池采用可缓存线程池

    private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = 
                    Executors.newCachedThreadPool();
    

    END
    ^^

    相关文章

      网友评论

        本文标题:EventBus

        本文链接:https://www.haomeiwen.com/subject/rtnrfftx.html