在一个数据源里想要有多个订阅者消费时应该怎么做呢?
List<String> list = new ArrayList<>();
list.add("1");
list.add("2");
list.add("3");
list.add("4");
list.add("5");
Flux<String> flux = Flux.fromIterable(list);
flux.subscribe(System.out::println);
flux.subscribe(System.out::println);
flux.subscribe(System.out::println);
那么如果我想要有三个个订阅者的时候才开始消费数据源该如何做呢
Flux<String> flux = Flux.fromIterable(list);
ConnectableFlux<String> con = flux.publish();
con.subscribe(System.out::println);
con.subscribe(System.out::println);
con.subscribe(System.out::println);
// 手动的开启消费数据
con.connect();
如果感觉手动开启太麻烦也可以这样
// autoConnect(3) 表示如果订阅者达到三个 就自动开启
Flux<String> auto = flux.publish().autoConnect(3);
auto.subscribe(System.out::println);
auto.subscribe(System.out::println);
auto.subscribe(System.out::println);
Thread.sleep(1000L);
如果感觉还不够好的,比如当有一个订阅者突然断开了,我想停止消费数据该怎么做呢
// 如果订阅者少于三个就会停止消费数据,直到订阅者达到三个为止
Flux<String> auto = flux.publish().refCount(3);
auto.subscribe(System.out::println);
auto.subscribe(System.out::println);
auto.subscribe(System.out::println);
Thread.sleep(1000L);
// 如果订阅者少于三个且超过十秒没有新的订阅才会停止消费数据
Flux<String> auto = flux.publish().refCount(3,Duration.ofSeconds(10));
auto.subscribe(System.out::println);
auto.subscribe(System.out::println);
auto.subscribe(System.out::println);
Thread.sleep(1000L);
网友评论