美文网首页java
Reactor学习:五、中间操作—转换

Reactor学习:五、中间操作—转换

作者: 睦月MTK | 来源:发表于2020-08-04 21:46 被阅读0次

    声明:差不多就是Transforming an Existing Sequence的翻译


    一、转换当前的序列
    • 一对一转换(比如将当前字符串的值转换为其长度):map
      Flux.just("str1" , "str").map(item -> {return   item.length();}).subscribe(System.out::println);//4 3 
      
      • 仅作类型转换:cast
        Mono.just("str1").cast(Object.class);
        
      • 将元素转换为带序号的封装元素,序号来自index操作所接收到的顺序,从0开始
        Flux.just("str1","str2").index();//3.1.0版本似乎没有这个方法
        
      • 将元素转换为带有记录间隔时间的封装元素,间隔时间是elapsed操作接收到上一个数据到当前数据的时间间隔
        Flux.interval(Duration.ofSeconds(1)).elapsed().subscribe(System.out::println,System.out::println,null,sub -> sub.request(3));
        //-----------------------
        [1007,0]
        [1001,1]
        [999,2]
        
    • 一对多转换(比如将字符串转换为组成它们的字符): flatMap
      注意:flatMap会将其内部多个Publisher合成为一个(按照时间顺序),所以最后展现的还是一个序列
      Flux.just("str1","str2").flatMap(item -> {
                  return Flux.fromStream(item.chars().mapToObj(c ->   Character.valueOf((char) c)));
              } , 2).subscribe(System.out::println);
      //----------------(结果去掉换行)
      s t r 1 s t r 2
      
      • 如果想忽略掉flatMap中某些数据可以使用Mono.empty()
      • 如果想使得flatMap的数据按原数据顺序排列(比如转换公式为:str => s t r ,原数据如果为str1 str2 那么转换后的排序顺序必为 s t r 1 s t r 2,即只要上一个元素内部转换流没有走完,下一个转换内容的永远不会被输出,但会被记录),可以试试Flux#flatMapSequential
      • Mono的一对多转换(转换为Flux):Mono#flatMapMany

    二、在已有序列中增添新的元素
    • 在序列之前 :Flux#startWith
      Flux.just(1,2,3).startWith(0).subscribe(System.out::println);
      //------------------
      0 1 2 3
      
    • 在序列之后 :Flux#concatWith

    三、对Flux进行归并
    • 归并为List:Flux#collectList , Flux#collectSortedList
      等待接收完成,并将所有数据归并成一个Mono<List>进行返回,后者比前者多个排序功能

      Flux.just(3,5,2,1).collectSortedList((left , right) -> {
          if(left > right){
              return 1;
          }else if(left < right){
              return -1;
          }else{
              return 0;
          }
      }).subscribe(System.out::println);
      //---------------
      [1, 2, 3, 5]
      
    • 归并为Map:Flux#collectMap , Flux#collectMultiMap
      等待接收完成,将所有数据归并为一个Mono<Map>进行返回,后者返回的是Mono<Map<key,Collection>>

      Flux.just(3,5,2,1).collectMap(item -> item.toString()).subscribe(System.out::println);
      //------------------------
      //{1=1, 2=2, 3=3, 5=5}
      
    • 由Collector来完成归并:Flux#collect

      Flux.just(3,5,2,1).collect(Collectors.counting()).subscribe(System.out::println);
      //--------------------
      //4
      
    • 计算序列中元素数量:Flux#count

      Flux.just(3,5,2,1).count().subscribe(System.out::println);
      //-----------------------------------
      //4
      
    • 对序列中的每一个元素应用一个回调,该回调的结果会带入下一次回调,直到所有元素被转换为一个最终回调结果:Flux#reduce

      Flux.just(3,5,2,1).reduce("number:" , (lastResult,item) -> lastResult + item.toString()).subscribe(System.out::println);
      //----------------
      //number:3521
      
      • 在Flux#reduce的基础上,增加输出每次转换结果的功能:Flux#scan
        Flux.just(3,5,2,1).scan("number:" , (lastResult,item) -> lastResult + item.toString()).subscribe(System.out::println);
        //-------------------------
        number:
        number:3
        number:35
        number:352
        number:3521
        
    • 归并成布尔值

      • 指定判断式,是否符合所有元素:Flux#all
      Flux.just(3,5,2,1).all(item -> item < 10).subscribe(System.out::println);
      //---------------
      //true
      

      注意,如果有一个不符合,将会立刻向上游发送cancel信号,并向下游发出元素false

      • 指定判断式,是否至少有一个符合:Flux#any

      注意,如果有一个符合,将会将会立刻向上游发送cancel信号,并向下游发出元素true,下面两个也同理,达到要求便立刻中止

      • 检验序列中是否有元素:Flux#hasElements()
        Flux.empty().hasElements().subscribe(System.out::println);
        //---------------
        //false
        
      • 检验序列中是否有特定元素:Flux#hasElement(T value)

    四、合并多个生产者
    • 以序列的顺序进行合并:Flux#concatWith(other)
      Flux.just(1,2,3).concatWith(Flux.just(4,5,6)).subscribe(System.out::println);
      //---------------------------------
      //1 2 3 4 5 6
      
    • 以发出的元素顺序进行合并,先发出的元素在前:Flux#mergeWith(other)
      Flux.interval(Duration.ofSeconds(1)).mergeWith(Flux.interval(Duration.ofSeconds(1))).subscribe(System.out::println);
      //---------------
      //0 0 1 1 2 2 3 3 ....
      
      • mergeWith基础上,增加了兼容不同类型生产者的功能和合并元素的功能,如[1,2,3]和[a,b,c,d]会合并成[1a,2b,3c]:Flux#zipWith
        Flux.just(1,2,3).zipWith(Flux.just("a" , "b" , "c")).subscribe(System.out::println);
        //------
        //[1,a] [2,b] [3,c]
        Flux.interval(Duration.ofSeconds(1)).zipWith(Flux.just("a","b","c") , (f,s) -> f.toString().concat(s)).subscribe(System.out::println);
        //--------
        //0a 1b 2c
        
    • 等待另一个序列结束,然后丢出一个Mono<Void>:Mono#and
      Mono.just("e").and(Flux.interval(Duration.ofSeconds(1)).take(3).doOnNext(System.out::println)).subscribe(null , null ,() -> System.out.println("ok"));
      //-----------
      //0 1 2 ok
      
    • 等待指定所有序列结束,然后丢出一个Mono<Void>:Mono#when
      Mono.when(Flux.just(1,2,3) , Flux.interval(Duration.ofSeconds(1)).take(3).doOnNext(System.out::println)).subscribe(null , null ,() -> System.out.println("ok"));
      //-------------
      //0 1 2 ok
      
    • zipWith相似,但是每次合并的元素都取自其他序列发出的最近一个值,而不是一直等待其他序列发出下一个值:Flux#combineLatest
      Flux.combineLatest(objects -> objects[0].toString().concat(objects[1].toString()) ,Flux.interval(Duration.ofSeconds(1)).take(3) , Flux.just("a","b","c")).subscribe(System.out::println);
      // 0c 1c 2c
      
    • 有一个生产者序列集合,以谁先发出第一个元素来判断选择哪个序列进行输出
      Flux.first(Flux.interval(Duration.ofSeconds(1)).take(3) , Flux.just(4,5,6)).subscribe(System.out::println);
      //----------
      // 4 5 6
      
    • flatMap相似,但是当转换的序列还没结束,但是主序列的下一个元素已经到来额时候,会直接取消掉转换后的序列,也就是说同时只能存在一个转换的序列被执行:switchMap

    五、重复
    • 完成后,重新订阅该序列,重复输出,永不停止:repeat
      • 上述功能,再加上时间间隔:Flux.interval(duration).flatMap(tick → myExistingPublisher)

    六、只对完成信号感兴趣,即忽略元素
    • 忽略所有的元素,如果上游发出完成信号,则完成:ignoreElements
    • 忽略所有的元素,只响应错误信号和完成信号,完成返回Mono<Void> : then
      • 变体--响应完成时不再返回Mono<Void>,而是正常执行参数中的Mono,并将其返回值作为返回值:then(Mono<T> other)
    • 完成后,还要再完成一个提供的空任务后才返回: Mono<Void> thenEmpty(Publisher<Void> other)
    • 完成后,返回提供的值:Mono<T> Mono#thenReturn(T)
    • 完成后,执行提供的Flux,其元素会正常输出:thenMany

    七、有一个需要延迟完成的Mono
    • 需要等待该Mono中元素所生成的序列完成后再向下游发出该元素:Mono#delayUntil(Function)
      Mono.just("complete!").delayUntil(item -> Flux.interval(Duration.ofSeconds(1)).take(3).doOnNext(System.out::println)).subscribe(System.out::println);
      //-------------------------------------------
      //0 1 2 complete!
      

    八、如果需要对流中元素进行递归操作
    • 以分支为先展开: Flux#expand(Function)
      即先对所有元素进行一遍递归,然后再对各个递归后序列中的序列进行递归
      Flux.just(1,2).expand(item -> item > 6 ? Mono.empty() : Flux.just(item *2)).subscribe(System.out::println);
      //--------------
      //1 2 2 4 4 8 8
      
    • 以深度为先展开:Flux#expandDeep(Function)
      Flux.just(1,2).expandDeep(item -> item > 6 ? Mono.empty() : Flux.just(item *2)).subscribe(System.out::println);
      //-----------------
      //1 2 4 8 2 4 8
      

    九、序列为空时转换
    • 如果序列为空,则输出提供的预定值: defaultIfEmpty
    • 如果序列为空,则订阅提供的预定序列::switchIfEmpty

    参考文档:
    [1] Reactor api doc
    [2] Reactor reference doc

    相关文章

      网友评论

        本文标题:Reactor学习:五、中间操作—转换

        本文链接:https://www.haomeiwen.com/subject/wsglqktx.html