美文网首页
akka Stream SubStreams 操作

akka Stream SubStreams 操作

作者: 93张先生 | 来源:发表于2018-08-24 17:00 被阅读0次
    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{Sink, Source}
    
    object SubStreamExample {
      implicit val system = ActorSystem("QuickStart2")
      implicit val materializer = ActorMaterializer()
    
      def main(args: Array[String]): Unit = {
    
    //    exampleOne()
    //    mergeStream()
    //    mergeSubstreamsWithParallelism()
    //    splitWhen()
        flatMapConcatExample()
    //    flatMapMergeExample()
      }
      def exampleOne(): Unit ={
    
    
        //分成3组;然后每一组后面都添加一个sink;3个sink造成每个sink轮流输出是不能保证输出循序的
        Source(1 to 10).groupBy(3, _ % 3).to(Sink.ignore).run()
    
        //1 4 7 10 (1 + 4  + 7 + 10  ) = 22
        //2 5 8 (2  + 5 + 8) = 15
        //3 6 9 (3  + 6  + 9) = 18
        Source(1 to 10).groupBy(3, _ % 3).flatMapConcat( i => Source(List.fill(1)(i))).reduce((x,y) => x +y).to(Sink.foreach(println)).run()  //22 28 15
    
      }
      def mergeStream(): Unit ={
        //groupBy(3, _ % 3)  根据后面的函数,将输入流进行分组;第一个参数最大子流数量
        //mergeSubstreams;合并3组到一个组;顺序是不能保证的
        Source(1 to 10)
          .groupBy(3, _ % 3)
          .mergeSubstreams
          .runWith(Sink.foreach(println))
    
      }
    
      def mergeSubstreamsWithParallelism(): Unit ={
    
        //groupBy(3, _ % 3)  根据后面的函数,将输入流进行分组;第一个参数最大子流数量
        //mergeSubstreamsWithParallelism;limit the number of active substreams running and being merged at a time
        //如果并行度小于子流数,会发生阻塞;第三类元素出来之后没有子流;会一直阻塞;其他的输出流的元素也不能出来;得到有效处理
        Source(1 to 10)
          .groupBy(3, _ % 3)
          .mergeSubstreamsWithParallelism(2)
          .runWith(Sink.foreach(println))
    
        //concatSubstreams is equivalent to mergeSubstreamsWithParallelism(1)
        Source(1 to 10)
          .groupBy(3, _ % 3)
          .concatSubstreams
          .runWith(Sink.ignore)
      }
    
      def splitWhen(): Unit ={
        val text =
          "This is the first line.\n" +
            "The second line.\n" +
            "There is also the 3rd line\n"
    
        //splitWhen 和 splitAfter 返回true时,都会产生一个新的子流;when是当前元素进入新的子流;after是当前的下一个元素进入子流
        val charCount = Source(text.toList)
          .splitAfter { _ == '\n' }
          .filter(_ != '\n')
          .map(_ ⇒ 1)
          .reduce(_ + _)
          .to(Sink.foreach(println))
          .run()
      }
    
      def flatMapConcatExample(): Unit ={
        // List.fill(3)(1) 一个list,充满3个1元素
        //flatMapConcat 处理多个子流;处理完后一个流,再处理另一个流; 1 to 2;起两个流,完成后流作废
        Source(1 to 2)
          .flatMapConcat(i ⇒ Source(List.fill(3)(i)))
          .runWith(Sink.foreach(println))
      }
    
      def flatMapMergeExample(): Unit ={
        //flatMapMerge 多个子流处理数据;第一个参数为宽度;子流个数
        Source(1 to 2)
          .flatMapMerge(3, i ⇒ Source(List.fill(3)(i)))
          .runWith(Sink.foreach(println))
      }
    
    
    
    }
    
    

    相关文章

      网友评论

          本文标题:akka Stream SubStreams 操作

          本文链接:https://www.haomeiwen.com/subject/hykiiftx.html