美文网首页
观察者模式

观察者模式

作者: lj72808up | 来源:发表于2021-04-20 16:03 被阅读0次

    观察者模式 - 发布订阅模式

    1. 什么是观察者模式

    • 在对象间建立一个"一对多"的依赖. 当一个对象的状态发生变化, 其他依赖的所有对象都会自动收到通知.
      被依赖的对象叫做被观察者(Observable) , 依赖的对象叫做观察者 (Observer). 同样含义的称呼有: pub-sub, 生产者消费者, eventemitter-eventlistener
      观察者模式根据场景不同有不同实现:
    • 实现方式
      • 进程内的

        • 同步阻塞方式
        • 异步非阻塞模式 (Guava EventBus)
      • 跨进程的
        rabbitmq

    2. 经典实现: List[Observer]

    场景: 用户注册后, 进行消息推动和用户通知. 用户注册这个个动作做为报观察者, 消息推送和用户通知作为观察者, 调用者调用者

    /**
     * 观察者模式: 将观察者和被观察者解耦
     * 观察者接口
     */
    interface RegObserver {
        void handleRegSuccess(long userId);
    }
    
    /**
     * 负责推送消息的观察者
     */
    class RegPromotionObserver implements RegObserver {
        private PromotionService promotionService; // 依赖注入
    
        @Override
        public void handleRegSuccess(long userId) {
            promotionService.issueNewUserExperienceCash(userId);
        }
    }
    
    /**
     * 负责通知用户的观察者
     */
    class RegNotificationObserver implements RegObserver {
        private NotificationService notificationService;
    
        @Override
        public void handleRegSuccess(long userId) {
            notificationService.sendInboxMessage(userId, "Welcome...");
        }
    }
    
    class UserController {
        private UserService userService; // 依赖注入
        private List<RegObserver> regObservers = new ArrayList<>();
    
        // 一次性设置好,之后也不可能动态的修改
        public void setRegObservers(List<RegObserver> observers) {
            regObservers.addAll(observers);
        }
    
        public Long register(String telephone, String password) {
            //省略输入参数的校验代码
            //省略userService.register()异常的try-catch代码
            long userId = userService.register(telephone, password);  // 真实的注册逻辑
    
            //todo 下面是同步阻塞方式实现观察者和非观察者, 如果在线程池中执行, 就是异步非阻塞模式
            for (RegObserver observer : regObservers) {
                observer.handleRegSuccess(userId);
            }
    
            return userId;
        }
    }
    
    
    interface PromotionService {
        void issueNewUserExperienceCash(long userId);
    }
    
    interface NotificationService{
        void sendInboxMessage(long userId, String msg);
    }
    
    interface UserService {
        long register(String telephone, String password);
    }
    

    3. Guava EventBus

    EventBus 翻译为“事件总线”,它提供了实现观察者模式的骨架代码。利用 EventBus 框架实现的观察者模式,跟从零开始编写的观察者模式相比,从大的流程上来说,实现思路大致一样,都需要:

    • 定义 Observer 类, 但不需要定义 Observer 接口,任意类型的对象都可以注册到 EventBus 中,通过 @Subscribe 注解来标明类中哪个函数可以接收被观察者发送的消息。
    • 并且通过 register() 函数注册 Observer,
    • 也都需要通过调用某个函数(比如,EventBus 中的 post() 函数)来给 Observer 发送消息(在 EventBus 中消息被称作事件 event)

    Guava EventBus 框架跟经典的观察者模式的不同之处在于,当我们调用 post() 函数发送消息的时候,不是只把消息发送给该消息类型对应的观察者,而是发送给消息类型可匹配的观察者。所谓可匹配指的是,能接收的消息类型是发送消息(post 函数定义中的 event)类型的父类

    /**
     * 调用者
     */
    class UserControllerGuava {
        private UserService userService; // 依赖注入
    
        private EventBus eventBus;
        private static final int DEFAULT_EVENTBUS_THREAD_POOL_SIZE = 20;
    
        public UserControllerGuava() {
            //eventBus = new EventBus(); // 同步阻塞模式
            eventBus = new AsyncEventBus(Executors.newFixedThreadPool(DEFAULT_EVENTBUS_THREAD_POOL_SIZE)); // 异步非阻塞模式
        }
    
        public void setRegObservers(List<Object> observers) {
            for (Object observer : observers) {
                eventBus.register(observer);
            }
        }
    
        public Long register(String telephone, String password) {
            //省略输入参数的校验代码
            //省略userService.register()异常的try-catch代码
            long userId = userService.register(telephone, password);
    
            eventBus.post(userId);
    
            return userId;
        }
    }
    
    /**
     * 观察者一
     */
    class RegPromotionObserverGuava {
        private PromotionService promotionService; // 依赖注入
    
        @Subscribe
        public void handleRegSuccess(Long userId) {
            promotionService.issueNewUserExperienceCash(userId);
        }
    }
    
    /**
     * 观察者二
     */
    class RegNotificationObserverGuava {
        private NotificationService notificationService;
    
        @Subscribe
        public void handleRegSuccess(Long userId) {
            notificationService.sendInboxMessage(userId, "...");
        }
    }
    

    4. 自己实现一个 EventBUs

    整个小框架的代码实现包括 5 个类:EventBus、AsyncEventBus、Subscribe、ObserverAction、ObserverRegistry。接下来,我们依次来看下这 5 个类。

    • @Subscribe: 是一个注解,用于标明观察者中的哪个函数可以接收消息。
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @Beta
    public @interface Subscribe {}
    
    • ObserverAction: 表示对观察者方法的调用 (target 表示观察者类,method 表示方法。)
    public class ObserverAction {
        private Object target;
        private Method method;
    
        public ObserverAction(Object target, Method method) {
            this.target = Preconditions.checkNotNull(target);
            this.method = method;
            this.method.setAccessible(true);
        }
    
        public void execute(Object event) { // event是method方法的参数
            try {
                method.invoke(target, event);
            } catch (InvocationTargetException | IllegalAccessException e) {
                e.printStackTrace();
            }
        }
    }
    
    • ObserverRegistryObserverRegistry 类就是前面讲到的 Observer 注册表,记录被观察的事件类型和 OberserverAction 映射
      这是一个复杂的类, 包括注册监听者, 查找监听事件类型和其父类类型的 action,
    public class ObserverRegistry {
        private ConcurrentMap<Class<?>, CopyOnWriteArraySet<ObserverAction>> registry = new ConcurrentHashMap<>();
    
        /** 注册的事件动作 */
        public void register(Object observer) {
            Map<Class<?>, Collection<ObserverAction>> observerActions = findAllObserverActions(observer);
            for (Map.Entry<Class<?>, Collection<ObserverAction>> entry : observerActions.entrySet()) {
                Class<?> eventType = entry.getKey();
                Collection<ObserverAction> eventActions = entry.getValue();
                CopyOnWriteArraySet<ObserverAction> registeredEventActions = registry.get(eventType);
                if (registeredEventActions == null) {
                    registry.putIfAbsent(eventType, new CopyOnWriteArraySet<>());
                    registeredEventActions = registry.get(eventType);
                }
                registeredEventActions.addAll(eventActions);
            }
        }
    
        /** 在缓存 map 中查找该监听对象和该监听对象的父类 */
        public List<ObserverAction> getMatchedObserverActions(Object event) {
            List<ObserverAction> matchedObservers = new ArrayList<>();
            Class<?> postedEventType = event.getClass();
            for (Map.Entry<Class<?>, CopyOnWriteArraySet<ObserverAction>> entry : registry.entrySet()) {
                // map 中对应的事件类型
                Class<?> eventType = entry.getKey();
                Collection<ObserverAction> eventActions = entry.getValue();
                if (postedEventType.isAssignableFrom(eventType)) {
                    matchedObservers.addAll(eventActions);
                }
            }
            return matchedObservers;
        }
    
        /** 在 @Subscribe 注解方法只有一个参数, 表示被监听的对象. 监听列表 */
        private Map<Class<?>, Collection<ObserverAction>> findAllObserverActions(Object observer) {
            Map<Class<?>, Collection<ObserverAction>> observerActions = new HashMap<>();
            Class<?> clazz = observer.getClass();
            for (Method method : getAnnotatedMethods(clazz)) {
                Class<?>[] parameterTypes = method.getParameterTypes();
                Class<?> eventType = parameterTypes[0];
                if (!observerActions.containsKey(eventType)) {
                    observerActions.put(eventType, new ArrayList<>());
                }
                observerActions.get(eventType).add(new ObserverAction(observer, method));
            }
            return observerActions;
        }
    
        /** 获取某个类下面被 @Subscribe 注解的方法 */
        private List<Method> getAnnotatedMethods(Class<?> clazz) {
            List<Method> annotatedMethods = new ArrayList<>();
            for (Method method : clazz.getDeclaredMethods()) {
                if (method.isAnnotationPresent(Subscribe.class)) {
                    Class<?>[] parameterTypes = method.getParameterTypes();
                    // 校验参数个数只有一个
                    Preconditions.checkArgument(parameterTypes.length == 1,
                            "Method %s has @Subscribe annotation but has %s parameters."
                                    + "Subscriber methods must have exactly 1 parameter.",
                            method, parameterTypes.length);
                    annotatedMethods.add(method);
                }
            }
            return annotatedMethods;
        }
    }
    
    • EventBus: 同步事件总线. 对事件的调用方
    
    public class EventBus {
      private Executor executor;
      private ObserverRegistry registry = new ObserverRegistry();
    
      public EventBus() {
        this(MoreExecutors.directExecutor());
      }
    
      protected EventBus(Executor executor) {
        this.executor = executor;
      }
    
      public void register(Object object) {
        registry.register(object);
      }
    
      public void post(Object event) {
        List<ObserverAction> observerActions = registry.getMatchedObserverActions(event);
        for (ObserverAction observerAction : observerActions) {
          executor.execute(new Runnable() {
            @Override
            public void run() {
              observerAction.execute(event);
            }
          });
        }
      }
    }
    
    • AsyncEventBus: 异步事件总线. 需要传入一个线程池
    
    public class AsyncEventBus extends EventBus {
      public AsyncEventBus(Executor executor) {
        super(executor);
      }
    }
    

    相关文章

      网友评论

          本文标题:观察者模式

          本文链接:https://www.haomeiwen.com/subject/lxunlltx.html