美文网首页
google guava EventBus 没有按预期的并行执行

google guava EventBus 没有按预期的并行执行

作者: 天草二十六_简村人 | 来源:发表于2019-07-25 10:24 被阅读0次

一、背景
上一篇https://www.jianshu.com/writer#/notebooks/9031136/notes/51194824中讲解了一般情况下怎么使用guava EventBus,具体使用的是EventBus eventBus = new EventBus();
有实际对接过的同学会发现,它并没有按照预期的并行执行事件,而是串行等待事件执行完成。这是什么情况?

二、源码分析
1、EventBus类的属性和构造方法

// Excerpt from EventBus: its fields and constructors.
// Name (identifier) of this bus
private final String identifier;
// Executor
    private final Executor executor;
    private final SubscriberExceptionHandler exceptionHandler;
    private final SubscriberRegistry subscribers;
// Event dispatcher; there are three kinds, and the public constructors below pass Dispatcher.perThreadDispatchQueue()
    private final Dispatcher dispatcher;
 // Delegates to EventBus(String) with the identifier "default".
 public EventBus() {
        this("default");
    }

    // Uses MoreExecutors.directExecutor(), Dispatcher.perThreadDispatchQueue() and the logging exception handler.
    public EventBus(String identifier) {
        this(identifier, MoreExecutors.directExecutor(), Dispatcher.perThreadDispatchQueue(), EventBus.LoggingHandler.INSTANCE);
    }

    // Same executor and dispatcher as above, but with a caller-supplied exception handler and the identifier "default".
    public EventBus(SubscriberExceptionHandler exceptionHandler) {
        this("default", MoreExecutors.directExecutor(), Dispatcher.perThreadDispatchQueue(), exceptionHandler);
    }
// Package-private constructor that the public constructors above end up calling;
// every argument is passed through Preconditions.checkNotNull before being stored.
EventBus(String identifier, Executor executor, Dispatcher dispatcher, SubscriberExceptionHandler exceptionHandler) {
        this.subscribers = new SubscriberRegistry(this);
        this.identifier = (String)Preconditions.checkNotNull(identifier);
        this.executor = (Executor)Preconditions.checkNotNull(executor);
        this.dispatcher = (Dispatcher)Preconditions.checkNotNull(dispatcher);
        this.exceptionHandler = (SubscriberExceptionHandler)Preconditions.checkNotNull(exceptionHandler);
    }

可以看出,EventBus使用的事件分发器是Dispatcher.perThreadDispatchQueue(),而重点在于executor执行器用的是MoreExecutors.directExecutor()。下面我们看下guava的com.google.common.util.concurrent包中MoreExecutors类的directExecutor()方法源码。

 // Returns the shared DirectExecutor.INSTANCE.
 public static Executor directExecutor() {
        return MoreExecutors.DirectExecutor.INSTANCE;
    }
// Guava's own implementation of the java.util.concurrent.Executor interface; the method to focus on is execute()
// Using an enum gives a singleton for free
 private static enum DirectExecutor implements Executor {
        INSTANCE;

        private DirectExecutor() {
        }

// Runs the command directly by calling command.run(); no new thread is started
        public void execute(Runnable command) {
            command.run();
        }

        public String toString() {
            return "MoreExecutors.directExecutor()";
        }
    }

2、Dispatcher事件分发器的三种实现

//
package com.google.common.eventbus;

import com.google.common.base.Preconditions;
import com.google.common.collect.Queues;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Delivers a posted event to an iterator of subscribers; three implementations are nested below.
abstract class Dispatcher {
    Dispatcher() {
    }
// Factory method: creates a new PerThreadQueuedDispatcher on every call (it is not a singleton)
    static Dispatcher perThreadDispatchQueue() {
        return new Dispatcher.PerThreadQueuedDispatcher();
    }

    // Factory method: creates a new LegacyAsyncDispatcher on every call
    static Dispatcher legacyAsync() {
        return new Dispatcher.LegacyAsyncDispatcher();
    }

    // Returns the single shared ImmediateDispatcher.INSTANCE
    static Dispatcher immediate() {
        return Dispatcher.ImmediateDispatcher.INSTANCE;
    }

// Abstract method; each concrete dispatcher supplies its own implementation
    abstract void dispatch(Object var1, Iterator<Subscriber> var2);

// Immediate mode: hands the event to each subscriber right away, in iteration order, with no queueing
    private static final class ImmediateDispatcher extends Dispatcher {
        private static final Dispatcher.ImmediateDispatcher INSTANCE = new Dispatcher.ImmediateDispatcher();

        private ImmediateDispatcher() {
        }

        void dispatch(Object event, Iterator<Subscriber> subscribers) {
            Preconditions.checkNotNull(event);

            while(subscribers.hasNext()) {
                ((Subscriber)subscribers.next()).dispatchEvent(event);
            }

        }
    }
// Queue-based dispatcher returned by legacyAsync(): one ConcurrentLinkedQueue per dispatcher instance
// (an instance field, not a ThreadLocal).
// NOTE(review): the drain loop below runs on the calling thread; whether a handler runs on another
// thread depends on Subscriber.dispatchEvent, which is not shown here.
    private static final class LegacyAsyncDispatcher extends Dispatcher {
        private final ConcurrentLinkedQueue<Dispatcher.LegacyAsyncDispatcher.EventWithSubscriber> queue;

        private LegacyAsyncDispatcher() {
            this.queue = Queues.newConcurrentLinkedQueue();
        }

