美文网首页
Java8(二)之走进Stream

Java8(二)之走进Stream

作者: 依然范特希 | 来源:发表于2019-05-03 18:43 被阅读0次

    本篇文章是基于上一篇Lamda表达式的基础上的内容,因为其中涉及了较多的Lamda语法。如果对于Lamda表达式没有了解的同学,建议先看上一篇文章。

    什么是Stream

    直译过来是“流”,但和IO流完全不相及。此处的流并不是数据结构,也不进行数据的存储,在功能上更像一个迭代器,对每一个元素进行迭代计算。之所以叫“流”是因为是单向的,不可重复,如同“黄河之水天上来,奔流到海不复回”。

    为什么要使用Stream

    记得之前在开发系统的时候,使用oracle数据库,经常会写很多的复杂的sql语句,其中包含了很多求和,分组,排序,求平均值等内容,有的时候也会写存储过程来实现复杂计算。当数据量和请求量较小的时候是完全没有问题的,但是数据量和请求量较多时,数据库的压力就会比较大,也会导致数据库CPU高耗,当CPU使用率达到90%的时候整体数据库就会性能降低,一些正常的查询也会耗时较多。而对于MySQL数据库来说,进行复杂计算就是更不可能的事情。目前,基于大数据量和高并发的系统大都是分布式系统。对于分布式系统来说,可以根据模块的情况去增加机器来进行扩容,所以,就应该适当的去把一些计算的工作来让服务器端来做,而不是让数据库来做它本身不擅长的事情。如果是复杂而且海量的计算,可以考虑应用大数据技术来进行计算。所以使用Stream的目的概括如下:
    1.将计算压力协调到服务器端,以均衡数据库(如果是Oracle可以进行一些计算,MySQL就算了)在计算方面的压力。
    2.Stream可以进行并行计算,对于使用者来说不需要写太多的代码,其API接口基本和大数据Spark算子大致相似,所以对于接触过Spark的同学就很轻松了。

    如何创建Stream

    1. 通过数组创建
    Stream<Integer> stream = Arrays.stream(new Integer[]{1,23,4});
    stream = Stream.of(1,3,4);
    
    1. 通过集合创建(常用)
    List<String> list = new ArrayList<>();
    // 创建单向流
    list.stream();
    // 创建并行流(用于并行计算)
    list.paralleStream();
    
    1. 创建空的流
    Stream.empty();
    
    1. 创建无限流
    Stream.generate(()->"number"+new Random.nextInt()).limit(100).forEach(System.out::println)
    

    5.产生规律的无限流

    Stream.iterate(0,x->x+1).limit(10).forEach(System.out::println);
    

    提取流和组合流

    1. 提取固定个数
    Stream.of(1,2,3,4,5).limit(3).forEach(System.out::println);
    
    1. 跳过固定个数
    Stream.of(1,2,3,4,5).skip(3).forEach(System.out::println);
    
    1. 流融合
    Stream.concat(Stream.of(1,2,3),Stream.of(4,5,6)).forEach(System.out::println);
    

    常见算子

    1. map算子进行一对一转化
     // map算子对每一个元素进行转化
    Stream.of(1,3,4).map(x->x+1).forEach(System.out::println);
    
    1. filter算子对元素进行筛选
     // filter算子对符合条件的元素进行筛选
     Stream.of(1,3,4).filter(x->x>2).forEach(System.out::println);
    
    1. flatMap算子将一个元素转化为多个
     // flatMap算子将一行转化为多行
    Stream.of("1,2,3,4").flatMap(x->Stream.of(x.split(","))).forEach(System.out::println);
    
    1. sorted算子进行排序
    //sorted算子进行排序
    Stream.of(2,5,8,7,0).sorted().forEach(System.out::println);
    
    1. 最大值、最小值
    //最大值
    Stream.of(3,6,8,4,9).max((x1,x2) -> x1 - x2).ifPresent(System.out::println);
    //最小值
    Stream.of(3,6,8,4,9).max((x1,x2) -> x1 - x2).ifPresent(System.out::println);
    

    6.计算个数

    //计算个数
    Stream.of(11,33,44).count();
    
    1. 获取第一个元素
    Stream.of(11,9,8,43).findFirst().orElse(0);
    
    1. 获取到第一个匹配的元素
    //一般会配合并行流使用,只要在任何片段发现了第一个元素就停止运算
    Stream.of(1,2,3,45,66).parallel().filter(x->x>3).findAny().ifPresent(System.out::println);
    
    1. 是否包含某元素
    boolean flag = Stream.of(1,2,3,4).anyMatch(x->x>3);
    
    1. reduce合并操作
    Stream.of(1,2,3,4,5,6).reduce((x,y)->x+y).get();
    
    1. peek操作
    Stream.of(2,3,4).peek(System.out::println);
    

    Stream转化为集合

    Stream.of(1,2).collect(Collectors.toList());
    Stream.of(3,2).collect(Collectors.toSet());
    

    并行流以及测试

    使用并行流只需要在流开始的地方增加parallel(),如下:

    Stream.of(2,3,4).parallel().........
    

    并行流是基于JDK7中的 Fork/Join 框架基础上进行并行计算的,默认的线程数量是当前计算机的CPU核数。通过线程数量,对数据进行分片(partition),分布之后进行并行计算,然后对结果进行聚合(reduce)操作。
    也可以通过-Djava.util.concurrent.ForkJoinPool.common.parallelism=1进行控制并行流中启用线程的数量。下面通过一段代码和结果去测试使用并行流之后计算耗时更少。

            long startTime1 = new Date().getTime();
            Stream.iterate(0,x->x+1).limit(100000).reduce((x1,x2)->x1+x2).ifPresent(System.out::println);
            System.out.println("单向流耗时:" + (new Date().getTime()-startTime1));
            long startTime2 = new Date().getTime();
            Stream.iterate(0,x->x+1).limit(100000).parallel().reduce((x1,x2)->x1+x2).ifPresent(System.out::println);
            System.out.println("并行流耗时:" + (new Date().getTime()-startTime2));
    
    单向流和并行流对比

    另外,根据计算的内容去决定是否使用并行流计算,如果不适合利用并行流计算,耗时比单向流还要时间长,主要取决于以下的三点:

    1. 是否需要并行
    2. 任务是否是独立可进行
    3. 结果是否取决于任务的执行顺序

    Stay hungry,stay foolish.

    相关文章

      网友评论

          本文标题:Java8(二)之走进Stream

          本文链接:https://www.haomeiwen.com/subject/ltconqtx.html