美文网首页工作生活
基于 WebFlux 的监听器

基于 WebFlux 的监听器

作者: AngryApe | 来源:发表于2019-07-01 17:53 被阅读0次

    WebFlux 是原生的发布订阅工具,可以很方便的构建事件总线。下面是一个监听数据变动的监听器:

    package com.example.demo;
    
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.FluxSink;
    
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.Consumer;
    
    /**
     * 数据监听器
     *
     * @author <a href="mailto:pushu@2dfire.com">朴树</a>
     * @date 2019-07-01 17:32
     */
    public class ReactorDataMonitor {
        private static final Map<Class, FluxSink> handlers = new ConcurrentHashMap<>();
    
        /**
         * 监控指定类型的数据
         *
         * @param clz     数据类型
         * @param handler 数据消费方式
         */
        public static void monitor(Class clz, Consumer handler) {
            Flux<Object> objectFlux = Flux.create(sink -> {
                handlers.put(clz, sink);
                sink.onCancel(() -> handlers.remove(clz));
            }, FluxSink.OverflowStrategy.LATEST);
    
            objectFlux.subscribe(handler);
        }
    
        /**
         * 取消监控数据
         *
         * @param clz 数据类型
         */
        public static void unMonitor(Class clz) {
            handlers.remove(clz);
        }
    
        /**
         * 发布数据
         *
         * @param object
         */
        public static void publish(Object object) {
            handlers.forEach((key, value) -> {
                if (key.equals(object.getClass())) {
                    value.next(object);
                }
            });
        }
    
    }
    

    以上代码中FluxSink是一个可以持续发布数据的数据源。

    相关文章

      网友评论

        本文标题:基于 WebFlux 的监听器

        本文链接:https://www.haomeiwen.com/subject/cxencctx.html