美文网首页
EventBus源码解析(二)

EventBus源码解析(二)

作者: HardMan | 来源:发表于2021-12-03 15:05 被阅读0次

    上篇文章分析完register方法
    现在看看post发送事件如何被消费

        public void post(Object event) {
            PostingThreadState postingState = currentPostingThreadState.get();
            List<Object> eventQueue = postingState.eventQueue;
            eventQueue.add(event);
            如果当前没有事件在消费,为false 开始消费事件
            if (!postingState.isPosting) {
                postingState.isMainThread = isMainThread();
                postingState.isPosting = true;
                if (postingState.canceled) {
                    throw new EventBusException("Internal error. Abort state was not reset");
                }
                try {
     
                    while (!eventQueue.isEmpty()) {
                        postSingleEvent(eventQueue.remove(0), postingState);
                    }
                } finally {
                    postingState.isPosting = false;
                    postingState.isMainThread = false;
                }
            }
        }
    
    

    继续看postSingleEvent方法,里面调用了postSingleEventForEventType方法 ,一系列调用后最终调用了postToSubscription方法。

        private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
            Class<?> eventClass = event.getClass();
            boolean subscriptionFound = false;
            if (eventInheritance) {
                List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
                int countTypes = eventTypes.size();
                for (int h = 0; h < countTypes; h++) {
                    Class<?> clazz = eventTypes.get(h);
                    subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
                }
            } else {
                subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
            }
            if (!subscriptionFound) {
                if (logNoSubscriberMessages) {
                    logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
                }
                if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                        eventClass != SubscriberExceptionEvent.class) {
                    post(new NoSubscriberEvent(this, event));
                }
            }
        }
    
        private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
            switch (subscription.subscriberMethod.threadMode) {
                case POSTING:
                    invokeSubscriber(subscription, event);
                    break;
                case MAIN:
                    if (isMainThread) {
                        invokeSubscriber(subscription, event);
                    } else {
                        mainThreadPoster.enqueue(subscription, event);
                    }
                    break;
                case MAIN_ORDERED:
                    if (mainThreadPoster != null) {
                        mainThreadPoster.enqueue(subscription, event);
                    } else {
                        // temporary: technically not correct as poster not decoupled from subscriber
                        invokeSubscriber(subscription, event);
                    }
                    break;
                case BACKGROUND:
                    if (isMainThread) {
                        backgroundPoster.enqueue(subscription, event);
                    } else {
                        invokeSubscriber(subscription, event);
                    }
                    break;
                case ASYNC:
                    asyncPoster.enqueue(subscription, event);
                    break;
                default:
                    throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
            }
        }
    

    根据订阅方法的注解参数 如果是Posting,直接通过反射调用。如果需要在子线程中去执行,这里是通过backgroundPoster去执行。往下看 BackgroundPoster 的enqueue方法

        public void enqueue(Subscription subscription, Object event) {
            PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
            synchronized (this) {
                queue.enqueue(pendingPost);
                if (!executorRunning) {
                    executorRunning = true;
                    eventBus.getExecutorService().execute(this);
                }
            }
        }
    

    这个excutorService 是什么呢 找到EventBusBuilder

    private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();
    
        public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    

    结果很明显,通过线程池在子线程中去处理。

    如果要在主线程去执行呢 是调用了mainThreadPoster。它的本质就是handler。

    总结:post函数是将事件添加到对列当中,根据事件类型,从前面register方法中注册的map集合拿到以enventtype为key的subscribetion。Subscription的属性中包括了订阅者对象和所注册的方法。最后通过反射进行调用。在调用前,通过handelr和线程池切换线程。

    相关文章

      网友评论

          本文标题:EventBus源码解析(二)

          本文链接:https://www.haomeiwen.com/subject/rlgvxrtx.html