        void dispatch(Object event, Iterator<Subscriber> subscribers) {
            Preconditions.checkNotNull(event);

            // Enqueue one (event, subscriber) pair per subscriber.
            while(subscribers.hasNext()) {
                this.queue.add(new Dispatcher.LegacyAsyncDispatcher.EventWithSubscriber(event, (Subscriber)subscribers.next()));
            }

            // Drain the queue until poll() returns null, dispatching every entry that is polled.
            Dispatcher.LegacyAsyncDispatcher.EventWithSubscriber e;
            while((e = (Dispatcher.LegacyAsyncDispatcher.EventWithSubscriber)this.queue.poll()) != null) {
                e.subscriber.dispatchEvent(e.event);
            }

        }

        // Immutable pair of an event and the subscriber it should be delivered to.
        private static final class EventWithSubscriber {
            private final Object event;
            private final Subscriber subscriber;

            private EventWithSubscriber(Object event, Subscriber subscriber) {
                this.event = event;
                this.subscriber = subscriber;
            }
        }
    }
// Per-thread queued mode, returned by perThreadDispatchQueue(): the event queue and the
// "dispatching" flag are both ThreadLocal, so each calling thread has its own.
    private static final class PerThreadQueuedDispatcher extends Dispatcher {
        private final ThreadLocal<Queue<Dispatcher.PerThreadQueuedDispatcher.Event>> queue;
        private final ThreadLocal<Boolean> dispatching;

        private PerThreadQueuedDispatcher() {
            // Each thread starts with an empty ArrayDeque.
            this.queue = new ThreadLocal<Queue<Dispatcher.PerThreadQueuedDispatcher.Event>>() {
                protected Queue<Dispatcher.PerThreadQueuedDispatcher.Event> initialValue() {
                    return Queues.newArrayDeque();
                }
            };
            // Each thread starts in the "not dispatching" state.
            this.dispatching = new ThreadLocal<Boolean>() {
                protected Boolean initialValue() {
                    return false;
                }
            };
        }

        void dispatch(Object event, Iterator<Subscriber> subscribers) {
            Preconditions.checkNotNull(event);
            Preconditions.checkNotNull(subscribers);
            // Always enqueue first; if this thread is already inside the drain loop below,
            // the call returns after enqueueing and the outer loop delivers the event.
            Queue<Dispatcher.PerThreadQueuedDispatcher.Event> queueForThread = (Queue)this.queue.get();
            queueForThread.offer(new Dispatcher.PerThreadQueuedDispatcher.Event(event, subscribers));
            if (!((Boolean)this.dispatching.get()).booleanValue()) {
                this.dispatching.set(true);

                Dispatcher.PerThreadQueuedDispatcher.Event nextEvent;
                try {
                    // Drain this thread's queue, delivering each event to all of its subscribers in turn.
                    while((nextEvent = (Dispatcher.PerThreadQueuedDispatcher.Event)queueForThread.poll()) != null) {
                        while(nextEvent.subscribers.hasNext()) {
                            ((Subscriber)nextEvent.subscribers.next()).dispatchEvent(nextEvent.event);
                        }
                    }
                } finally {
                    // Clear both thread-locals whether the loop finished normally or threw.
                    this.dispatching.remove();
                    this.queue.remove();
                }
            }

        }

        // Immutable pair of an event and the iterator of subscribers it should be delivered to.
        private static final class Event {
            private final Object event;
            private final Iterator<Subscriber> subscribers;

            private Event(Object event, Iterator<Subscriber> subscribers) {
                this.event = event;
                this.subscribers = subscribers;
            }
        }
    }
}

3、通过上面可以看出,默认的EventBus是同步执行的,根本原因是Executor选择的是MoreExecutors的内部枚举类DirectExecutor。
我们选择使用AsyncEventBus异步事件总线。

package com.google.common.eventbus;

import com.google.common.annotations.Beta;
import com.google.common.eventbus.EventBus.LoggingHandler;
import java.util.concurrent.Executor;

/**
 * EventBus subclass whose constructors all take a caller-supplied Executor and pass it,
 * together with Dispatcher.legacyAsync(), to the EventBus super constructor.
 */
@Beta
public class AsyncEventBus extends EventBus {
    /** Uses the given identifier and executor, with LoggingHandler.INSTANCE as the exception handler. */
    public AsyncEventBus(String identifier, Executor executor) {
        super(identifier, executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
    }

    /** Uses the identifier "default" with the given executor and exception handler. */
    public AsyncEventBus(Executor executor, SubscriberExceptionHandler subscriberExceptionHandler) {
        super("default", executor, Dispatcher.legacyAsync(), subscriberExceptionHandler);
    }

    /** Uses the identifier "default" and LoggingHandler.INSTANCE with the given executor. */
    public AsyncEventBus(Executor executor) {
        super("default", executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
    }
}

三、解决办法

如果我们想要达到并行执行,在使用的时候定义如下:

// A pool with a single worker thread (corePoolSize = maximumPoolSize = 1) only moves handlers
// off the posting thread; they would still run one after another. Use more than one worker
// so that handlers can actually run in parallel.
// Note: Guava also serializes calls to a subscriber method unless that method is annotated
// with @AllowConcurrentEvents.
int poolSize = Math.max(2, Runtime.getRuntime().availableProcessors());
EventBus eventBus = new AsyncEventBus(new ThreadPoolExecutor(poolSize, poolSize,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));

这也验证了https://www.jianshu.com/writer#/notebooks/9031136/notes/36887503 文中开头所写。

相关文章

网友评论

      本文标题:google guava EventBus 没有按预期的并行执行

      本文链接:https://www.haomeiwen.com/subject/yrdcrctx.html