美文网首页
Java8中的Stream相关用法

Java8中的Stream相关用法

作者: still_loving | 来源:发表于2018-10-14 19:18 被阅读0次

    概念

           在Java中有关流的概念有很多,比如输入输出流(InputStream/OutputStream),或者在对XML文件进行SAX解析的流,甚至于大数据的实时处理也有流( Amazon Kinesis ),在Java8版本之后,JDK又引入了一个全新的Stream API,它不是前面说的任何一种,而是对集合对象功能的增强,同时结合Java8以后引入的Lambda表达式,可以极大的简化编码量,而且程序的可读性也会有一个很大的提升。

           Stream不仅仅只是增强了Java中集合的操作,同时它也是非常高效的,它提供了串行和并行的两种模式,现在的计算机都是多核的时代,可以充分利用这一特点提高程序执行效率,暂时不用深究串行和并行的问题,后面会介绍到,只要知道,引入了Stream API之后,如果需要改成并行执行,主体代码完全不用动,只要切换成并行模式就行,非常方便快捷。

    什么是Stream?

           前面介绍了半天,其实如果没有用过的人,第一眼看上去,都是模糊不清的,我们可以参考现实生活中的场景,比如工厂的流水线,无论是自动化还是人工化的流水线操作,流水线的作业特点很明显,一般都是在不同的流水节点配置不同的人员或者自动化机器。比如:在生产药品的过程中,如果药片生产完成后,一般都需要一些检验以及包装的过程,这时一般都会有一条流水作业,在这个流水作业过程中,有负责检测的,有负责包装的,等到整个流水线全部走完,最后就会变成规格相等的成品包装药。

           而Stream API引入后,我们针对集合的操作就有点类似于这种流水线的作业一样,对于集合中的元素,就相当于一颗颗药片,在经历了筛选,包装之后,变成另外一种“规格”呈现出来,而流在经历了终结操作之后,会依据源数据生成一个我们需要的最终数据结构。当然这里类比流水线也并不是特别贴切,但是大致的概念的上还是很像的,便于快速了解Java8中这套新引入的Stream。

           Stream不是集合元素,更不是数据结构,它跟数据的存储没有任何关系,它只是一种针对数据的计算而存在的,可以把它看成是更高级的迭代器。回想一下,我们传统的在进行集合操作的时候,例如:过滤掉集合中的某些元素,或者对集合中的数据进行再次加工,一般我们都需要不停迭代集合内部的元素,然后进行条件判断,通过条件的留下,不符合的筛掉,这种代码写出来一般都大同小异,而且代码量都很大,阅读起来也比较费事。Stream可以解决这些问题,我们只要给出针对集合内部元素需要进行的操作,Stream 会隐式地在内部进行遍历,然后进行相应的数据转换。

           而且类似于流水线一样,Stream是单向的,不可重复的,只能遍历一次,就像流水线作业一样,只能一条路走到头,走完之后就无法再重新走一遍了。

           前面说过Stream是可以进行并行化模式的,这点也是它不同于迭代器的一点,这是一个什么概念呢?串行化操作就是对集合内部的数据逐个读取,但是使用并行化模式后,就会将数据分成多段,每段数据都会在不同的线程中去处理,然后将结果一同输出,这里的并行操作依赖于Java7中引入的 Fork/Join 框架(JSR166y)来拆分任务和加速处理过程。这就类似于流水作业线一样,会有多条流水作业线同时进行产品的流水线操作,可以加速处理的速度。

    Stream的构成

           根据前面的介绍,可以了解到,Stream中有三个很重要的步骤:源数据(source)、数据转换(transforming values)、执行操作(operations)。数据源就不需要赘述了,一般就是一个集合,数据转换其实就是对集合中的数据进行一系列的校验,筛选甚至是再加工处理;执行操作就是将符合条件的数据整合成需要的数据结构返回出来。而且Stream的很多方法都是返回它自身(this),在编码时完全可以采用链式编程,对数据的操作就会像一个链条一样排列在一起,形成一个管道。这样还有个好处:如果有需要,可以在链条之中按照需要插入各个转换操作,做成一种类似与“可插拔”的效果。

    source生成方式

    • Collection和数组

      • Collection.stream和parallelStream方法

      • Arrays.stream(T array)和Stream.of()

    • 从BufferReader中生成

      • java.io.BufferReader.lines()
    • 静态工厂

      • java.util.stream.IntStream.range()

      • java.nio.file.Files.walk()

    • 自己构建

      • java.util.Spliterator
    • 其他方式

      • Random.ints()

      • BitSet.stream()

      • Pattern.splitAsStream(java.lang.CharSequence)

      • JarFile.stream()

    Stream操作类型

           流的操作分为两个阶段,也就是它的两种类型:Intermediate和terminal,简单翻译过来就是“中间”和“最终操作”。结合前面的介绍 ,简单来说就是:Intermediate操作对应着数据转换过程,一个Stream可以进行多次的Intermediate操作,而且Stream有一个特点,就是它的惰性(lazy),具体就体现在:多次的Intermediate操作实际上并不会真正遍历数据,只有在最终的那次Terminal操作后,才会循环Stream里面的集合,然后执行所有的操作。所以它前面的Intermediate并不会真正操作数据。

    除此之外,还有一种成为short-circuiting操作,翻译过来叫短路操作:

    • 对于一个 intermediate 操作,如果它接受的是一个无限大(infinite/unbounded)的 Stream,但返回一个有限的新Stream

    • 对于一个 terminal 操作,如果它接受的是一个无限大的 Stream,但能在有限的时间计算出结果。

    上面的概念看上去也是一脸懵逼,但是毕竟它是专业性解释,还是要贴出来的,通俗点来说:

           我们可以联想一种场景,比如现在有无穷的人员信息数据,我们现在需要找到其中5个具有某些特征人员信息,这时候其实我们是没有必要全部遍历所有的数据,只是在遍历过程中如果发现符合条件的人,就通过,一旦达到五条数据之后,剩下的就完全可以抛弃了,这个有点类似于我们常说的逻辑运算符(&& 和 || )的短路操作,如:&&操作,如果前面为false,后面就不会执行了直接返回false, || 也是一样,如果前面一个结果为true,后面也不会执行。所以说上面介绍的两点短路操作情况,其实说得都是一种情况:数据源无限,结果有限,这样才能在有限时间内得到结果。

           另外需要明确的是:Stream操作的过程中,是不会对源数据做任何修改的,在经历过Stream处理后的结果,一般都是存储到另外的一个空间中,对源数据没有任何影响。

    • Intermediate常用的方法:map (mapToInt, flatMap 等)、 filter、 distinct、 sorted、 peek、 limit、 skip、 parallel、 sequential、 unordered

    • Terminal操作常用的方法:forEach、 forEachOrdered、 toArray、 reduce、 collect、 min、 max、 count、 anyMatch、 allMatch、 noneMatch、 findFirst、 findAny、 iterator

    • Short-circuiting常用操作:anyMatch、 allMatch、 noneMatch、 findFirst、 findAny、 limit

           上面这些方法只是作为一个简单记录,后面介绍使用的时候,会具体用到,到时就可以了解它们的功能了,这里只是记录下来,作为一个了解。

    Stream的使用

           前面扯了这么多,实际上仍然没有具体说怎么使用,有了前面的概念介绍,下面介绍使用的时候,就会清楚多了,不会像第一次看到那样毫无头绪了。流的使用其实就是实现一个 filter-map-reduce 过程,过程中使用Lambda表达式,是一种函数式编程,对于函数式编程概念不清楚的,可以了解一下。其实我们如果熟悉js就会很快上手使用了,这个跟ES6中遍历数组的操作类似。

    Stream对象的构建

    Stream的构建其实就是前面介绍的source的生成方式中介绍的那样即可:

    //使用Stream.of构建
    Stream stream = Stream.of("a", "b", "c");
    //使用数组构建
    String[] array = {"a", "b", "c"};
    stream = Arrays.stream(array);
    //使用Collection构建
    List<String> list = Arrays.asList(array);
    stream = list.stream();
    

           Stream是支持泛型的,但是对于基本数据类型和对应的包装类型,存在自动拆装箱的情况,这个过程比较耗费性能,所以这里Stream提供了IntStream、LongStream、DoubleStream这三种特殊的Stream用以基本数据类型的计算。目前只有这三种(java8版本)。

    //数值流的构造
    IntStream.of(1, 2, 3);
    //range和rangeClosed构建
    //range表示开区间 [1, 3),rangeClosed表示闭区间 [1, 3]
    IntStream.range(1, 3);// 1, 2
    IntStream.rangeClosed(1, 3);// 1, 2, 3
    

    Stream具体操作

    这里简单介绍一些关于Stream在具体代码中的使用方式,如果想要了解更多,可以考虑查阅更多的文档。

    //一个简单的场景:字符串集合中,将所有字符串全部转换成大写
    String[] words = {'a', 'b', 'c', 'd'};
    Stream stream = Arrays.toList(words).stream();
    List<String> output = stream.map(String::toUpperCase)
     .collect(Collectors.toList());
    

           这里使用了“String::toUpperCase”这种写法,这个也是Java8新引入的特性Supplier,这里就不再深入介绍它了,这里只要知道,它的功能就是找到String类中定义的toUpperCase方法,然后将stream中的每个元素作为toUpperCase方法的入参,不停调用它并返回新的结果。

           上面介绍中,可以看到collect其实就是一个Terminal操作,中间的这个map就是一个Intermediate操作,当然这个Intermediate操作还可以继续添加,对源数据继续进行转换。这里再回头看前面介绍过的Intermediate操作常用的方法,可以看到,这个过程有很多方法可以调用,比如:filter过滤用的、forEach遍历用的等等。

           Terminal操作永远都是在链条的最后,并且只能调用一次,一旦执行后,Stream上的元素就被“消费”掉了,无法对一个Stream进行两次Terminal操作。例如:

    stream.forEach(element -> doOneThing(element));
    stream.forEach(element -> doAnotherThing(element));
    

           这里的forEach就是一个Terminal操作,如果确实有需要对其中的每一个数据有其他操作,可以添加到Intermediate操作过程中,这里以peek方法为例:

    Stream.of("one","two","three","four","five")
           .filter(e -> e.length() > 3)
           .peek(e -> System.out.println("Filtered value: " + e))
           .map(String::toUpperCase)
           .peek(e -> System.out.println("Mapped value: " + e))
           .collect(Collectors.toList());
    

           上面的代码中,Intermediate操作就叠加了很多层,然后collect操作结束整个流过程,这里也很明显能感受到Stream编程带来的好处:代码的可读性大幅度提高了,而且代码比较优雅。我们不论熟不熟悉上面的具体语法,但是通过阅读上面的代码,我们很明看可以知道它到底是在干什么:对字符串集合过滤出长度大于3的元素,输出通过校验的元素,将通过的元素转成大写,再次输出结果,最后返回List<String>,这个就是可读性,如果按照传统的方式,我们需要一遍又一遍迭代遍历集合,才能达到上面的效果,代码量会明显加大,而且代码可读性非常差,不够一目了然。

           仔细看前面说过的Terminal常用操作方法和Short-circuiting常用操作,可以发现里面是有些重叠的,比如:findFirst、findAll、anyMatch等等。这类操作根据方法名称就可以了解它们的功能了,这里我就不再赘述了,之所以提一下,主要是需要注意一下:Short-circuiting与其他两类操作的界限是不明显的,这个仔细一想也能明白,它们分类的出发点都不同,所以有重合很正常。

    自定义生成流

           这里我们需要用到Stream.generate或者Stream.iterate方法,通过实现 Supplier 接口,你可以自己来控制流的生成。把 Supplier 实例传递给 Stream.generate() 生成的 Stream。它默认是串行而且无序的。现在以生成10个随机数为例:

    //传统方式:借助于Random
    Random seed = new Random();
    Supplier<Integer> random = seed::nextInt;
    Stream.generate(random).limit(10).forEach(System.out::println);
    ​
    //Stream.generate方式
    IntStream.generate(() -> (int) (System.nanoTime() % 100))
     .limit(10)
     .forEach(System.out::println);
    

           上面代码中,后者采用了System.nanoTime()产生系统随机数,因为它是无限的,如果不进行短路操作,Stream中会不断产生随机数,没有边界,所以必须要limit一下,获取前十个即可。generate方法里面的参数我们也可以自己手动实现,只要写一个类实现Supplier即可,需要什么逻辑在具体实现类里面写清楚就行。

    class MusicSupplier implements Supplier<Music> {
       private int index = 0;
    
       @Override
       public Music get() {
         return new Music(index++, "Music_" + index);
       }
    }
    class Music{
       private int id;
       private String name;
       public Music(int id, String name) {
         this.id = id;
         this.name = name;
       }
       ...省略getter和setter
    }
    class Test {
       public static void main(String[] args) {
         //生成10个Music对象并打印结果
         Stream.generate(new MusicSupplier())
               .limit(10)
               .forEach(m -> System.out.println(m.getId() + "---" + m.getName()));
       }
    }
    

           下面再来说一下Stream.iterate,它其实跟reduce操作很像,接受一个种子值和一个UnaryOperator(一元操作符,例如函数 f ),然后种子值成为 Stream 的第一个元素,f(seed) 为第二个,f(f(seed)) 第三个,以此类推。中学阶段学到的数列其实就可以用这种方式,如:等差数列、等比数列之类的。

    //等差为3的数列
    Stream.iterate(0, n -> n + 3).limit(10). forEach(x -> System.out.print(x + " "));
    

    Collectors 的 reduction 操作

           这里的Collectors是java.util.stream包下的一个辅助类,主要是辅助流的输出结果的转换,主要说的是groupingBy和partitionBy,reduction翻译过来就是减少、还原的意思,其实就是类似于数据库中的分组一样,比如现在有一个person表,我们需要根据性别分组统计,统计男女各对应有多少人。整体来说,它最终的结果是整合后的结果。下面有个示例:随机生成100个Person对象,

    private static class PersonSupplier implements Supplier<Person> {
       private int index = 0;
       private Random random = new Random();
       @Override
       public Person get() {
         return new Person(index++, "StormTestUser" + index, random.nextInt(100));
       }
    }
    ​
    private static class Person {
       public int no;
       private String name;
       private int age;
       public Person (int no, String name, int age) {
         this.no = no;
         this.name = name;
         this.age = age;
       }
       public String getName() {
         System.out.println(name);
         return name;
       }
       public int getAge() {
         return age;
       }
    }
    ​
    public class Test{
       public static void main(String[] args) {
         //这里有一个例子:根据年龄分组
         Map<Integer, List<Person>> personGroups = 
         Stream.generate(new PersonSupplier())
               .limit(100)
               .collect(Collectors.groupingBy(Person::getAge));
         Iterator it = personGroups.entrySet().iterator();
         while (it.hasNext()) {
           Map.Entry<Integer, List<Person>> persons = (Map.Entry) it.next();
           System.out.println("Age " + persons.getKey() + " = " + persons.getValue().size());
         }
    
         //按照未成年人和成年人归组
         Map<Boolean, List<Person>> children = 
         Stream.generate(new PersonSupplier())
               .limit(100)
               .collect(Collectors.partitioningBy(p -> p.getAge() < 18));
         System.out.println("Children number: " + children.get(true).size());
         System.out.println("Adult number: " + children.get(false).size());
       }
    }
    

    Stream的优势

    • 提高代码可读性:这个通过前面的介绍也应该可以有一个清晰的感受,代码的条理性和可读性都有很大的提升,可读性高,维护起来就比较方便,软件主要的生命周期都是在维护阶段,所以代码的可维护性以及维护成本非常重要。

    • 降低代码量,提高灵活度:因为传统的集合操作,必定摆脱不了遍历操作,而Stream隐式遍历就大大减少了编码人员的工作量,我们不需要关注具体的遍历情况,只需要将便利过程中需要加入的逻辑放到 Intermediate 操作中。而且因为 Intermediate 操作的可重复性,后期如果需要添加额外的处理逻辑,直接在代码链上添加或删除就行,方便灵活。

    • 无限数据量:理论上,source的数据量可以是无限的,只要有相应的短路操作或者能够快速得到结果的操作即可,这些在对于一些海量数据的情况下,Stream提供了一种更快捷优雅的解决方式。

    • 支持并行:可以充分利用现代计算机多核的优势,极大地提高了数据的处理速度。

    Stream的使用场景

    多个集合操作

           比如:先进行filter过滤,然后再forEach,这时传统做法就是先遍历一次进行顾虑,然后再一次遍历过滤后的数据集合,此时如果使用Stream操作就变得优雅简单,并且非常高效。

    对性能要求比较高

           这里就需要提升数据的处理速度,比如并行模式,这时传统的方案需要额外添加许多逻辑,甚至是并发的逻辑,但是采用Stream的方式非常简单快捷,如果遇到这种场合,可以考虑Stream的方案。

    函数式编程

           Stream的设计初衷之一就是为了在Java中引入函数式的编程风格,如果团队中确实有这种偏好或者规定,可以考虑使用Stream。

    无界限

           集合的大小是有界的,但是流不需要,有许多短路操作可以允许我们在有限的时间内完成无限流的计算,如果遇到这种情况,只能采用Stream方式,因为传统方式不能达到这种效果。

    其他问题

    paralleStream的线程安全问题

           Stream支持并行模式,但是如果使用不当,很容易陷入误区。这里举一个简单的例子:分别用串行、并行以及加锁的方式往三个list集合中添加一万个元素:

    private static List<Integer> list1 = new ArrayList<>();
    private static List<Integer> list2 = new ArrayList<>();
    private static List<Integer> list3 = new ArrayList<>();
    private static Lock lock = new ReentrantLock();
    ​
    public static void main(String[] args) {
       IntStream.range(0, 10000).forEach(list1::add);
    ​
       IntStream.range(0, 10000).parallel().forEach(list2::add);
    ​
       IntStream.range(0, 10000).forEach(i -> {
         lock.lock();
         try {
           list3.add(i);
         }finally {
           lock.unlock();
         }
       });
    ​
       System.out.println("串行执行的大小:" + list1.size());
       System.out.println("并行执行的大小:" + list2.size());
       System.out.println("加锁并行执行的大小:" + list3.size());
    }
    

           串行和加锁的方式每次得到的结果都是10000,是正确的,但是中间的并行执行每次结果都不一样,很明显,并行模式下,并不能保证线程安全。针对这种情况,它的解决方案是使用collect和reduce接口,深层的原因涉及到Stream的具体原理,这里就不再深入,只是记住一个结论:paralleStream里直接去修改变量是非线程安全的,但是采用collect和reduce操作就是满足线程安全的了。

    相关文章

      网友评论

          本文标题:Java8中的Stream相关用法

          本文链接:https://www.haomeiwen.com/subject/egdtzftx.html