美文网首页
google guava EventBus 没有按预期的并行执行

google guava EventBus 没有按预期的并行执行

作者: 天草二十六_简村人 | 来源:发表于2019-07-25 10:24 被阅读0次

    一、背景
    上一篇https://www.jianshu.com/writer#/notebooks/9031136/notes/51194824中讲解了一般情况下怎么使用guava EventBus,具体使用的是EventBus eventBus = new EventBus();
    有实际对接过的同学会发现,它并没有按照预期的并行执行事件,而是串行等待事件执行完成。这是什么情况?

    二、源码分析
    1、EventBus类的属性和构造方法

    /** Name of this bus. */
    private final String identifier;
    /** Executor supplied at construction time. */
    private final Executor executor;
    private final SubscriberExceptionHandler exceptionHandler;
    private final SubscriberRegistry subscribers;
    /**
     * Event dispatcher. Dispatcher offers three kinds; the public constructors below
     * all pass Dispatcher.perThreadDispatchQueue().
     */
    private final Dispatcher dispatcher;

    /** Creates a bus named "default" with the direct executor and the per-thread dispatcher. */
    public EventBus() {
        this(
                "default",
                MoreExecutors.directExecutor(),
                Dispatcher.perThreadDispatchQueue(),
                EventBus.LoggingHandler.INSTANCE);
    }

    /** Creates a bus with the given name, the direct executor and the per-thread dispatcher. */
    public EventBus(String identifier) {
        this(
                identifier,
                MoreExecutors.directExecutor(),
                Dispatcher.perThreadDispatchQueue(),
                EventBus.LoggingHandler.INSTANCE);
    }

    /** Creates a bus named "default" that uses the given exception handler. */
    public EventBus(SubscriberExceptionHandler exceptionHandler) {
        this(
                "default",
                MoreExecutors.directExecutor(),
                Dispatcher.perThreadDispatchQueue(),
                exceptionHandler);
    }

    /**
     * Constructor every other constructor ends up in; stores the collaborators after
     * null-checking each argument.
     */
    EventBus(
            String identifier,
            Executor executor,
            Dispatcher dispatcher,
            SubscriberExceptionHandler exceptionHandler) {
        this.subscribers = new SubscriberRegistry(this);
        this.identifier = Preconditions.checkNotNull(identifier);
        this.executor = Preconditions.checkNotNull(executor);
        this.dispatcher = Preconditions.checkNotNull(dispatcher);
        this.exceptionHandler = Preconditions.checkNotNull(exceptionHandler);
    }
    

    可以看出,EventBus默认使用的事件分发器是Dispatcher.perThreadDispatchQueue(),而重点在于executor执行器使用的是MoreExecutors.directExecutor()。下面我们看一下guava的com.google.common.util.concurrent包中MoreExecutors类的该方法源码。

     /** Returns the shared DirectExecutor instance. */
     public static Executor directExecutor() {
         Executor direct = MoreExecutors.DirectExecutor.INSTANCE;
         return direct;
     }
    /**
     * Guava's own implementation of java.util.concurrent.Executor; execute() is the
     * method to look at. A single-constant enum gives a singleton for free.
     */
    private static enum DirectExecutor implements Executor {
        INSTANCE;

        /** Runs the task directly in the calling thread; no new thread is started. */
        @Override
        public void execute(Runnable command) {
            command.run();
        }

        @Override
        public String toString() {
            return "MoreExecutors.directExecutor()";
        }
    }
    

    2、Dispatcher事件分发器的三种实现

    //
    package com.google.common.eventbus;
    
    import com.google.common.base.Preconditions;
    import com.google.common.collect.Queues;
    import java.util.Iterator;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    
    /**
     * Hands an event to the subscribers that should receive it. Three implementations are
     * nested below, each obtained through one of the static factory methods.
     */
    abstract class Dispatcher {
        Dispatcher() {
        }

        // Static factory. Note: it creates a new dispatcher on every call, so it is NOT a singleton.
        static Dispatcher perThreadDispatchQueue() {
            return new Dispatcher.PerThreadQueuedDispatcher();
        }

        // Static factory; also returns a fresh instance on every call.
        static Dispatcher legacyAsync() {
            return new Dispatcher.LegacyAsyncDispatcher();
        }

        // The only factory that hands out a shared instance.
        static Dispatcher immediate() {
            return Dispatcher.ImmediateDispatcher.INSTANCE;
        }

        // Abstract method; each concrete dispatcher supplies its own implementation.
        // var1 is the event, var2 iterates over the subscribers to notify.
        abstract void dispatch(Object var1, Iterator<Subscriber> var2);

        // Immediate mode: no queue, every subscriber is invoked in turn from inside dispatch().
        private static final class ImmediateDispatcher extends Dispatcher {
            private static final Dispatcher.ImmediateDispatcher INSTANCE = new Dispatcher.ImmediateDispatcher();

            private ImmediateDispatcher() {
            }

            void dispatch(Object event, Iterator<Subscriber> subscribers) {
                Preconditions.checkNotNull(event);

                while(subscribers.hasNext()) {
                    ((Subscriber)subscribers.next()).dispatchEvent(event);
                }

            }
        }

        // Dispatcher handed out by legacyAsync(). It starts no thread itself: it keeps one
        // concurrent queue per dispatcher instance (shared by all calling threads), adds one
        // entry per subscriber and then drains the queue from the calling thread.
        // NOTE(review): any asynchrony presumably comes from Subscriber.dispatchEvent, which is
        // not shown here - confirm against the Subscriber source.
        private static final class LegacyAsyncDispatcher extends Dispatcher {
            private final ConcurrentLinkedQueue<Dispatcher.LegacyAsyncDispatcher.EventWithSubscriber> queue;

            private LegacyAsyncDispatcher() {
                this.queue = Queues.newConcurrentLinkedQueue();
            }

            void dispatch(Object event, Iterator<Subscriber> subscribers) {
                Preconditions.checkNotNull(event);

                // Step 1: enqueue an (event, subscriber) pair for every subscriber.
                while(subscribers.hasNext()) {
                    this.queue.add(new Dispatcher.LegacyAsyncDispatcher.EventWithSubscriber(event, (Subscriber)subscribers.next()));
                }

                // Step 2: drain whatever is in the queue. Because the queue is shared, this loop
                // may also pick up entries added by other threads.
                Dispatcher.LegacyAsyncDispatcher.EventWithSubscriber e;
                while((e = (Dispatcher.LegacyAsyncDispatcher.EventWithSubscriber)this.queue.poll()) != null) {
                    e.subscriber.dispatchEvent(e.event);
                }

            }

            // Immutable pair of an event and the single subscriber it is destined for.
            private static final class EventWithSubscriber {
                private final Object event;
                private final Subscriber subscriber;

                private EventWithSubscriber(Object event, Subscriber subscriber) {
                    this.event = event;
                    this.subscriber = subscriber;
                }
            }
        }

        // Dispatcher handed out by perThreadDispatchQueue(). It is not asynchronous: it keeps a
        // queue and a "dispatching" flag per thread and drains the queue from the calling thread.
        private static final class PerThreadQueuedDispatcher extends Dispatcher {
            // Pending events of the current thread.
            private final ThreadLocal<Queue<Dispatcher.PerThreadQueuedDispatcher.Event>> queue;
            // True while the current thread is inside the drain loop below.
            private final ThreadLocal<Boolean> dispatching;

            private PerThreadQueuedDispatcher() {
                this.queue = new ThreadLocal<Queue<Dispatcher.PerThreadQueuedDispatcher.Event>>() {
                    protected Queue<Dispatcher.PerThreadQueuedDispatcher.Event> initialValue() {
                        return Queues.newArrayDeque();
                    }
                };
                this.dispatching = new ThreadLocal<Boolean>() {
                    protected Boolean initialValue() {
                        return false;
                    }
                };
            }

            void dispatch(Object event, Iterator<Subscriber> subscribers) {
                Preconditions.checkNotNull(event);
                Preconditions.checkNotNull(subscribers);
                Queue<Dispatcher.PerThreadQueuedDispatcher.Event> queueForThread = (Queue)this.queue.get();
                queueForThread.offer(new Dispatcher.PerThreadQueuedDispatcher.Event(event, subscribers));
                // If this thread is already draining (dispatch() was re-entered on the same
                // thread), the event is only queued; the outer call's loop will deliver it.
                if (!((Boolean)this.dispatching.get()).booleanValue()) {
                    this.dispatching.set(true);

                    Dispatcher.PerThreadQueuedDispatcher.Event nextEvent;
                    try {
                        // Deliver queued events one by one, each to all of its subscribers.
                        while((nextEvent = (Dispatcher.PerThreadQueuedDispatcher.Event)queueForThread.poll()) != null) {
                            while(nextEvent.subscribers.hasNext()) {
                                ((Subscriber)nextEvent.subscribers.next()).dispatchEvent(nextEvent.event);
                            }
                        }
                    } finally {
                        // Clear both thread-local values once the outermost call is done,
                        // whether it finished normally or with an exception.
                        this.dispatching.remove();
                        this.queue.remove();
                    }
                }

            }

            // Immutable pair of an event and the iterator over all subscribers it goes to.
            private static final class Event {
                private final Object event;
                private final Iterator<Subscriber> subscribers;

                private Event(Object event, Iterator<Subscriber> subscribers) {
                    this.event = event;
                    this.subscribers = subscribers;
                }
            }
        }
    }
    
    

    3、通过上面可以看出,默认的EventBus是同步执行的,根本原因是Executor选择的是MoreExecutors的内部枚举类DirectExecutor:它的execute()方法直接在调用线程里执行command.run(),并没有新开线程。
    我们选择使用AsyncEventBus异步事件总线。

    package com.google.common.eventbus;
    
    import com.google.common.annotations.Beta;
    import com.google.common.eventbus.EventBus.LoggingHandler;
    import java.util.concurrent.Executor;
    
    /**
     * EventBus variant whose constructors all take a caller-supplied Executor and pass
     * Dispatcher.legacyAsync() to the superclass constructor.
     */
    @Beta
    public class AsyncEventBus extends EventBus {
        /** Creates a bus named "default" that uses the given executor and the logging handler. */
        public AsyncEventBus(Executor executor) {
            this("default", executor);
        }

        /** Creates a bus with the given name that uses the given executor and the logging handler. */
        public AsyncEventBus(String identifier, Executor executor) {
            super(identifier, executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
        }

        /** Creates a bus named "default" that uses the given executor and exception handler. */
        public AsyncEventBus(Executor executor, SubscriberExceptionHandler subscriberExceptionHandler) {
            super("default", executor, Dispatcher.legacyAsync(), subscriberExceptionHandler);
        }
    }
    

    三、解决办法

    如果我们想要达到并行执行,在使用的时候定义如下:

    // A pool with a single worker thread takes the handlers off the posting thread but still
    // runs them one after another. Use more than one worker so that handlers can actually
    // run in parallel.
    int workerThreads = Math.max(2, Runtime.getRuntime().availableProcessors());
    EventBus eventBus = new AsyncEventBus(new ThreadPoolExecutor(workerThreads, workerThreads,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
    

    这也验证了https://www.jianshu.com/writer#/notebooks/9031136/notes/36887503 文中开头所写。

    相关文章

      网友评论

          本文标题:google guava EventBus 没有按预期的并行执行

          本文链接:https://www.haomeiwen.com/subject/yrdcrctx.html