美文网首页
reactor3 flux 多个订阅者

reactor3 flux 多个订阅者

作者: simians | 来源:发表于2022-04-17 18:30 被阅读0次

    在一个数据源里想要有多个订阅者消费时应该怎么做呢?

            List<String> list = new ArrayList<>();
            list.add("1");
            list.add("2");
            list.add("3");
            list.add("4");
            list.add("5");
            Flux<String> flux = Flux.fromIterable(list);
            flux.subscribe(System.out::println);
            flux.subscribe(System.out::println);
            flux.subscribe(System.out::println);
    

    那么如果我想要有三个个订阅者的时候才开始消费数据源该如何做呢

            Flux<String> flux = Flux.fromIterable(list);
            ConnectableFlux<String> con = flux.publish();
            con.subscribe(System.out::println);
            con.subscribe(System.out::println);
            con.subscribe(System.out::println);
            // 手动的开启消费数据
            con.connect();
    
    

    如果感觉手动开启太麻烦也可以这样

        // autoConnect(3) 表示如果订阅者达到三个 就自动开启
            Flux<String> auto = flux.publish().autoConnect(3);
            auto.subscribe(System.out::println);
            auto.subscribe(System.out::println);
            auto.subscribe(System.out::println);
    
            Thread.sleep(1000L);
    

    如果感觉还不够好的,比如当有一个订阅者突然断开了,我想停止消费数据该怎么做呢

          // 如果订阅者少于三个就会停止消费数据,直到订阅者达到三个为止
            Flux<String> auto = flux.publish().refCount(3);
            auto.subscribe(System.out::println);
            auto.subscribe(System.out::println);
            auto.subscribe(System.out::println);
    
            Thread.sleep(1000L);
    
          // 如果订阅者少于三个且超过十秒没有新的订阅才会停止消费数据
            Flux<String> auto = flux.publish().refCount(3,Duration.ofSeconds(10));
            auto.subscribe(System.out::println);
            auto.subscribe(System.out::println);
            auto.subscribe(System.out::println);
    
            Thread.sleep(1000L);
    

    相关文章

      网友评论

          本文标题:reactor3 flux 多个订阅者

          本文链接:https://www.haomeiwen.com/subject/ibklertx.html