美文网首页guava
Guava EventBus实现原理

Guava EventBus实现原理

作者: 晴天哥_王志 | 来源:发表于2021-04-22 21:09 被阅读0次

开篇

  • EventBus是Guava的事件处理机制,是设计模式中的观察者模式的优雅实现。对于事件监听和发布订阅模式,可以使用EventBus完美的解决。这篇文章主要是想了解下EventBus底层的实现逻辑,做到使用的时候更加游刃有余。
  • EventBus的整体使用方式如下图所示,基于Event的驱动来实现Publisher和Subscriber之前的通信。


EventBus用法

public class GuavaTest {

    public static void main(String[] args) {
        // 1、定义EventBus对象
        EventBus eventBus = new EventBus();

        // 2、定义两个观察者对象
        DataObserver1 observer1 = new DataObserver1();
        DataObserver2 observer2 = new DataObserver2();

        // 3、注册两个观察者
        eventBus.register(observer1);
        eventBus.register(observer2);
        
        // 4、分发事件
        eventBus.post(123);
        eventBus.post("hello world");
    }
}

// 定义观察者一
class DataObserver1 {

    @Subscribe
    public void action(Integer msg) {
        System.out.println("DataObserver1 String msg: " + msg);
    }

    @Subscribe
    public void action(String msg) {
        System.out.println("DataObserver1 String msg: " + msg);
    }
}

// 定义观察者二
class DataObserver2 {

    @Subscribe
    public void action(Integer msg) {
        System.out.println("DataObserver2 String msg: " + msg);
    }

    @Subscribe
    public void action(String msg) {
        System.out.println("DataObserver2 String msg: " + msg);
    }
}
  • 1、EventBus的核心对象包含EventBus对象、观察者对象Observer、事件对象Event。
  • 2、观察者对象通过注解@Subscribe来定义处理事件的逻辑实现方法,该方法的参数定义了事件类型
  • 3、通过EventBus的register方法实现观察者Objserver和EventBus的关联。
  • 4、通过EventBus的post方法实现事件Event的分发。

EventBus定义

public class EventBus {

  private final String identifier;
  private final Executor executor;
  private final SubscriberExceptionHandler exceptionHandler;
  // SubscriberRegistry用来保存监听者的信息
  private final SubscriberRegistry subscribers = new SubscriberRegistry(this);

  // dispatcher使用的是PerThreadQueuedDispatcher
  private final Dispatcher dispatcher;


  public EventBus(String identifier) {
    this(
        identifier,
        MoreExecutors.directExecutor(),
        Dispatcher.perThreadDispatchQueue(),
        LoggingHandler.INSTANCE);
  }

  public void register(Object object) {
    subscribers.register(object);
  }
}
  • EventBus的核心变量subscribers用来保存订阅者, Dispatcher用来定义分发的方法。
  • EventBus的注册方法register就是把观察者注册到subscribers当中。

EventBus订阅过程

final class SubscriberRegistry {

  private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers =
      Maps.newConcurrentMap();

  void register(Object listener) {
    // 获取监听者内部对应的@Subscribe注解的方法并解析成事件类型维度的Multimap,
    Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
    
    for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
      Class<?> eventType = entry.getKey();
      Collection<Subscriber> eventMethodsInListener = entry.getValue();
      CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
      
      // 事件监听信息保存在subscribers当中,以事件类型为维度
      if (eventSubscribers == null) {
        CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>();
        eventSubscribers =
            MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
      }

      eventSubscribers.addAll(eventMethodsInListener);
    }
  }

  // 解析监听者对象
  private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
    Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
    // 获取监听的类型实现
    Class<?> clazz = listener.getClass();
    // 获取指定@Subscribe注解的方法
    for (Method method : getAnnotatedMethods(clazz)) {
      Class<?>[] parameterTypes = method.getParameterTypes();
      Class<?> eventType = parameterTypes[0];
      // 按照@Subscribe注解指定方法的参数为维度进行组织
      methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
    }
    return methodsInListener;
  }
}
  • EventBus的subscribers用来保存观察者。
  • EventBus的注册过程核心逻辑包含发现观察者解析事件方法和保存观察者。
  • findAllSubscribers解析包含@Subscribe注解的方法生成事件类型为key,Subscriber对象为value的map对象。
  • Subscriber保存了观察者对象以及执行的方法,用来分发的时候进行回调操作。

EventBus分发过程

public class EventBus {

  public void post(Object event) {
    // 获取事件的监听者
    Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);

