声明:差不多就是Transforming an Existing Sequence的翻译
一、转换当前的序列
- 一对一转换(比如将当前字符串的值转换为其长度):map
Flux.just("str1" , "str").map(item -> {return item.length();}).subscribe(System.out::println);//4 3
- 仅作类型转换:cast
Mono.just("str1").cast(Object.class);
- 将元素转换为带序号的封装元素,序号来自
index
操作所接收到的顺序,从0开始Flux.just("str1","str2").index();//3.1.0版本似乎没有这个方法
- 将元素转换为带有记录间隔时间的封装元素,间隔时间是
elapsed
操作接收到上一个数据到当前数据的时间间隔Flux.interval(Duration.ofSeconds(1)).elapsed().subscribe(System.out::println,System.out::println,null,sub -> sub.request(3)); //----------------------- [1007,0] [1001,1] [999,2]
- 仅作类型转换:cast
- 一对多转换(比如将字符串转换为组成它们的字符): flatMap
注意:flatMap
会将其内部多个Publisher
合成为一个(按照时间顺序),所以最后展现的还是一个序列Flux.just("str1","str2").flatMap(item -> { return Flux.fromStream(item.chars().mapToObj(c -> Character.valueOf((char) c))); } , 2).subscribe(System.out::println); //----------------(结果去掉换行) s t r 1 s t r 2
- 如果想忽略掉
flatMap
中某些数据可以使用Mono.empty()
- 如果想使得
flatMap
的数据按原数据顺序排列(比如转换公式为:str => s t r ,原数据如果为str1 str2 那么转换后的排序顺序必为 s t r 1 s t r 2,即只要上一个元素内部转换流没有走完,下一个转换内容的永远不会被输出,但会被记录),可以试试Flux#flatMapSequential
-
Mono
的一对多转换(转换为Flux
):Mono#flatMapMany
- 如果想忽略掉
二、在已有序列中增添新的元素
- 在序列之前 :Flux#startWith
Flux.just(1,2,3).startWith(0).subscribe(System.out::println); //------------------ 0 1 2 3
- 在序列之后 :Flux#concatWith
三、对Flux进行归并
-
归并为List:Flux#collectList , Flux#collectSortedList
等待接收完成,并将所有数据归并成一个Mono<List>
进行返回,后者比前者多个排序功能Flux.just(3,5,2,1).collectSortedList((left , right) -> { if(left > right){ return 1; }else if(left < right){ return -1; }else{ return 0; } }).subscribe(System.out::println); //--------------- [1, 2, 3, 5]
-
归并为Map:Flux#collectMap , Flux#collectMultiMap
等待接收完成,将所有数据归并为一个Mono<Map>
进行返回,后者返回的是Mono<Map<key,Collection>>
Flux.just(3,5,2,1).collectMap(item -> item.toString()).subscribe(System.out::println); //------------------------ //{1=1, 2=2, 3=3, 5=5}
-
由Collector来完成归并:Flux#collect
Flux.just(3,5,2,1).collect(Collectors.counting()).subscribe(System.out::println); //-------------------- //4
-
计算序列中元素数量:Flux#count
Flux.just(3,5,2,1).count().subscribe(System.out::println); //----------------------------------- //4
-
对序列中的每一个元素应用一个回调,该回调的结果会带入下一次回调,直到所有元素被转换为一个最终回调结果:Flux#reduce
Flux.just(3,5,2,1).reduce("number:" , (lastResult,item) -> lastResult + item.toString()).subscribe(System.out::println); //---------------- //number:3521
- 在Flux#reduce的基础上,增加输出每次转换结果的功能:Flux#scan
Flux.just(3,5,2,1).scan("number:" , (lastResult,item) -> lastResult + item.toString()).subscribe(System.out::println); //------------------------- number: number:3 number:35 number:352 number:3521
- 在Flux#reduce的基础上,增加输出每次转换结果的功能:Flux#scan
-
归并成布尔值
- 指定判断式,是否符合所有元素:Flux#all
Flux.just(3,5,2,1).all(item -> item < 10).subscribe(System.out::println); //--------------- //true
注意,如果有一个不符合,将会立刻向上游发送
cancel
信号,并向下游发出元素false
- 指定判断式,是否至少有一个符合:Flux#any
注意,如果有一个符合,将会将会立刻向上游发送
cancel
信号,并向下游发出元素true
,下面两个也同理,达到要求便立刻中止- 检验序列中是否有元素:Flux#hasElements()
Flux.empty().hasElements().subscribe(System.out::println); //--------------- //false
- 检验序列中是否有特定元素:Flux#hasElement(T value)
四、合并多个生产者
- 以序列的顺序进行合并:Flux#concatWith(other)
Flux.just(1,2,3).concatWith(Flux.just(4,5,6)).subscribe(System.out::println); //--------------------------------- //1 2 3 4 5 6
- 以发出的元素顺序进行合并,先发出的元素在前:Flux#mergeWith(other)
Flux.interval(Duration.ofSeconds(1)).mergeWith(Flux.interval(Duration.ofSeconds(1))).subscribe(System.out::println); //--------------- //0 0 1 1 2 2 3 3 ....
- 在
mergeWith
基础上,增加了兼容不同类型生产者的功能和合并元素的功能,如[1,2,3]和[a,b,c,d]会合并成[1a,2b,3c]:Flux#zipWithFlux.just(1,2,3).zipWith(Flux.just("a" , "b" , "c")).subscribe(System.out::println); //------ //[1,a] [2,b] [3,c] Flux.interval(Duration.ofSeconds(1)).zipWith(Flux.just("a","b","c") , (f,s) -> f.toString().concat(s)).subscribe(System.out::println); //-------- //0a 1b 2c
- 在
- 等待另一个序列结束,然后丢出一个Mono<Void>:Mono#and
Mono.just("e").and(Flux.interval(Duration.ofSeconds(1)).take(3).doOnNext(System.out::println)).subscribe(null , null ,() -> System.out.println("ok")); //----------- //0 1 2 ok
- 等待指定所有序列结束,然后丢出一个Mono<Void>:Mono#when
Mono.when(Flux.just(1,2,3) , Flux.interval(Duration.ofSeconds(1)).take(3).doOnNext(System.out::println)).subscribe(null , null ,() -> System.out.println("ok")); //------------- //0 1 2 ok
- 与
zipWith
相似,但是每次合并的元素都取自其他序列发出的最近一个值,而不是一直等待其他序列发出下一个值:Flux#combineLatestFlux.combineLatest(objects -> objects[0].toString().concat(objects[1].toString()) ,Flux.interval(Duration.ofSeconds(1)).take(3) , Flux.just("a","b","c")).subscribe(System.out::println); // 0c 1c 2c
- 有一个生产者序列集合,以谁先发出第一个元素来判断选择哪个序列进行输出
Flux.first(Flux.interval(Duration.ofSeconds(1)).take(3) , Flux.just(4,5,6)).subscribe(System.out::println); //---------- // 4 5 6
- 与
flatMap
相似,但是当转换的序列还没结束,但是主序列的下一个元素已经到来额时候,会直接取消掉转换后的序列,也就是说同时只能存在一个转换的序列被执行:switchMap
五、重复
- 完成后,重新订阅该序列,重复输出,永不停止:repeat
- 上述功能,再加上时间间隔:Flux.interval(duration).flatMap(tick → myExistingPublisher)
六、只对完成信号感兴趣,即忽略元素
- 忽略所有的元素,如果上游发出完成信号,则完成:ignoreElements
- 忽略所有的元素,只响应错误信号和完成信号,完成返回
Mono<Void>
: then- 变体--响应完成时不再返回
Mono<Void>
,而是正常执行参数中的Mono
,并将其返回值作为返回值:then(Mono<T> other)
- 变体--响应完成时不再返回
- 完成后,还要再完成一个提供的空任务后才返回: Mono<Void> thenEmpty(Publisher<Void> other)
- 完成后,返回提供的值:Mono<T> Mono#thenReturn(T)
- 完成后,执行提供的Flux,其元素会正常输出:thenMany
七、有一个需要延迟完成的Mono
- 需要等待该Mono中元素所生成的序列完成后再向下游发出该元素:Mono#delayUntil(Function)
Mono.just("complete!").delayUntil(item -> Flux.interval(Duration.ofSeconds(1)).take(3).doOnNext(System.out::println)).subscribe(System.out::println); //------------------------------------------- //0 1 2 complete!
八、如果需要对流中元素进行递归操作
- 以分支为先展开: Flux#expand(Function)
即先对所有元素进行一遍递归,然后再对各个递归后序列中的序列进行递归Flux.just(1,2).expand(item -> item > 6 ? Mono.empty() : Flux.just(item *2)).subscribe(System.out::println); //-------------- //1 2 2 4 4 8 8
- 以深度为先展开:Flux#expandDeep(Function)
Flux.just(1,2).expandDeep(item -> item > 6 ? Mono.empty() : Flux.just(item *2)).subscribe(System.out::println); //----------------- //1 2 4 8 2 4 8
九、序列为空时转换
- 如果序列为空,则输出提供的预定值: defaultIfEmpty
- 如果序列为空,则订阅提供的预定序列::switchIfEmpty
参考文档:
[1] Reactor api doc
[2] Reactor reference doc
网友评论