    // 遍历监听者进行事件的dispatch
    if (eventSubscribers.hasNext()) {
      dispatcher.dispatch(event, eventSubscribers);
    } else if (!(event instanceof DeadEvent)) {
      // the event had no subscribers and was not itself a DeadEvent
      post(new DeadEvent(this, event));
    }
  }

  private static final class PerThreadQueuedDispatcher extends Dispatcher {

    private final ThreadLocal<Queue<Event>> queue =
        new ThreadLocal<Queue<Event>>() {
          @Override
          protected Queue<Event> initialValue() {
            return Queues.newArrayDeque();
          }
        };

    private final ThreadLocal<Boolean> dispatching =
        new ThreadLocal<Boolean>() {
          @Override
          protected Boolean initialValue() {
            return false;
          }
        };

    @Override
    void dispatch(Object event, Iterator<Subscriber> subscribers) {
      checkNotNull(event);
      checkNotNull(subscribers);
      Queue<Event> queueForThread = queue.get();
      queueForThread.offer(new Event(event, subscribers));

      if (!dispatching.get()) {
        dispatching.set(true);
        try {
          Event nextEvent;
          while ((nextEvent = queueForThread.poll()) != null) {
            while (nextEvent.subscribers.hasNext()) {
              nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
            }
          }
        } finally {
          dispatching.remove();
          queue.remove();
        }
      }
    }
}
  • Event的分发核心以分发的事件event去查找对应的观察者进行分发,分发的过程就是遍历观察者进行分发的过程。
  • PerThreadQueuedDispatcher支持的线程维度的分发,内部通过queue做了中转进行消息的分发。
  • dispatchEvent是执行Subscriber的回调方法的真正逻辑。

EventBus通知过程

class Subscriber {

  @Weak private EventBus bus;
  @VisibleForTesting final Object target;
  private final Method method;
  private final Executor executor;

  private Subscriber(EventBus bus, Object target, Method method) {
    this.bus = bus;
    this.target = checkNotNull(target);
    this.method = method;
    method.setAccessible(true);

    this.executor = bus.executor();
  }

  final void dispatchEvent(final Object event) {
    executor.execute(
        new Runnable() {
          @Override
          public void run() {
            try {
              invokeSubscriberMethod(event);
            } catch (InvocationTargetException e) {
            }
          }
        });
  }

  @VisibleForTesting
  void invokeSubscriberMethod(Object event) throws InvocationTargetException {
    try {
      method.invoke(target, checkNotNull(event));
    } catch (IllegalArgumentException e) {
    } catch (IllegalAccessException e) {
    } catch (InvocationTargetException e) {
    }
  }
}
  • 回调的过程就是执行invokeSubscriberMethod方法,内部就是常见的method.invoke反射进行回调实现。

相关文章

  • Guava EventBus实现原理

    开篇 EventBus是Guava的事件处理机制,是设计模式中的观察者模式的优雅实现。对于事件监听和发布订阅模式,...

  • Guava - EventBus(事件总线)

    Guava在guava-libraries中为我们提供了事件总线EventBus库,它是事件发布订阅模式的实现,让...

  • EventBus源码解析

    知识点汇总: 一:EventBus框架概述 二:EventBus的注册实现原理 三:EventBus的事件分发实现...

  • Guava之EventBus消息发布订阅实现

    消息发布订阅实现 guava中的EventBus在项目开发中,可以快速实现发布订阅模型,不需要我们自己去实现.下面...

  • Guava EventBus

    http://www.baeldung.com/guava-eventbus

  • guava 限流RateLimiter 初探(一)

    Guava RateLimiter Guava RateLimiter原理 Guava RateLimiter基于...

  • EventBus

    一、EventBus的原理 EventBus是全局事件总线,底层通过Stream来实现;它可以实现不同页面的跨层访...

  • EventBus - (观察者模式) 消息发布订阅类库

    1,简介 1)EventBus实现了观察者模式,是Google.Guava提供的消息发布-订阅类库。2)Multi...

  • Guava EventBus

    我称其为单块架构的利器 前言 在设计模式中, 有一种叫做发布/订阅模式, 即某事件被发布, 订阅该事件的角色将自动...

  • Guava——EventBus

    EventBus是Guava的事件处理机制,是设计模式中的观察者模式(生产/消费者编程模型)的优雅实现。对于事件监...

网友评论

    本文标题:Guava EventBus实现原理

    本文链接:https://www.haomeiwen.com/subject/ikmfrltx.html